The Symfony Messenger component continues to evolve in Symfony 8.1 with improvements across worker execution, transport behavior, serialization, and failure handling.

Batch Fetching Messages

Nicolas Grekas
Contributed by Nicolas Grekas in #63662

In previous Symfony versions, workers fetched one message at a time from the transport, requiring one network round-trip per message. Symfony 8.1 adds a --fetch-size option to the messenger:consume command so workers can request multiple messages in a single call:

1
$ php bin/console messenger:consume async --fetch-size=8

Transports use this hint when supported by their underlying protocol (Amazon SQS up to 10 messages per ReceiveMessage call, Redis XREADGROUP COUNT, Doctrine LIMIT, AMQP repeated basic_get).

Configurable Service Reset Interval

Nicolas Grekas
Contributed by Nicolas Grekas in #63666

By default, messenger:consume resets services after each message to avoid leaking state between handlers. The --no-reset option disables resets entirely, which improves performance but can cause memory or state issues in long-running workers.

Symfony 8.1 introduces a middle ground by allowing an integer value for --no-reset. Services are then reset every N messages instead of after every message:

1
2
3
4
5
# default: reset services after every message
$ php bin/console messenger:consume async

# reset services every 100 messages (new in 8.1)
$ php bin/console messenger:consume async --no-reset=100

Custom Serialized Type Name for Messages

Grégoire Pineau
Contributed by Grégoire Pineau in #63061

When applications exchange messages through a broker, they must agree on how messages are encoded. Symfony's Serializer transport currently stores the PHP fully-qualified class name in a type header, but that value is often not useful for non-Symfony consumers, or even Symfony applications using different namespaces.

Symfony 8.1 adds a serializedTypeName argument to the #[AsMessage] attribute. Its value is used as the type header instead of the FQCN:

1
2
3
4
5
6
7
8
9
10
11
12
namespace App\Crawler\Message;

use Symfony\Component\Messenger\Attribute\AsMessage;

#[AsMessage(serializedTypeName: 'crawler.vectorization_finished')]
final readonly class VectorizationFinished
{
    public function __construct(
        public string $crawlId,
    ) {
    }
}

Per-Message Priority on AMQP

Valentin Nazarov
Contributed by Valentin Nazarov in #41574

RabbitMQ supports message priorities through the priority property, allowing higher-priority messages to overtake lower-priority ones within the same queue. Symfony 8.1 exposes this through a new AmqpPriorityStamp for the AMQP transport, similar to the existing BeanstalkdPriorityStamp:

1
2
3
use Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpPriorityStamp;

$bus->dispatch($message, [new AmqpPriorityStamp(5)]);

Per-message priority support is intentionally limited to AMQP because transports such as Redis, Doctrine, or SQS do not support native priorities. For cross-transport priority routing, configure separate queues and consume them in priority order:

1
$ php bin/console messenger:consume high_priority low_priority

Idle Timeout for Batch Handlers

HypeMC
Contributed by HypeMC in #63277

The BatchHandlerTrait lets a single handler process messages in batches once the configured batch size is reached. However, in low-throughput scenarios, batches can take a long time to fill, leaving messages buffered in memory.

Symfony 8.1 adds an optional getIdleTimeout() method to the trait. When the worker remains idle for the configured duration, the current batch is flushed even if it is not full:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
use Symfony\Component\Messenger\Handler\BatchHandlerInterface;
use Symfony\Component\Messenger\Handler\BatchHandlerTrait;

class IndexProductsHandler implements BatchHandlerInterface
{
    use BatchHandlerTrait;

    // flush partial batches after 5 seconds of inactivity
    private function getIdleTimeout(): ?float
    {
        return 5.0;
    }

    private function shouldFlush(): bool
    {
        return 100 <= \count($this->jobs);
    }

    // ...
}

Non-Blocking PostgreSQL LISTEN/NOTIFY

d-ph
Contributed by d-ph in #47666

The Doctrine transport uses PostgreSQL's LISTEN/NOTIFY feature to wake workers as soon as new messages arrive instead of relying on polling.

Before Symfony 8.1, the blocking LISTEN call happened inside the transport itself. When consuming from several PostgreSQL queues with priorities, the worker could block on the first queue and never check the others.

