Étape 18: Faire de l’asynchrone

5.0 version
Maintained

Faire de l’asynchrone

Vérifier la présence de spam pendant le traitement de la soumission du formulaire peut entraîner certains problèmes. Si l’API d’Akismet devient lente, notre site web sera également lent pour les internautes. Mais pire encore, si nous atteignons le délai d’attente maximal ou si l’API d’Akismet n’est pas disponible, nous pourrions perdre des commentaires.

Idéalement, nous devrions stocker les données soumises, sans les publier, et renvoyer une réponse immédiatement. La vérification du spam pourra être faite par la suite.

Marquer les commentaires

Nous avons besoin d’introduire un état (state) pour les commentaires : submitted, spam et published.

Ajoutez la propriété state à la classe Comment :

1
$ symfony console make:entity Comment

Créez une migration de base de données :

1
$ symfony console make:migration

Modifiez la migration pour mettre à jour tous les commentaires existants comme étant published par défaut :

patch_file
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
--- a/migrations/Version00000000000000.php
+++ b/migrations/Version00000000000000.php
@@ -20,7 +20,9 @@ final class Version20200714155905 extends AbstractMigration
     public function up(Schema $schema) : void
     {
         // this up() migration is auto-generated, please modify it to your needs
-        $this->addSql('ALTER TABLE comment ADD state VARCHAR(255) NOT NULL');
+        $this->addSql('ALTER TABLE comment ADD state VARCHAR(255)');
+        $this->addSql("UPDATE comment SET state='published'");
+        $this->addSql('ALTER TABLE comment ALTER COLUMN state SET NOT NULL');
     }

     public function down(Schema $schema) : void

Migrez la base de données :

1
$ symfony console doctrine:migrations:migrate

Nous devrions également nous assurer que, par défaut, le paramètre state est initialisé avec la valeur submitted :

patch_file
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
--- a/src/Entity/Comment.php
+++ b/src/Entity/Comment.php
@@ -49,9 +49,9 @@ class Comment
     private $photoFilename;

     /**
-     * @ORM\Column(type="string", length=255)
+     * @ORM\Column(type="string", length=255, options={"default": "submitted"})
      */
-    private $state;
+    private $state = 'submitted';

     public function __toString(): string
     {

Modifiez la configuration d’EasyAdmin pour voir l’état du commentaire :

patch_file
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
--- a/config/packages/easy_admin.yaml
+++ b/config/packages/easy_admin.yaml
@@ -18,6 +18,7 @@ easy_admin:
                     - author
                     - { property: 'email', type: 'email' }
                     - { property: 'photoFilename', type: 'image', 'base_path': "/uploads/photos", label: 'Photo' }
+                    - state
                     - { property: 'createdAt', type: 'datetime' }
                 sort: ['createdAt', 'ASC']
                 filters: ['conference']
@@ -26,5 +27,6 @@ easy_admin:
                     - { property: 'conference' }
                     - { property: 'createdAt', type: datetime, type_options: { attr: { readonly: true } } }
                     - 'author'
+                    - { property: 'state' }
                     - { property: 'email', type: 'email' }
                     - text

N’oubliez pas de modifier les tests en renseignant le state dans les fixtures :

patch_file
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
--- a/src/DataFixtures/AppFixtures.php
+++ b/src/DataFixtures/AppFixtures.php
@@ -37,8 +37,16 @@ class AppFixtures extends Fixture
         $comment1->setAuthor('Fabien');
         $comment1->setEmail('[email protected]');
         $comment1->setText('This was a great conference.');
+        $comment1->setState('published');
         $manager->persist($comment1);

+        $comment2 = new Comment();
+        $comment2->setConference($amsterdam);
+        $comment2->setAuthor('Lucas');
+        $comment2->setEmail('[email protected]');
+        $comment2->setText('I think this one is going to be moderated.');
+        $manager->persist($comment2);
+
         $admin = new Admin();
         $admin->setRoles(['ROLE_ADMIN']);
         $admin->setUsername('admin');

Pour les tests du contrôleur, simulez la validation :

patch_file
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
--- a/tests/Controller/ConferenceControllerTest.php
+++ b/tests/Controller/ConferenceControllerTest.php
@@ -2,6 +2,8 @@

 namespace App\Tests\Controller;

+use App\Repository\CommentRepository;
+use Doctrine\ORM\EntityManagerInterface;
 use Symfony\Bundle\FrameworkBundle\Test\WebTestCase;

 class ConferenceControllerTest extends WebTestCase
@@ -22,11 +24,17 @@ class ConferenceControllerTest extends WebTestCase
         $client->submitForm('Submit', [
             'comment_form[author]' => 'Fabien',
             'comment_form[text]' => 'Some feedback from an automated functional test',
-            'comment_form[email]' => '[email protected]',
+            'comment_form[email]' => $email = '[email protected]',
             'comment_form[photo]' => dirname(__DIR__, 2).'/public/images/under-construction.gif',
         ]);
         $this->assertResponseRedirects();
+
+        // simulate comment validation
+        $comment = self::$container->get(CommentRepository::class)->findOneByEmail($email);
+        $comment->setState('published');
+        self::$container->get(EntityManagerInterface::class)->flush();
+
         $client->followRedirect();
         $this->assertSelectorExists('div:contains("There are 2 comments")');
     }

