Step 18: Going Async
Going Async
Checking for spam during the handling of the form submission might lead to some problems. If the Akismet API becomes slow, our website will also be slow for users. But even worse, if we hit a timeout or if the Akismet API is unavailable, we might lose comments.
Ideally, we should store the submitted data without publishing it, and immediately return a response. Checking for spam can then be done out of band.
Flagging Comments
We need to introduce a state for comments: submitted, spam, and published.
Add the state property to the Comment class:
$ symfony console make:entity Comment
Create a database migration:
$ symfony console make:migration
Modify the migration to update all existing comments to be published by default:
--- a/migrations/Version00000000000000.php
+++ b/migrations/Version00000000000000.php
@@ -20,7 +20,9 @@ final class Version20200714155905 extends AbstractMigration
     public function up(Schema $schema) : void
     {
         // this up() migration is auto-generated, please modify it to your needs
-        $this->addSql('ALTER TABLE comment ADD state VARCHAR(255) NOT NULL');
+        $this->addSql('ALTER TABLE comment ADD state VARCHAR(255)');
+        $this->addSql("UPDATE comment SET state='published'");
+        $this->addSql('ALTER TABLE comment ALTER COLUMN state SET NOT NULL');
     }
     public function down(Schema $schema) : void
Migrate the database:
$ symfony console doctrine:migrations:migrate
We should also make sure that, by default, the state is set to submitted:
--- a/src/Entity/Comment.php
+++ b/src/Entity/Comment.php
@@ -55,9 +55,9 @@ class Comment
     private $photoFilename;
     /**
-     * @ORM\Column(type="string", length=255)
+     * @ORM\Column(type="string", length=255, options={"default": "submitted"})
      */
-    private $state;
+    private $state = 'submitted';
     public function __toString(): string
     {
Update the EasyAdmin configuration to be able to see the comment’s state:
--- a/config/packages/easy_admin.yaml
+++ b/config/packages/easy_admin.yaml
@@ -18,6 +18,7 @@ easy_admin:
                     - author
                     - { property: 'email', type: 'email' }
                     - { property: 'photoFilename', type: 'image', 'base_path': "/uploads/photos", label: 'Photo' }
+                    - state
                     - { property: 'createdAt', type: 'datetime' }
                 sort: ['createdAt', 'ASC']
                 filters: ['conference']
@@ -26,5 +27,6 @@ easy_admin:
                     - { property: 'conference' }
                     - { property: 'createdAt', type: datetime, type_options: { disabled: true } }
                     - 'author'
+                    - { property: 'state' }
                     - { property: 'email', type: 'email' }
                     - text
Don’t forget to also update the tests by setting the state of the fixtures:
--- a/src/DataFixtures/AppFixtures.php
+++ b/src/DataFixtures/AppFixtures.php
@@ -37,8 +37,16 @@ class AppFixtures extends Fixture
         $comment1->setAuthor('Fabien');
         $comment1->setEmail('[email protected]');
         $comment1->setText('This was a great conference.');
+        $comment1->setState('published');
         $manager->persist($comment1);
+        $comment2 = new Comment();
+        $comment2->setConference($amsterdam);
+        $comment2->setAuthor('Lucas');
+        $comment2->setEmail('[email protected]');
+        $comment2->setText('I think this one is going to be moderated.');
+        $manager->persist($comment2);
+
         $admin = new Admin();
         $admin->setRoles(['ROLE_ADMIN']);
         $admin->setUsername('admin');
For the controller tests, simulate the validation:
--- a/tests/Controller/ConferenceControllerTest.php
+++ b/tests/Controller/ConferenceControllerTest.php
@@ -2,6 +2,8 @@
 namespace App\Tests\Controller;
+use App\Repository\CommentRepository;
+use Doctrine\ORM\EntityManagerInterface;
 use Symfony\Bundle\FrameworkBundle\Test\WebTestCase;
 class ConferenceControllerTest extends WebTestCase
@@ -22,10 +24,16 @@ class ConferenceControllerTest extends WebTestCase
         $client->submitForm('Submit', [
             'comment_form[author]' => 'Fabien',
             'comment_form[text]' => 'Some feedback from an automated functional test',
-            'comment_form[email]' => '[email protected]',
+            'comment_form[email]' => $email = '[email protected]',
             'comment_form[photo]' => dirname(__DIR__, 2).'/public/images/under-construction.gif',
         ]);
         $this->assertResponseRedirects();
+
+        // simulate comment validation
+        $comment = self::$container->get(CommentRepository::class)->findOneByEmail($email);
+        $comment->setState('published');
+        self::$container->get(EntityManagerInterface::class)->flush();
+
         $client->followRedirect();
         $this->assertSelectorExists('div:contains("There are 2 comments")');
     }
From a PHPUnit test, you can get any service from the container via self::$container->get(); it also gives access to non-public services.
Understanding Messenger
Managing asynchronous code with Symfony is the job of the Messenger Component:
$ symfony composer req messenger
When some logic should be executed asynchronously, send a message to a messenger bus. The bus stores the message in a queue and returns immediately to let the flow of operations resume as fast as possible.
A consumer runs continuously in the background to read new messages on the queue and execute the associated logic. The consumer can run on the same server as the web application or on a separate one.
It is very similar to the way HTTP requests are handled, except that we don’t have responses.
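The dispatch-then-consume flow can be pictured with a few lines of plain PHP. This is only a rough sketch, not how Messenger is implemented: InMemoryBus, pending(), and consume() are hypothetical names invented for the illustration, and the queue lives in memory instead of in a broker.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-in for a message bus: dispatching only appends to a queue.
final class InMemoryBus
{
    /** @var object[] */
    private array $queue = [];

    public function dispatch(object $message): void
    {
        // Storing the message is cheap, so the caller gets control back immediately.
        $this->queue[] = $message;
    }

    /**
     * Hypothetical consumer step: drain the queue and run the handler on each message.
     * Returns the number of messages handled.
     */
    public function consume(callable $handler): int
    {
        $handled = 0;
        while (null !== ($message = array_shift($this->queue))) {
            $handler($message);
            ++$handled;
        }

        return $handled;
    }

    public function pending(): int
    {
        return count($this->queue);
    }
}

$bus = new InMemoryBus();
$log = [];

// "Web request" side: dispatch and move on; nothing has been processed yet.
$bus->dispatch((object) ['commentId' => 1]);
$bus->dispatch((object) ['commentId' => 2]);
$pendingAfterDispatch = $bus->pending();

// "Consumer" side: runs later, possibly in another process or on another server.
$handledCount = $bus->consume(function (object $message) use (&$log): void {
    $log[] = 'checked comment '.$message->commentId;
});
```

Note that the handler returns nothing to the dispatcher, which mirrors the "no responses" remark above.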
Coding a Message Handler
A message is a data object class that should not hold any logic. It will be serialized to be stored in a queue, so only store “simple” serializable data.
Create the CommentMessage class:
namespace App\Message;
class CommentMessage
{
    private $id;
    private $context;
    public function __construct(int $id, array $context = [])
    {
        $this->id = $id;
        $this->context = $context;
    }
    public function getId(): int
    {
        return $this->id;
    }
    public function getContext(): array
    {
        return $this->context;
    }
}
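To see why the message carries only an integer ID and a plain array, it can help to push a look-alike object through PHP's native serialization. The sketch below is an illustration under assumptions: CommentMessageSketch is a hypothetical copy of the class shape, and the transport's real serializer may use a different format.

```php
<?php

declare(strict_types=1);

// Illustrative copy of the message shape: an ID plus a plain array, nothing else.
final class CommentMessageSketch
{
    private $id;
    private $context;

    public function __construct(int $id, array $context = [])
    {
        $this->id = $id;
        $this->context = $context;
    }

    public function getId(): int
    {
        return $this->id;
    }

    public function getContext(): array
    {
        return $this->context;
    }
}

$original = new CommentMessageSketch(42, ['user_ip' => '127.0.0.1']);

// Simulate what a queue has to do: turn the object into a string and back.
$payload = serialize($original);
$restored = unserialize($payload, ['allowed_classes' => [CommentMessageSketch::class]]);
```

Because the message holds scalars and arrays only, the round trip is lossless. Storing the whole Comment entity instead would drag Doctrine state into the queue, which is why the handler reloads the comment by ID.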
In the Messenger world, we don’t have controllers, but message handlers.
Create a CommentMessageHandler class under a new App\MessageHandler namespace that knows how to handle CommentMessage messages:
namespace App\MessageHandler;
use App\Message\CommentMessage;
use App\Repository\CommentRepository;
use App\SpamChecker;
use Doctrine\ORM\EntityManagerInterface;
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;
class CommentMessageHandler implements MessageHandlerInterface
{
    private $spamChecker;
    private $entityManager;
    private $commentRepository;
    public function __construct(EntityManagerInterface $entityManager, SpamChecker $spamChecker, CommentRepository $commentRepository)
    {
        $this->entityManager = $entityManager;
        $this->spamChecker = $spamChecker;
        $this->commentRepository = $commentRepository;
    }
    public function __invoke(CommentMessage $message)
    {
        // Reload the comment from its ID; it may no longer exist when the message is consumed.
        $comment = $this->commentRepository->find($message->getId());
        if (!$comment) {
            return;
        }
        // A spam score of 2 means blatant spam; anything else gets published.
        if (2 === $this->spamChecker->getSpamScore($comment, $message->getContext())) {
            $comment->setState('spam');
        } else {
            $comment->setState('published');
        }
        $this->entityManager->flush();
    }
}
MessageHandlerInterface is a marker interface. It only helps Symfony auto-register and auto-configure the class as a Messenger handler. By convention, the logic of a handler lives in a method called __invoke(). The CommentMessage type hint on this method’s one argument tells Messenger which class this will handle.
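The type-hint convention can be illustrated with PHP's reflection API. The following is a simplified sketch of the idea, not Symfony's actual discovery code: PingMessage, PingHandler, and guessHandledMessageClass() are hypothetical names used only here.

```php
<?php

declare(strict_types=1);

// Hypothetical message and handler, named for this sketch only.
final class PingMessage
{
}

final class PingHandler
{
    public function __invoke(PingMessage $message): string
    {
        return 'pong';
    }
}

/**
 * Returns the class name type-hinted on the single argument of __invoke(),
 * or null when the signature does not identify one message class.
 */
function guessHandledMessageClass(object $handler): ?string
{
    $method = new ReflectionMethod($handler, '__invoke');
    $parameters = $method->getParameters();
    if (1 !== count($parameters)) {
        return null;
    }

    $type = $parameters[0]->getType();

    // Only a single, non-builtin (class) type can name a message class.
    if (!$type instanceof ReflectionNamedType || $type->isBuiltin()) {
        return null;
    }

    return $type->getName();
}

$handledClass = guessHandledMessageClass(new PingHandler());
```

With this kind of lookup, no extra configuration is needed to connect a handler to its message: the method signature is the mapping.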
Update the controller to use the new system:
--- a/src/Controller/ConferenceController.php
+++ b/src/Controller/ConferenceController.php
@@ -5,14 +5,15 @@ namespace App\Controller;
 use App\Entity\Comment;
 use App\Entity\Conference;
 use App\Form\CommentFormType;
+use App\Message\CommentMessage;
 use App\Repository\CommentRepository;
 use App\Repository\ConferenceRepository;
-use App\SpamChecker;
 use Doctrine\ORM\EntityManagerInterface;
 use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
 use Symfony\Component\HttpFoundation\File\Exception\FileException;
 use Symfony\Component\HttpFoundation\Request;
 use Symfony\Component\HttpFoundation\Response;
+use Symfony\Component\Messenger\MessageBusInterface;
 use Symfony\Component\Routing\Annotation\Route;
 use Twig\Environment;
@@ -20,11 +21,13 @@ class ConferenceController extends AbstractController
 {
     private $twig;
     private $entityManager;
+    private $bus;
-    public function __construct(Environment $twig, EntityManagerInterface $entityManager)
+    public function __construct(Environment $twig, EntityManagerInterface $entityManager, MessageBusInterface $bus)
     {
         $this->twig = $twig;
         $this->entityManager = $entityManager;
+        $this->bus = $bus;
     }
     /**
@@ -40,7 +43,7 @@ class ConferenceController extends AbstractController
     /**
      * @Route("/conference/{slug}", name="conference")
      */
-    public function show(Request $request, Conference $conference, CommentRepository $commentRepository, SpamChecker $spamChecker, string $photoDir): Response
+    public function show(Request $request, Conference $conference, CommentRepository $commentRepository, string $photoDir): Response
     {
         $comment = new Comment();
         $form = $this->createForm(CommentFormType::class, $comment);
@@ -58,6 +61,7 @@ class ConferenceController extends AbstractController
             }
             $this->entityManager->persist($comment);
+            $this->entityManager->flush();
             $context = [
                 'user_ip' => $request->getClientIp(),
@@ -65,11 +69,8 @@ class ConferenceController extends AbstractController
                 'referrer' => $request->headers->get('referer'),
                 'permalink' => $request->getUri(),
             ];
-            if (2 === $spamChecker->getSpamScore($comment, $context)) {
-                throw new \RuntimeException('Blatant spam, go away!');
-            }
-            $this->entityManager->flush();
+            $this->bus->dispatch(new CommentMessage($comment->getId(), $context));
             return $this->redirectToRoute('conference', ['slug' => $conference->getSlug()]);
         }
Instead of depending on the Spam Checker, we now dispatch a message on the bus. The handler then decides what to do with it.
We have achieved something unexpected. We have decoupled our controller from the Spam Checker and moved the logic to a new class, the handler. It is a perfect use case for the bus. Test the code; it works. Everything is still done synchronously, but the code is probably already “better”.
Restricting Displayed Comments
Update the display logic to prevent non-published comments from appearing on the frontend:
--- a/src/Repository/CommentRepository.php
+++ b/src/Repository/CommentRepository.php
@@ -27,7 +27,9 @@ class CommentRepository extends ServiceEntityRepository
     {
         $query = $this->createQueryBuilder('c')
             ->andWhere('c.conference = :conference')
+            ->andWhere('c.state = :state')
             ->setParameter('conference', $conference)
+            ->setParameter('state', 'published')
             ->orderBy('c.createdAt', 'DESC')
             ->setMaxResults(self::PAGINATOR_PER_PAGE)
             ->setFirstResult($offset)
Going Async for Real
By default, handlers are called synchronously. To go async, you need to explicitly configure which queue each message class is routed to in the config/packages/messenger.yaml configuration file:
--- a/config/packages/messenger.yaml
+++ b/config/packages/messenger.yaml
@@ -5,10 +5,10 @@ framework:
         transports:
             # https://symfony.com/doc/current/messenger.html#transport-configuration
-            # async: '%env(MESSENGER_TRANSPORT_DSN)%'
+            async: '%env(RABBITMQ_DSN)%'
             # failed: 'doctrine://default?queue_name=failed'
             # sync: 'sync://'
         routing:
             # Route your messages to the transports
-            # 'App\Message\YourMessage': async
+            App\Message\CommentMessage: async
The configuration tells the bus to send instances of App\Message\CommentMessage to the async queue, which is defined by a DSN, stored in the RABBITMQ_DSN environment variable.
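Conceptually, the routing section is a lookup table from message class to transport name. The sketch below illustrates that idea in plain PHP under simplifying assumptions: resolveTransport() and the stub classes are hypothetical, and the real router supports more than exact class-name matches.

```php
<?php

declare(strict_types=1);

// Hypothetical message classes used only to exercise the lookup.
final class CommentMessageStub
{
}

final class OtherMessageStub
{
}

/**
 * @param array<string, string> $routing message class => transport name
 */
function resolveTransport(object $message, array $routing): string
{
    // A message without a routing entry is handled synchronously (labelled 'sync' here).
    return $routing[get_class($message)] ?? 'sync';
}

// Mirrors the single routing entry from the configuration.
$routing = [CommentMessageStub::class => 'async'];

$commentTransport = resolveTransport(new CommentMessageStub(), $routing);
$otherTransport = resolveTransport(new OtherMessageStub(), $routing);
```

Only the routed class goes through the queue; any other message keeps the default synchronous behavior.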
Adding RabbitMQ to the Docker Stack
As you might have guessed, we are going to use RabbitMQ:
--- a/docker-compose.yaml
+++ b/docker-compose.yaml
@@ -12,3 +12,7 @@ services:
     redis:
         image: redis:5-alpine
         ports: [6379]
+
+    rabbitmq:
+        image: rabbitmq:3.7-management
+        ports: [5672, 15672]
Restarting Docker Services
To force Docker Compose to take the RabbitMQ container into account, stop the containers and restart them:
$ docker-compose stop
$ docker-compose up -d
$ sleep 10
Consuming Messages
If you try to submit a new comment, the spam checker won’t be called anymore. Add an error_log() call in the getSpamScore() method to confirm. Instead, a message is waiting in RabbitMQ, ready to be consumed by some processes.
As you might imagine, Symfony comes with a consumer command. Run it now:
$ symfony console messenger:consume async -vv
It should immediately consume the message dispatched for the submitted comment:
[OK] Consuming messages from transports "async".
// The worker will automatically exit once it has received a stop signal via the messenger:stop-workers command.
// Quit the worker with CONTROL-C.
11:30:20 INFO [messenger] Received message App\Message\CommentMessage ["message" => App\Message\CommentMessage^ { …},"class" => "App\Message\CommentMessage"]
11:30:20 INFO [http_client] Request: "POST https://80cea32be1f6.rest.akismet.com/1.1/comment-check"
11:30:20 INFO [http_client] Response: "200 https://80cea32be1f6.rest.akismet.com/1.1/comment-check"
11:30:20 INFO [messenger] Message App\Message\CommentMessage handled by App\MessageHandler\CommentMessageHandler::__invoke ["message" => App\Message\CommentMessage^ { …},"class" => "App\Message\CommentMessage","handler" => "App\MessageHandler\CommentMessageHandler::__invoke"]
11:30:20 INFO [messenger] App\Message\CommentMessage was handled successfully (acknowledging to transport). ["message" => App\Message\CommentMessage^ { …},"class" => "App\Message\CommentMessage"]
The message consumer activity is logged, but you get instant feedback on the console by passing the -vv flag. You should even be able to spot the call to the Akismet API.
To stop the consumer, press Ctrl+C.
Exploring the RabbitMQ Web Management Interface
If you want to see queues and messages flowing through RabbitMQ, open its web management interface:
$ symfony open:local:rabbitmq
Or open it from the web debug toolbar.
Use guest/guest to log in to the RabbitMQ management UI.
Running Workers in the Background
Instead of launching the consumer every time we post a comment and stopping it immediately after, we want to run it continuously without having too many terminal windows or tabs open.
The Symfony CLI can manage such background commands or workers by using the daemon flag (-d) on the run command.
Run the message consumer again, but send it in the background:
$ symfony run -d --watch=config,src,templates,vendor symfony console messenger:consume async
The --watch option tells Symfony that the command must be restarted whenever there is a filesystem change in the config/, src/, templates/, or vendor/ directories.
Note
Do not use -vv here, as you would get duplicated messages in server:log (logged messages and console messages).
If the consumer stops working for some reason (memory limit, bug, …), it will be restarted automatically. And if the consumer fails too fast, the Symfony CLI will give up.
Logs are streamed via symfony server:log with all the other logs coming from PHP, the web server, and the application:
$ symfony server:log
Use the server:status command to list all background workers managed for the current project:
$ symfony server:status
Web server listening on https://127.0.0.1:8000
Command symfony console messenger:consume async running with PID 15774 (watching config/, src/, templates/)
To stop a worker, stop the web server or kill the PID given by the server:status command:
$ kill 15774
Retrying Failed Messages
What if Akismet is down while consuming a message? There is no impact for people submitting comments, but the message is lost and spam is not checked.
Messenger has a retry mechanism for when an exception occurs while handling a message. Let’s configure it:
--- a/config/packages/messenger.yaml
+++ b/config/packages/messenger.yaml
@@ -5,10 +5,17 @@ framework:
         transports:
             # https://symfony.com/doc/current/messenger.html#transport-configuration
-            async: '%env(RABBITMQ_DSN)%'
-            # failed: 'doctrine://default?queue_name=failed'
+            async:
+                dsn: '%env(RABBITMQ_DSN)%'
+                retry_strategy:
+                    max_retries: 3
+                    multiplier: 2
+
+            failed: 'doctrine://default?queue_name=failed'
             # sync: 'sync://'
+        failure_transport: failed
+
         routing:
             # Route your messages to the transports
             App\Message\CommentMessage: async
If a problem occurs while handling a message, the consumer will retry 3 times before giving up. But instead of discarding the message, it will store it in a more permanent storage, the failed queue, which uses the Doctrine database.
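To get a feel for what max_retries: 3 and multiplier: 2 mean in practice, the waiting time before each retry can be computed by hand. The sketch below assumes a base delay of 1000 ms, which the configuration above does not set explicitly, and retryDelays() is a hypothetical helper, not a Messenger function.

```php
<?php

declare(strict_types=1);

/**
 * Computes the delay before each retry for a multiplier-based strategy.
 *
 * @return int[] delays in milliseconds, one per retry
 */
function retryDelays(int $maxRetries, float $multiplier, int $baseDelayMs = 1000): array
{
    $delays = [];
    for ($retry = 0; $retry < $maxRetries; ++$retry) {
        // Each retry waits "multiplier" times longer than the previous one.
        $delays[] = (int) round($baseDelayMs * $multiplier ** $retry);
    }

    return $delays;
}

// Values taken from the retry_strategy section; the 1000 ms base delay is an assumption.
$delays = retryDelays(3, 2);
```

Under that assumption, the three retries wait 1, 2, and 4 seconds, so a short Akismet outage can resolve itself before the message lands in the failed queue.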
Inspect failed messages and retry them via the following commands:
$ symfony console messenger:failed:show
$ symfony console messenger:failed:retry
Deploying RabbitMQ
Adding RabbitMQ to the production servers can be done by adding it to the list of services:
--- a/.symfony/services.yaml
+++ b/.symfony/services.yaml
@@ -5,3 +5,8 @@ db:
 rediscache:
     type: redis:5.0
+
+queue:
+    type: rabbitmq:3.7
+    disk: 1024
+    size: S
Reference it in the web container configuration as well and enable the amqp PHP extension:
--- a/.symfony.cloud.yaml
+++ b/.symfony.cloud.yaml
@@ -4,6 +4,7 @@ type: php:7.4
 runtime:
     extensions:
+        - amqp
         - redis
         - pdo_pgsql
         - apcu
@@ -26,6 +27,7 @@ disk: 512
 relationships:
     database: "db:postgresql"
     redis: "rediscache:redis"
+    rabbitmq: "queue:rabbitmq"
 web:
     locations:
When the RabbitMQ service is installed on a project, you can access its web management interface by opening the tunnel first:
$ symfony tunnel:open
$ symfony open:remote:rabbitmq
# when done
$ symfony tunnel:close
Running Workers on SymfonyCloud
To consume messages from RabbitMQ, we need to run the messenger:consume command continuously. On SymfonyCloud, this is the role of a worker:
--- a/.symfony.cloud.yaml
+++ b/.symfony.cloud.yaml
@@ -54,3 +54,8 @@ hooks:
         set -x -e
         (>&2 symfony-deploy)
+
+workers:
+    messages:
+        commands:
+            start: symfony console messenger:consume async -vv --time-limit=3600 --memory-limit=128M
As with the Symfony CLI, SymfonyCloud manages restarts and logs.
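The --time-limit and --memory-limit options make the worker exit on its own, relying on the platform to start a fresh process. The loop below is a simplified sketch of that idea, not the real worker: runWorker() and its callbacks are hypothetical, and unlike a real consumer it stops when the queue is empty so that the example terminates.

```php
<?php

declare(strict_types=1);

/**
 * Handles messages until a limit is hit, then reports why it stopped.
 */
function runWorker(callable $fetchNext, callable $handle, int $timeLimitSeconds, int $memoryLimitBytes): string
{
    $startedAt = time();

    while (true) {
        // Check the limits between messages so a message is never interrupted midway.
        if (time() - $startedAt >= $timeLimitSeconds) {
            return 'time limit reached';
        }
        if (memory_get_usage(true) > $memoryLimitBytes) {
            return 'memory limit exceeded';
        }

        $message = $fetchNext();
        if (null === $message) {
            // A real consumer would wait and poll again; the sketch just stops.
            return 'queue empty';
        }

        $handle($message);
    }
}

$queue = ['a', 'b', 'c'];
$handled = [];

$reason = runWorker(
    function () use (&$queue) { return array_shift($queue); },
    function (string $message) use (&$handled): void { $handled[] = $message; },
    3600,              // mirrors --time-limit=3600
    128 * 1024 * 1024  // mirrors --memory-limit=128M
);
```

Exiting on purpose and being restarted keeps slow memory growth in a long-running PHP process from becoming a problem.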
To get logs for a worker, use:
$ symfony logs --worker=messages all
This work, including the code samples, is licensed under a Creative Commons BY-NC-SA 4.0 license.