Symfony 8.1 moves the blocking LISTEN/NOTIFY wait into a dedicated worker subscriber triggered on the idle event. The worker loop now checks all transports in priority order on every iteration and only blocks on NOTIFY after all queues are found empty.

This change is fully transparent. Existing applications using the Doctrine transport with PostgreSQL benefit automatically.

Decode Failures Routed Through Failure Handling

Nicolas Grekas
Contributed by Nicolas Grekas in #62888

Previously, when Messenger could not decode a message, for example because its class no longer existed after a refactor, the transport raised a MessageDecodingFailedException and silently discarded the message from the queue.

Symfony 8.1 now routes decode failures through the normal failure-handling pipeline. The message remains acknowledgeable, is wrapped in a MessageDecodingFailedException envelope, and goes through the configured retry and failure transports like any other failed message.

A new DecodeFailedMessageMiddleware retries decoding on every retry attempt. If a fix is deployed later, such as restoring a missing class or updating a serializer, the message can decode successfully on a subsequent attempt and continue through the bus with all original stamps preserved.

Listable Redis Receiver

Mudassar Ali
Contributed by Mudassar Ali in #61462

The ListableReceiverInterface lets monitoring tools and bundles such as zenstruck/messenger-monitor-bundle inspect pending messages without consuming them. The Doctrine transport already implemented this interface; Symfony 8.1 adds the same support to the Redis transport.

RedisReceiver now exposes all() and find() methods backed by Redis Stream XRANGE commands:

1
2
3
4
5
// list all pending messages on the transport
$envelopes = $receiver->all();

// find a specific message by Redis stream ID
$envelope = $receiver->find('1700000000000-0');

Force Redis Cluster via DSN

Alex Vlasov
Contributed by Alex Vlasov in #54866

In previous Symfony versions, configuring a Redis Cluster transport required listing every cluster node in the DSN. This approach is fragile when cluster nodes change and impossible when only a load-balanced endpoint is exposed.

Symfony 8.1 adds a redis_cluster=true option to the Redis transport DSN. When enabled, the Redis client connects in cluster mode through a single endpoint and relies on cluster discovery for the remaining nodes:

1
2
3
4
5
# before: enumerate every node in the cluster
MESSENGER_TRANSPORT_DSN=redis://redis-0:6379,redis://redis-1:6379

# after: a single endpoint with redis_cluster=true
MESSENGER_TRANSPORT_DSN=redis://redis-cluster:6379?redis_cluster=true

Delayed Quorum Queues for AMQP

miquel-angel
Contributed by miquel-angel in #60298

The AMQP transport uses dedicated delay queues to defer message delivery. With RabbitMQ quorum queues, these delay queues require an explicit x-expires argument so the broker eventually deletes them, but the value must outlive every delayed message still stored in the queue.

Symfony 8.1 rewrites the delay queue strategy for quorum queues to use one queue per day, with an expiration set to 1 day + delay + 10 seconds. Messages scheduled for the same calendar day now share a queue, and that queue is guaranteed to survive long enough for the slowest message to be delivered.

This fixes the long-standing issue where delayed quorum queues could expire before their messages became due.

Disable Default AMQP Queue Binding

Max
Contributed by Max in #63346

When declaring an AMQP transport without explicitly listing queues, messenger:setup-transports creates a default messages queue and binds it to the configured exchange. This is useful for consumers but unnecessary for write-only transports that only publish to an exchange.

Symfony 8.1 lets you disable default queue creation by setting queues to [] or false:

1
2
3
4
5
6
7
8
9
10
11
12
13
framework:
    messenger:
        transports:
            # write-only transport: no queue is declared or bound
            outgoing_events:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    exchange:
                        name: 'EVENTS'
                        type: fanout
                    queues: false

            # ...

Release Deduplication Lock on Definitive Failure

Ousama Ben Younes
Contributed by Ousama Ben Younes in #64070

Symfony 7.3 introduced the deduplication middleware, which acquires a lock keyed by a DeduplicateStamp so duplicate dispatches of the same message are skipped. The lock is released after the message is successfully handled, or kept until expiration (300 seconds by default) while the message keeps failing.

In Symfony 8.1, when a message fails definitively, meaning the retry strategy gives up and the message is moved to the failure transport, the deduplication lock is released immediately. New dispatches using the same key can then enter the queue right away instead of being skipped until the TTL expires.

Published in #Living on the edge