Skip to content

Transactional Messages: Handle New Messages After Handling is Done

Warning: You are browsing the documentation for Symfony 4.x, which is no longer maintained.

Read the updated version of this page for Symfony 7.2 (the current stable version).

A message handler can dispatch new messages while handling others, to either the same or a different bus (if the application has multiple buses). Any errors or exceptions that occur during this process can have unintended consequences, such as:

  • If using the DoctrineTransactionMiddleware and a dispatched message throws an exception, then any database transactions in the original handler will be rolled back.
  • If the message is dispatched to a different bus, then the dispatched message will be handled even if some code later in the current handler throws an exception.

An Example RegisterUser Process

Let's take the example of an application with both a command and an event bus. The application dispatches a command named RegisterUser to the command bus. The command is handled by the RegisterUserHandler which creates a User object, stores that object to a database and dispatches a UserRegistered message to the event bus.

There are many handlers to the UserRegistered message, one handler may send a welcome email to the new user. We are using the DoctrineTransactionMiddleware to wrap all database queries in one database transaction.

Problem 1: If an exception is thrown when sending the welcome email, then the user will not be created because the DoctrineTransactionMiddleware will rollback the Doctrine transaction, in which the user has been created.

Problem 2: If an exception is thrown when saving the user to the database, the welcome email is still sent because it is handled asynchronously.

DispatchAfterCurrentBusMiddleware Middleware

For many applications, the desired behavior is to only handle messages that are dispatched by a handler once that handler has fully finished. This can be done by using the DispatchAfterCurrentBusMiddleware and adding a DispatchAfterCurrentBusStamp stamp to the message Envelope:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// src/Messenger/CommandHandler/RegisterUserHandler.php
namespace App\Messenger\CommandHandler;

use App\Entity\User;
use App\Messenger\Command\RegisterUser;
use App\Messenger\Event\UserRegistered;
use Doctrine\ORM\EntityManagerInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\DispatchAfterCurrentBusStamp;

class RegisterUserHandler
{
    private $eventBus;
    private $em;

    public function __construct(MessageBusInterface $eventBus, EntityManagerInterface $em)
    {
        $this->eventBus = $eventBus;
        $this->em = $em;
    }

    public function __invoke(RegisterUser $command)
    {
        $user = new User($command->getUuid(), $command->getName(), $command->getEmail());
        $this->em->persist($user);

        // The DispatchAfterCurrentBusStamp marks the event message to be handled
        // only if this handler does not throw an exception.

        $event = new UserRegistered($command->getUuid());
        $this->eventBus->dispatch(
            (new Envelope($event))
                ->with(new DispatchAfterCurrentBusStamp())
        );

        // ...
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// src/Messenger/EventSubscriber/WhenUserRegisteredThenSendWelcomeEmail.php
namespace App\Messenger\EventSubscriber;

use App\Entity\User;
use App\Messenger\Event\UserRegistered;
use Doctrine\ORM\EntityManagerInterface;
use Symfony\Component\Mailer\MailerInterface;
use Symfony\Component\Mime\RawMessage;

class WhenUserRegisteredThenSendWelcomeEmail
{
    private $mailer;
    private $em;

    public function __construct(MailerInterface $mailer, EntityManagerInterface $em)
    {
        $this->mailer = $mailer;
        $this->em = $em;
    }

    public function __invoke(UserRegistered $event)
    {
        $user = $this->em->getRepository(User::class)->find($event->getUuid());

        $this->mailer->send(new RawMessage('Welcome '.$user->getFirstName()));
    }
}

This means that the UserRegistered message would not be handled until after the RegisterUserHandler had completed and the new User was persisted to the database. If the RegisterUserHandler encounters an exception, the UserRegistered event will never be handled. And if an exception is thrown while sending the welcome email, the Doctrine transaction will not be rolled back.

Note

If WhenUserRegisteredThenSendWelcomeEmail throws an exception, that exception will be wrapped into a DelayedMessageHandlingException. Using DelayedMessageHandlingException::getExceptions will give you all exceptions that are thrown while handling a message with the DispatchAfterCurrentBusStamp.

The dispatch_after_current_bus middleware is enabled by default. If you're configuring your middleware manually, be sure to register dispatch_after_current_bus before doctrine_transaction in the middleware chain. Also, the dispatch_after_current_bus middleware must be loaded for all of the buses being used.

This work, including the code samples, is licensed under a Creative Commons BY-SA 3.0 license.
TOC
    Version