À partir d’un test PHPUnit, vous pouvez obtenir n’importe quel service depuis le conteneur grâce à self::$container->get() ; il donne également accès aux services non publics.

Comprendre Messenger

La gestion du code asynchrone avec Symfony est faite par le composant Messenger :

1
$ symfony composer req messenger

Lorsqu’une action doit être exécutée de manière asynchrone, envoyez un message à un messenger bus. Le bus stocke le message dans une file d’attente et rend immédiatement la main pour permettre au flux des opérations de reprendre aussi vite que possible.

Un consumer s’exécute continuellement en arrière-plan pour lire les nouveaux messages dans la file d’attente et exécuter la logique associée. Le consumer peut s’exécuter sur le même serveur que l’application web, ou sur un serveur séparé.

C’est très similaire à la façon dont les requêtes HTTP sont traitées, sauf que nous n’avons pas de réponse.

Coder un gestionnaire de messages

Un message est une classe de données (data object), qui ne doit contenir aucune logique. Il sera sérialisé pour être stocké dans une file d’attente, donc ne stockez que des données « simples » et sérialisables.

Créez la classe CommentMessage :

src/Message/CommentMessage.php
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
namespace App\Message;

class CommentMessage
{
    private $id;
    private $context;

    public function __construct(int $id, array $context = [])
    {
        $this->id = $id;
        $this->context = $context;
    }

    public function getId(): int
    {
        return $this->id;
    }

    public function getContext(): array
    {
        return $this->context;
    }
}

Dans le monde de Messenger, nous n’avons pas de contrôleurs, mais des gestionnaires de messages.

Sous un nouveau namespace App\MessageHandler, créez une classe CommentMessageHandler qui saura comment gérer les messages CommentMessage :

src/MessageHandler/CommentMessageHandler.php
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
namespace App\MessageHandler;

use App\Message\CommentMessage;
use App\Repository\CommentRepository;
use App\SpamChecker;
use Doctrine\ORM\EntityManagerInterface;
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;

class CommentMessageHandler implements MessageHandlerInterface
{
    private $spamChecker;
    private $entityManager;
    private $commentRepository;

    public function __construct(EntityManagerInterface $entityManager, SpamChecker $spamChecker, CommentRepository $commentRepository)
    {
        $this->entityManager = $entityManager;
        $this->spamChecker = $spamChecker;
        $this->commentRepository = $commentRepository;
    }

    public function __invoke(CommentMessage $message)
    {
        $comment = $this->commentRepository->find($message->getId());
        if (!$comment) {
            return;
        }

        if (2 === $this->spamChecker->getSpamScore($comment, $message->getContext())) {
            $comment->setState('spam');
        } else {
            $comment->setState('published');
        }

        $this->entityManager->flush();
    }
}

MessageHandlerInterface est une interface de marqueur. Elle aide seulement Symfony à enregistrer et à configurer automatiquement la classe en tant que gestionnaire Messenger. Par convention, la logique d’un gestionnaire réside dans une méthode appelée __invoke(). Le type CommentMessage précisé en tant qu’argument unique de cette méthode indique à Messenger quelle classe elle va gérer.

Modifiez le contrôleur pour utiliser le nouveau système :

patch_file
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
--- a/src/Controller/ConferenceController.php
+++ b/src/Controller/ConferenceController.php
@@ -5,14 +5,15 @@ namespace App\Controller;
 use App\Entity\Comment;
 use App\Entity\Conference;
 use App\Form\CommentFormType;
