New in Symfony 5.4: Messenger Improvements

Configurable handlers with PHP attributes

Contributed by Alireza Mirsepassi in #43588.

PHP attributes are a great way of adding metadata to PHP code. In Symfony we're adding the option to configure most things with PHP attributes, and in Symfony 5.4 that includes message handlers.

Instead of having to implement the MessageHandlerInterface, you can now add the AsMessageHandler attribute to any PHP class and use it as a message handler:

// src/MessageHandler/SmsNotificationHandler.php
namespace App\MessageHandler;

use App\Message\SmsNotification;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;

#[AsMessageHandler(fromTransport: 'async', priority: 10)]
class SmsNotificationHandler
{
    public function __invoke(SmsNotification $message)
    {
        // ...
    }
}
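
The attribute arguments are plain metadata that can be read back with reflection. The rough sketch below illustrates this; it runs without Symfony installed because it never instantiates the attribute. It assumes that AsMessageHandler also accepts handles and method arguments (they are not shown in the example above, so check them against your Symfony version). SmsSender and the inline SmsNotification class are hypothetical names used only for illustration:

```php
<?php

namespace App\MessageHandler;

use Symfony\Component\Messenger\Attribute\AsMessageHandler;

// Hypothetical message class, declared inline to keep the sketch self-contained.
final class SmsNotification
{
}

// Assumption: "handles" and "method" are accepted in addition to the
// "fromTransport" and "priority" arguments shown earlier.
#[AsMessageHandler(handles: SmsNotification::class, method: 'send', priority: 10)]
final class SmsSender
{
    public function send(SmsNotification $message): void
    {
        // ...
    }
}

// Read the metadata back without instantiating the attribute class.
$attribute = (new \ReflectionClass(SmsSender::class))->getAttributes()[0];
$attributeName = $attribute->getName();
$attributeArguments = $attribute->getArguments();
```

In an application you would not read the attribute yourself; Symfony's autoconfiguration picks it up when the handler is registered as a service.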

Worker metadata

Contributed by Oleg Krasavin in #42335.

Before Symfony 5.4 there was no simple way to get worker metadata, such as which transport a worker consumes from. Symfony 5.4 introduces a new WorkerMetadata class, accessible via $worker->getWorkerMetadata(). For example, inside a method of a listener/subscriber that handles Symfony Messenger events, you could use something like this:

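// Method of a listener/subscriber class. It needs:
//   use Symfony\Component\Messenger\Event\WorkerRunningEvent;
// and assumes $this->receiversName (string[]) and $this->servicesResetter
// are set in the constructor.
public function resetServices(WorkerRunningEvent $event): void
{
    // a worker can consume from several transports, so this is an array
    $actualTransportNames = $event->getWorker()->getWorkerMetadata()->getTransportNames();
    if (!$event->isWorkerIdle() || !array_intersect($actualTransportNames, $this->receiversName)) {
        return;
    }

    $this->servicesResetter->reset();
}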

Reset container services between messages

Contributed by Grégoire Pineau in #41163 and #43322.

Container services are not reset automatically between messages. This can be a problem, for example, with the Monolog fingers_crossed handler: since services are not reset, if the first message triggers an error, the following messages keep logging into the same handler and can ultimately overflow its buffer.

In Symfony 5.4 we're improving this situation with the option to automatically reset services after handling a message. To use this feature, set the new reset_on_message option to true in your messenger configuration:

# config/packages/messenger.yaml
framework:
    messenger:
        reset_on_message: true
        transports:
            async:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
            failed: 'doctrine://default?queue_name=failed'
            sync: 'sync://'
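
To see why resetting matters, here is a rough plain-PHP sketch of a stateful service that keeps growing across messages unless it is reset. It does not use any Symfony or Monolog class: DemoBufferingLogger and handleMessages() are hypothetical stand-ins that only mimic the idea of a long-running worker sharing one service instance between messages:

```php
<?php

// Hypothetical stand-in for a stateful service such as a buffering log handler.
final class DemoBufferingLogger
{
    /** @var string[] */
    public array $buffer = [];

    public function log(string $line): void
    {
        $this->buffer[] = $line;
    }

    public function reset(): void
    {
        $this->buffer = [];
    }
}

// Handles every message with the same logger instance, as a long-running
// worker would, and returns the largest buffer size seen along the way.
function handleMessages(array $messages, DemoBufferingLogger $logger, bool $resetOnMessage): int
{
    $peak = 0;
    foreach ($messages as $message) {
        $logger->log("handled $message");
        $peak = max($peak, count($logger->buffer));
        if ($resetOnMessage) {
            $logger->reset();
        }
    }

    return $peak;
}

$peakWithoutReset = handleMessages(['a', 'b', 'c'], new DemoBufferingLogger(), false);
$peakWithReset = handleMessages(['a', 'b', 'c'], new DemoBufferingLogger(), true);
```

Without the reset the buffer grows with every message (the peak is 3 here); with the reset it never holds more than one message's worth of records (the peak is 1).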

Handle messages in batches

Contributed by Nicolas Grekas in #43354.

Sometimes, when using the Messenger component, you may want to handle multiple messages at once instead of processing them one by one. Symfony 5.4 introduces a new BatchHandlerInterface that allows your handlers to process messages in batches.

Handlers implementing this interface should expect a new optional $ack argument when __invoke() is called. When $ack is not provided, the message is handled synchronously as usual. When $ack is provided, __invoke() is expected to buffer the message together with its $ack function and to return the number of pending messages in the batch. Here is what a batch handler might look like:

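use Symfony\Component\Messenger\Handler\Acknowledger;
use Symfony\Component\Messenger\Handler\BatchHandlerInterface;
use Symfony\Component\Messenger\Handler\BatchHandlerTrait;

class MyBatchHandler implements BatchHandlerInterface
{
    // provides handle(), flush() and shouldFlush()
    use BatchHandlerTrait;

    public function __invoke(MyMessage $message, Acknowledger $ack = null)
    {
        // buffers the message, or handles it right away when $ack is null
        return $this->handle($message, $ack);
    }

    // receives the buffered [message, acknowledger] pairs when the batch is flushed
    private function process(array $jobs): void
    {
        foreach ($jobs as [$message, $ack]) {
            try {
                // [...] compute $result from $message
                $ack->ack($result);
            } catch (\Throwable $e) {
                $ack->nack($e);
            }
        }
    }
}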

The size of the batch is controlled by BatchHandlerTrait::shouldFlush() (defaults to 10).
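
To make the buffering contract more concrete, here is a rough plain-PHP sketch of the same idea. It deliberately avoids Symfony classes: DemoAck and DemoBatchHandler are hypothetical stand-ins, the batch size is a constructor argument chosen for the demo, and returning 0 right after a flush is an assumption of this sketch rather than a statement about the trait:

```php
<?php

// Hypothetical stand-in for an acknowledger; not the Symfony class.
final class DemoAck
{
    public bool $acked = false;
    public mixed $result = null;

    public function ack(mixed $result = null): void
    {
        $this->acked = true;
        $this->result = $result;
    }
}

// Hypothetical handler that buffers messages and processes them in batches.
final class DemoBatchHandler
{
    /** @var array<int, array{0: string, 1: DemoAck}> buffered [message, ack] pairs */
    private array $jobs = [];

    public function __construct(private int $batchSize = 10)
    {
    }

    // Buffers the message and returns how many messages are still pending.
    public function __invoke(string $message, DemoAck $ack): int
    {
        $this->jobs[] = [$message, $ack];
        if (!$this->shouldFlush()) {
            return count($this->jobs);
        }
        $this->flush();

        return 0;
    }

    // Processes everything buffered so far and acknowledges each message.
    public function flush(): void
    {
        $jobs = $this->jobs;
        $this->jobs = [];
        foreach ($jobs as [$message, $ack]) {
            $ack->ack(strtoupper($message));
        }
    }

    private function shouldFlush(): bool
    {
        return $this->batchSize <= count($this->jobs);
    }
}

$handler = new DemoBatchHandler(batchSize: 3);
$acks = [new DemoAck(), new DemoAck(), new DemoAck()];
$pending = [];
foreach (['a', 'b', 'c'] as $i => $message) {
    $pending[] = $handler($message, $acks[$i]);
}
```

Here $pending ends up as [1, 2, 0]: the first two calls only buffer, and the third fills the batch and triggers the flush that acknowledges all three messages.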

Comments

The new option "reset_on_message" is a smooth way to introduce a breaking change. It must be set to "true" in 5.4 in order to solve the deprecation notice, but can be unset when upgrading to 6.0 since the only allowed value is "true". In 6.1, this option will be deprecated and it will be removed in 7.0.
In the example config, "reset_on_message: true" must be in the "messenger" section, not in "async".
It seems it should be "BatchHandlerTrait::shouldFlush()" instead of "BatchHandlerTrait::shouldProcess()".
Thanks for reporting those issues! I've just updated the blog post contents.
Now "reset_on_message: true" is in the "transports" section, but it must be in "messenger".
@Alex I've fixed it again. Hopefully this time for real :)
Does the reset also affect Doctrine? We had issues in the past where Doctrine kept bloating even when clearing the entity manager.
What's wrong with having to implement interfaces? (instead of using attributes)
@Thomas the problem is that if you add both the attribute AND the interface, your handler will be registered twice: once for all transports and once for this specific transport.