+use App\Message\CommentMessage;
 use App\Repository\CommentRepository;
 use App\Repository\ConferenceRepository;
-use App\SpamChecker;
 use Doctrine\ORM\EntityManagerInterface;
 use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
 use Symfony\Component\HttpFoundation\File\Exception\FileException;
 use Symfony\Component\HttpFoundation\Request;
 use Symfony\Component\HttpFoundation\Response;
+use Symfony\Component\Messenger\MessageBusInterface;
 use Symfony\Component\Routing\Annotation\Route;
 use Twig\Environment;

@@ -20,11 +21,13 @@ class ConferenceController extends AbstractController
 {
     private $twig;
     private $entityManager;
+    private $bus;

-    public function __construct(Environment $twig, EntityManagerInterface $entityManager)
+    public function __construct(Environment $twig, EntityManagerInterface $entityManager, MessageBusInterface $bus)
     {
         $this->twig = $twig;
         $this->entityManager = $entityManager;
+        $this->bus = $bus;
     }

     /**
@@ -40,7 +43,7 @@ class ConferenceController extends AbstractController
     /**
      * @Route("/conference/{slug}", name="conference")
      */
-    public function show(Request $request, Conference $conference, CommentRepository $commentRepository, SpamChecker $spamChecker, string $photoDir)
+    public function show(Request $request, Conference $conference, CommentRepository $commentRepository, string $photoDir)
     {
         $comment = new Comment();
         $form = $this->createForm(CommentFormType::class, $comment);
@@ -59,6 +62,7 @@ class ConferenceController extends AbstractController
             }

             $this->entityManager->persist($comment);
+            $this->entityManager->flush();

             $context = [
                 'user_ip' => $request->getClientIp(),
@@ -66,11 +70,8 @@ class ConferenceController extends AbstractController
                 'referrer' => $request->headers->get('referer'),
                 'permalink' => $request->getUri(),
             ];
-            if (2 === $spamChecker->getSpamScore($comment, $context)) {
-                throw new \RuntimeException('Blatant spam, go away!');
-            }

-            $this->entityManager->flush();
+            $this->bus->dispatch(new CommentMessage($comment->getId(), $context));

             return $this->redirectToRoute('conference', ['slug' => $conference->getSlug()]);
         }

Au lieu de dépendre du SpamChecker, nous envoyons maintenant un message dans le bus. Le gestionnaire décide alors ce qu’il en fait.

Nous avons fait quelque chose que nous n’avions pas prévu. Nous avons découplé notre contrôleur du vérificateur de spam, et déplacé la logique vers une nouvelle classe, le gestionnaire. C’est un cas d’utilisation parfait pour le bus. Testez le code, il fonctionne. Tout se fait encore de manière synchrone, mais le code est probablement déjà « mieux ».

Filtrer les commentaires affichés

Modifiez la logique d’affichage pour éviter que des commentaires non publiés n’apparaissent sur le site :

patch_file
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
--- a/src/Repository/CommentRepository.php
+++ b/src/Repository/CommentRepository.php
@@ -25,7 +25,9 @@ class CommentRepository extends ServiceEntityRepository
     {
         return $this->createQueryBuilder('c')
             ->andWhere('c.conference = :conference')
+            ->andWhere('c.state = :state')
             ->setParameter('conference', $conference)
+            ->setParameter('state', 'published')
             ->orderBy('c.createdAt', 'DESC')
             ->setMaxResults($limit)
             ->setFirstResult($offset)

Faire vraiment de l’asynchrone

Par défaut, les gestionnaires sont appelés de manière synchrone. Pour les rendre asynchrone, vous devez configurer explicitement la file d’attente à utiliser pour chaque gestionnaire dans le fichier de configuration config/packages/messenger.yaml :

patch_file
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
--- a/config/packages/messenger.yaml
+++ b/config/packages/messenger.yaml
@@ -5,10 +5,10 @@ framework:

         transports:
             # https://symfony.com/doc/current/messenger.html#transport-configuration
-            # async: '%env(MESSENGER_TRANSPORT_DSN)%'
+            async: '%env(RABBITMQ_DSN)%'
             # failed: 'doctrine://default?queue_name=failed'
             # sync: 'sync://'

         routing:
             # Route your messages to the transports
-            # 'App\Message\YourMessage': async
+            App\Message\CommentMessage: async

La configuration indique au bus d’envoyer les instances de App\Message\CommentMessage à la file d’attente async, qui est définie par un DSN stocké dans la variable d’environnement RABBITMQ_DSN.

Ajouter RabbitMQ comme service Docker

Comme vous l’avez deviné, nous allons utiliser RabbitMQ :

patch_file
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
--- a/docker-compose.yaml
+++ b/docker-compose.yaml
@@ -12,3 +12,7 @@ services:
     redis:
         image: redis:5-alpine
         ports: [6379]
+
+    rabbitmq:
+        image: rabbitmq:3.7-management
+        ports: [5672, 15672]

Redémarrer les services Docker

Pour forcer Docker Compose à prendre en compte le conteneur RabbitMQ, arrêtez les conteneurs et redémarrez-les :

1
2
$ docker-compose stop
$ docker-compose up -d
1
$ sleep 10

Consommer des messages

Si vous essayez de soumettre un nouveau commentaire, le vérificateur de spam ne sera plus appelé. Ajoutez un appel à la fonction error_log() dans la méthode getSpamScore() pour le confirmer. Au lieu d’avoir un nouveau commentaire, un message est en attente dans RabbitMQ, prêt à être consommé par d’autres processus.

Comme vous pouvez l’imaginer, Symfony est livré avec une commande pour consommer les messages. Exécutez-la maintenant :

1
$ symfony console messenger:consume async -vv

Cette commande devrait immédiatement consommer le message envoyé pour le commentaire soumis :

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
 [OK] Consuming messages from transports "async".

 // The worker will automatically exit once it has received a stop signal via the messenger:stop-workers command.

 // Quit the worker with CONTROL-C.

11:30:20 INFO      [messenger] Received message App\Message\CommentMessage ["message" => App\Message\CommentMessage^ { …},"class" => "App\Message\CommentMessage"]
11:30:20 INFO      [http_client] Request: "POST https://80cea32be1f6.rest.akismet.com/1.1/comment-check"
11:30:20 INFO      [http_client] Response: "200 https://80cea32be1f6.rest.akismet.com/1.1/comment-check"
11:30:20 INFO      [messenger] Message App\Message\CommentMessage handled by App\MessageHandler\CommentMessageHandler::__invoke ["message" => App\Message\CommentMessage^ { …},"class" => "App\Message\CommentMessage","handler" => "App\MessageHandler\CommentMessageHandler::__invoke"]
11:30:20 INFO      [messenger] App\Message\CommentMessage was handled successfully (acknowledging to transport). ["message" => App\Message\CommentMessage^ { …},"class" => "App\Message\CommentMessage"]

L’activité du consumer de messages est enregistrée dans les logs, mais vous pouvez avoir un affichage instantané dans la console en passant l’option -vv. Vous devriez même voir l’appel vers l’API d’Akismet.

Pour arrêter le consumer, appuyez sur Ctrl+C.

Explorer l’interface web de gestion de RabbitMQ

Si vous voulez voir les files d’attente et les messages passés par RabbitMQ, ouvrez son interface web de gestion :

1
$ symfony open:local:rabbitmq

Ou à partir de la web debug toolbar :

Utilisez guest/guest pour vous connecter à l’interface de gestion RabbitMQ :

Lancer des workers en arrière-plan

Au lieu de lancer le consumer à chaque fois que nous publions un commentaire et de l’arrêter immédiatement après, nous voulons l’exécuter en continu sans avoir trop de fenêtres ou d’onglets du terminal ouverts.

La commande symfony peut gérer des commandes en tâche de fond ou des workers en utilisant l’option daemon (-d) sur la commande run.

Exécutez à nouveau le consumer du message, mais en tâche de fond :

1
$ symfony run -d --watch=config,src,templates,vendor symfony console messenger:consume async

L’option --watch indique à Symfony que la commande doit être redémarrée chaque fois qu’il y a un changement dans un des fichiers des répertoires config/, vendor/, src/ ou templates/.

Note

N’utilisez pas -vv pour éviter les doublons dans server:log (messages enregistrés et messages de la console).

Si le consumer cesse de fonctionner pour une raison quelconque (limite de mémoire, bogue, etc.), il sera redémarré automatiquement. Et s’il tombe en panne trop rapidement, la commande symfony s’arrêtera.

Les logs sont diffusés en continu par la commande symfony server:log, en même temps que ceux de PHP, du serveur web et de l’application :

1
$ symfony server:log

Utilisez la commande server:status pour lister tous les workers en arrière-plan gérés pour le projet en cours :

1
2
3
4
$ symfony server:status

Web server listening on https://127.0.0.1:8000
  Command symfony console messenger:consume async running with PID 15774 (watching config/, src/, templates/)

Pour arrêter un worker, arrêtez le serveur web ou tuez le PID (identifiant du processus) donné par la commande server:status :

1
$ kill 15774

Renvoyer des messages ayant échoué

Que faire si Akismet est en panne alors qu’un message est en train d’être consommé ? Il n’y a aucun impact pour les personnes qui soumettent des commentaires, mais le message est perdu et le spam n’est pas vérifié.

Messenger dispose d’un mécanisme de relance lorsqu’une exception se produit lors du traitement d’un message. Configurons-le :

patch_file
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
--- a/config/packages/messenger.yaml
+++ b/config/packages/messenger.yaml
@@ -5,10 +5,17 @@ framework:

         transports:
             # https://symfony.com/doc/current/messenger.html#transport-configuration
-            async: '%env(RABBITMQ_DSN)%'
-            # failed: 'doctrine://default?queue_name=failed'
+            async:
+                dsn: '%env(RABBITMQ_DSN)%'
+                retry_strategy:
+                    max_retries: 3
+                    multiplier: 2
+
+            failed: 'doctrine://default?queue_name=failed'
             # sync: 'sync://'

+        failure_transport: failed
+
         routing:
             # Route your messages to the transports
             App\Message\CommentMessage: async

Si un problème survient lors de la manipulation d’un message, le consumer réessaiera 3 fois avant d’abandonner. Mais au lieu de jeter le message, il le stockera dans une mémoire plus permanente, la file d’attente failed, qui utilise la base de données Doctrine.

Inspectez les messages ayant échoué et relancez-les à l’aide des commandes suivantes :

1
2
3
$ symfony console messenger:failed:show

$ symfony console messenger:failed:retry

Déployer RabbitMQ

L’ajout de RabbitMQ aux serveurs de production peut se faire en l’ajoutant à la liste des services :

patch_file
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
--- a/.symfony/services.yaml
+++ b/.symfony/services.yaml
@@ -5,3 +5,8 @@ db:

 rediscache:
     type: redis:5.0
+
+queue:
+    type: rabbitmq:3.7
+    disk: 1024
+    size: S

Référencez-le également dans la configuration du conteneur web et activez l’extension PHP amqp :

patch_file
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
--- a/.symfony.cloud.yaml
+++ b/.symfony.cloud.yaml
@@ -4,6 +4,7 @@ type: php:7.4

 runtime:
     extensions:
+        - amqp
         - pdo_pgsql
         - apcu
         - mbstring
@@ -22,6 +23,7 @@ variables:
 relationships:
     database: "db:postgresql"
     redis: "rediscache:redis"
+    rabbitmq: "queue:rabbitmq"

 web:
     locations:

Lorsque le service RabbitMQ est installé sur un projet, vous pouvez accéder à son interface de gestion web en ouvrant d’abord le tunnel :

1
2
3
4
5
$ symfony tunnel:open
$ symfony open:remote:rabbitmq

# when done
$ symfony tunnel:close

Exécuter des workers sur SymfonyCloud

Pour consommer les messages de RabbitMQ, nous devons exécuter la commande messenger:consume en continu. Sur SymfonyCloud, c’est le rôle d’un worker :

patch_file
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
--- a/.symfony.cloud.yaml
+++ b/.symfony.cloud.yaml
@@ -46,3 +46,12 @@ hooks:
         set -x -e

         (>&2 symfony-deploy)
+
+workers:
+    messages:
+        commands:
+            start: |
+                set -x -e
+
+                (>&2 symfony-deploy)
+                php bin/console messenger:consume async -vv --time-limit=3600 --memory-limit=128M

Comme pour la commande symfony, SymfonyCloud gère les redémarrages et les logs.

Pour obtenir les logs d’un worker, utilisez :

1
$ symfony logs --worker=messages all

  • « Previous Étape 17: Tester
  • Next » Étape 19: Prendre des décisions avec un workflow

This work, including the code samples, is licensed under a Creative Commons BY-NC-SA 4.0 license.