Passo 18: Esecuzione asincrona

5.0 version
Maintained

Esecuzione asincrona

Controllare la presenza di spam durante la gestione dell’invio del form potrebbe portare ad alcuni problemi. Se le API di Akismet diventano lente, il nostro sito web lo sarà anche per gli utenti. Ma peggio ancora, se si verifica un timeout o se le API di Akismet sono temporaneamente non disponibili, potremmo perdere dei commenti.

Idealmente, dovremmo salvare i dati inviati senza pubblicarli e restituire immediatamente una risposta. Lo spam può essere controllato in un secondo momento.

Marcare i commenti

Dobbiamo introdurre uno stato (state) per i commenti: submitted, spam e published.

Aggiungiamo la proprietà state alla classe Comment:

1
$ symfony console make:entity Comment

Creare una migration per il database:

1
$ symfony console make:migration

Modificare la migration per aggiornare tutti i commenti esistenti, impostando il loro stato predefinito a published:

patch_file
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
--- a/migrations/Version00000000000000.php
+++ b/migrations/Version00000000000000.php
@@ -20,7 +20,9 @@ final class Version20200714155905 extends AbstractMigration
     public function up(Schema $schema) : void
     {
         // this up() migration is auto-generated, please modify it to your needs
-        $this->addSql('ALTER TABLE comment ADD state VARCHAR(255) NOT NULL');
+        $this->addSql('ALTER TABLE comment ADD state VARCHAR(255)');
+        $this->addSql("UPDATE comment SET state='published'");
+        $this->addSql('ALTER TABLE comment ALTER COLUMN state SET NOT NULL');
     }

     public function down(Schema $schema) : void

Migrazione del database:

1
$ symfony console doctrine:migrations:migrate

Dovremmo anche assicurarci che il valore predefinito di state sia submitted:

patch_file
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
--- a/src/Entity/Comment.php
+++ b/src/Entity/Comment.php
@@ -49,9 +49,9 @@ class Comment
     private $photoFilename;

     /**
-     * @ORM\Column(type="string", length=255)
+     * @ORM\Column(type="string", length=255, options={"default": "submitted"})
      */
-    private $state;
+    private $state = 'submitted';

     public function __toString(): string
     {

Aggiornare la configurazione di EasyAdmin per poter vedere lo stato del commento:

patch_file
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
--- a/config/packages/easy_admin.yaml
+++ b/config/packages/easy_admin.yaml
@@ -18,6 +18,7 @@ easy_admin:
                     - author
                     - { property: 'email', type: 'email' }
                     - { property: 'photoFilename', type: 'image', 'base_path': "/uploads/photos", label: 'Photo' }
+                    - state
                     - { property: 'createdAt', type: 'datetime' }
                 sort: ['createdAt', 'ASC']
                 filters: ['conference']
@@ -26,5 +27,6 @@ easy_admin:
                     - { property: 'conference' }
                     - { property: 'createdAt', type: datetime, type_options: { attr: { readonly: true } } }
                     - 'author'
+                    - { property: 'state' }
                     - { property: 'email', type: 'email' }
                     - text

Non dimentichiamo di aggiornare anche i test impostando lo state nelle fixture:

patch_file
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
--- a/src/DataFixtures/AppFixtures.php
+++ b/src/DataFixtures/AppFixtures.php
@@ -37,8 +37,16 @@ class AppFixtures extends Fixture
         $comment1->setAuthor('Fabien');
         $comment1->setEmail('[email protected]');
         $comment1->setText('This was a great conference.');
+        $comment1->setState('published');
         $manager->persist($comment1);

+        $comment2 = new Comment();
+        $comment2->setConference($amsterdam);
+        $comment2->setAuthor('Lucas');
+        $comment2->setEmail('[email protected]');
+        $comment2->setText('I think this one is going to be moderated.');
+        $manager->persist($comment2);
+
         $admin = new Admin();
         $admin->setRoles(['ROLE_ADMIN']);
         $admin->setUsername('admin');

Per i test dei controller, simulare la validazione:

patch_file
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
--- a/tests/Controller/ConferenceControllerTest.php
+++ b/tests/Controller/ConferenceControllerTest.php
@@ -2,6 +2,8 @@

 namespace App\Tests\Controller;

+use App\Repository\CommentRepository;
+use Doctrine\ORM\EntityManagerInterface;
 use Symfony\Bundle\FrameworkBundle\Test\WebTestCase;

 class ConferenceControllerTest extends WebTestCase
@@ -22,11 +24,17 @@ class ConferenceControllerTest extends WebTestCase
         $client->submitForm('Submit', [
             'comment_form[author]' => 'Fabien',
             'comment_form[text]' => 'Some feedback from an automated functional test',
-            'comment_form[email]' => '[email protected]',
+            'comment_form[email]' => $email = '[email protected]',
             'comment_form[photo]' => dirname(__DIR__, 2).'/public/images/under-construction.gif',
         ]);
         $this->assertResponseRedirects();
+
+        // simulate comment validation
+        $comment = self::$container->get(CommentRepository::class)->findOneByEmail($email);
+        $comment->setState('published');
+        self::$container->get(EntityManagerInterface::class)->flush();
+
         $client->followRedirect();
         $this->assertSelectorExists('div:contains("There are 2 comments")');
     }

In un test PHPUnit, è possibile ottenere qualsiasi servizio dal container tramite self::$container->get(), che oltretutto offre anche accesso ai servizi non pubblici.

Comprendere Messenger

Gestire codice asincrono con Symfony è il compito del componente Messenger:

1
$ symfony composer req messenger

Quando una logica deve essere eseguita in maniera asincrona, inviare un messaggio ad un messenger bus. Questo memorizza il messaggio in una coda e restituisce immediatamente il controllo per far ripartire il flusso delle operazioni il più velocemente possibile.

Un consumer è eseguito costantemente in background in modo da leggere nuovi messaggi dalla coda ed eseguire la logica associata. Un consumer può essere eseguito sullo stesso server dell’applicazione web oppure su uno separato.

È molto simile al modo in cui vengono gestite le richieste HTTP, tranne per il fatto che non abbiamo risposte.

Scrivere un message handler

Un messaggio è un oggetto che non dovrebbe contenere alcuna logica, in quanto sarà serializzato per essere memorizzato in una coda. Pertanto utilizzate solo dati «semplici» e serializzabili.

Creare la classe CommentMessage:

src/Message/CommentMessage.php
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
namespace App\Message;

class CommentMessage
{
    private $id;
    private $context;

    public function __construct(int $id, array $context = [])
    {
        $this->id = $id;
        $this->context = $context;
    }

    public function getId(): int
    {
        return $this->id;
    }

    public function getContext(): array
    {
        return $this->context;
    }
}

Nel mondo di Messenger non abbiamo controller, ma message handler (gestori di messaggi).

All’interno di un nuovo namespace chiamato App\MessageHandler, creare la classe CommentMessageHandler, che saprà gestire i messaggi di tipo CommentMessage:

src/MessageHandler/CommentMessageHandler.php
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
namespace App\MessageHandler;

use App\Message\CommentMessage;
use App\Repository\CommentRepository;
use App\SpamChecker;
use Doctrine\ORM\EntityManagerInterface;
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;

class CommentMessageHandler implements MessageHandlerInterface
{
    private $spamChecker;
    private $entityManager;
    private $commentRepository;

    public function __construct(EntityManagerInterface $entityManager, SpamChecker $spamChecker, CommentRepository $commentRepository)
    {
        $this->entityManager = $entityManager;
        $this->spamChecker = $spamChecker;
        $this->commentRepository = $commentRepository;
    }

    public function __invoke(CommentMessage $message)
    {
        $comment = $this->commentRepository->find($message->getId());
        if (!$comment) {
            return;
        }

        if (2 === $this->spamChecker->getSpamScore($comment, $message->getContext())) {
            $comment->setState('spam');
        } else {
            $comment->setState('published');
        }

        $this->entityManager->flush();
    }
}

MessageHandlerInterface è un’interfaccia marker. Aiuta solamente Symfony ad auto-registrare e auto-configurare la classe come Messenger handler. Per convenzione, la logica di gestione risiede in un metodo chiamato __invoke(). Il tipo CommentMessage sul parametro di questo metodo dice a Messenger quale classe sarà in grado di gestire.

Aggiornare il controller per utilizzare il nuovo sistema:

patch_file
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
--- a/src/Controller/ConferenceController.php
+++ b/src/Controller/ConferenceController.php
@@ -5,14 +5,15 @@ namespace App\Controller;
 use App\Entity\Comment;
 use App\Entity\Conference;
 use App\Form\CommentFormType;
+use App\Message\CommentMessage;
 use App\Repository\CommentRepository;
 use App\Repository\ConferenceRepository;
-use App\SpamChecker;
 use Doctrine\ORM\EntityManagerInterface;
 use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
 use Symfony\Component\HttpFoundation\File\Exception\FileException;
 use Symfony\Component\HttpFoundation\Request;
 use Symfony\Component\HttpFoundation\Response;
+use Symfony\Component\Messenger\MessageBusInterface;
 use Symfony\Component\Routing\Annotation\Route;
 use Twig\Environment;

@@ -20,11 +21,13 @@ class ConferenceController extends AbstractController
 {
     private $twig;
     private $entityManager;
+    private $bus;

-    public function __construct(Environment $twig, EntityManagerInterface $entityManager)
+    public function __construct(Environment $twig, EntityManagerInterface $entityManager, MessageBusInterface $bus)
     {
         $this->twig = $twig;
         $this->entityManager = $entityManager;
+        $this->bus = $bus;
     }

     /**
@@ -40,7 +43,7 @@ class ConferenceController extends AbstractController
     /**
      * @Route("/conference/{slug}", name="conference")
      */
-    public function show(Request $request, Conference $conference, CommentRepository $commentRepository, SpamChecker $spamChecker, string $photoDir)
+    public function show(Request $request, Conference $conference, CommentRepository $commentRepository, string $photoDir)
     {
         $comment = new Comment();
         $form = $this->createForm(CommentFormType::class, $comment);
@@ -59,6 +62,7 @@ class ConferenceController extends AbstractController
             }

             $this->entityManager->persist($comment);
+            $this->entityManager->flush();

             $context = [
                 'user_ip' => $request->getClientIp(),
@@ -66,11 +70,8 @@ class ConferenceController extends AbstractController
                 'referrer' => $request->headers->get('referer'),
                 'permalink' => $request->getUri(),
             ];
-            if (2 === $spamChecker->getSpamScore($comment, $context)) {
-                throw new \RuntimeException('Blatant spam, go away!');
-            }

-            $this->entityManager->flush();
+            $this->bus->dispatch(new CommentMessage($comment->getId(), $context));

             return $this->redirectToRoute('conference', ['slug' => $conference->getSlug()]);
         }

Invece di dipendere dallo Spam Checker, ora inviamo un messaggio alla coda, e il gestore (handler) in un secondo momento deciderà cosa farne.

Abbiamo ottenuto qualcosa di inaspettato. Abbiamo disaccoppiato il nostro controller dallo Spam Checker e spostato la logica in una nuova classe: l’handler (il nostro gestore). Questo infatti è un perfetto caso d’uso per una coda. Testiamo il codice. Tutto è ancora eseguito in maniera sincrona, ma il codice è probabilmente già «migliore».

Nascondere i commenti non pubblicati

Aggiornare la logica di visualizzazione per evitare che i commenti non pubblicati siano visibili sul frontend:

patch_file
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
--- a/src/Repository/CommentRepository.php
+++ b/src/Repository/CommentRepository.php
@@ -25,7 +25,9 @@ class CommentRepository extends ServiceEntityRepository
     {
         return $this->createQueryBuilder('c')
             ->andWhere('c.conference = :conference')
+            ->andWhere('c.state = :state')
             ->setParameter('conference', $conference)
+            ->setParameter('state', 'published')
             ->orderBy('c.createdAt', 'DESC')
             ->setMaxResults($limit)
             ->setFirstResult($offset)

Eseguiamolo in maniera asincrona

Per impostazione predefinita, gli handler (i gestori) sono chiamati in modo sincrono. Per essere eseguiti in maniera asincrona, è necessario configurare esplicitamente la coda da usare per ognuno di essi, nel file di configurazione config/packages/messenger.yaml:

patch_file
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
--- a/config/packages/messenger.yaml
+++ b/config/packages/messenger.yaml
@@ -5,10 +5,10 @@ framework:

         transports:
             # https://symfony.com/doc/current/messenger.html#transport-configuration
-            # async: '%env(MESSENGER_TRANSPORT_DSN)%'
+            async: '%env(RABBITMQ_DSN)%'
             # failed: 'doctrine://default?queue_name=failed'
             # sync: 'sync://'

         routing:
             # Route your messages to the transports
-            # 'App\Message\YourMessage': async
+            App\Message\CommentMessage: async

La configurazione indica al bus di inviare istanze di tipo App\Message\CommentMessage nella coda di tipo async, definita da un DSN, memorizzato nella variabile d’ambiente RABBITMQ_DSN.

Aggiungere RabbitMQ allo stack Docker

Come avrete intuito, useremo RabbitMQ:

patch_file
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
--- a/docker-compose.yaml
+++ b/docker-compose.yaml
@@ -12,3 +12,7 @@ services:
     redis:
         image: redis:5-alpine
         ports: [6379]
+
+    rabbitmq:
+        image: rabbitmq:3.7-management
+        ports: [5672, 15672]

Riavvio dei servizi Docker

Per forzare Docker Compose a leggere il container RabbitMQ, fermare i container e riavviarli:

1
2
$ docker-compose stop
$ docker-compose up -d
1
$ sleep 10

Consumare i messaggi

Se si tenta di inviare un nuovo commento, lo Spam Checker non verrà più chiamato. Chiamare error_log() nel metodo getSpamScore() per averne conferma. Se controlliamo, un messaggio è invece in attesa su RabbitMQ, pronto per essere consumato da qualche processo.

In Symfony è presente un comando per gestire i consumer. Eseguiamolo:

1
$ symfony console messenger:consume async -vv

Dovrebbe consumare immediatamente il messaggio inviato, grazie al commento inviato:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
 [OK] Consuming messages from transports "async".

 // The worker will automatically exit once it has received a stop signal via the messenger:stop-workers command.

 // Quit the worker with CONTROL-C.

11:30:20 INFO      [messenger] Received message App\Message\CommentMessage ["message" => App\Message\CommentMessage^ { …},"class" => "App\Message\CommentMessage"]
11:30:20 INFO      [http_client] Request: "POST https://80cea32be1f6.rest.akismet.com/1.1/comment-check"
11:30:20 INFO      [http_client] Response: "200 https://80cea32be1f6.rest.akismet.com/1.1/comment-check"
11:30:20 INFO      [messenger] Message App\Message\CommentMessage handled by App\MessageHandler\CommentMessageHandler::__invoke ["message" => App\Message\CommentMessage^ { …},"class" => "App\Message\CommentMessage","handler" => "App\MessageHandler\CommentMessageHandler::__invoke"]
11:30:20 INFO      [messenger] App\Message\CommentMessage was handled successfully (acknowledging to transport). ["message" => App\Message\CommentMessage^ { …},"class" => "App\Message\CommentMessage"]

L’attività di consumo dei messaggi dalla coda viene salvata nei log, ma è possibile ottenere un feedback immediato in console aggiungendo al comando l’opzione -vv. In questo modo si dovrebbe anche poter vedere la chiamata alle API di Akismet.

Per fermare il consumer premere Ctrl+C.

L’interfaccia web di RabbitMQ

Se volete vedere le code e i messaggi che scorrono attraverso RabbitMQ, aprite la sua interfaccia web:

1
$ symfony open:local:rabbitmq

Oppure dalla barra degli strumenti di debug:

Utilizzare guest/guest per accedere all’interfaccia web di RabbitMQ:

Esecuzione in background dei worker

Invece di eseguire il consumer ogni volta che si pubblica un commento per poi fermarlo subito dopo, vogliamo che sia sempre in esecuzione senza avere troppe finestre del terminale o schede del aperte.

La CLI di Symfony può eseguire questi comandi in background aggiungendo l’opzione demone (-d) al comando run.

Eseguire di nuovo il consumer, ma questa volta in background:

1
$ symfony run -d --watch=config,src,templates,vendor symfony console messenger:consume async

L’opzione --watch dice a Symfony che il comando deve essere riavviato ogni volta che si verifica una modifica al filesystem nelle cartelle config/, src/, templates/ oppure vendor/.

Nota

Non utilizzare l’opzione -vv, altrimenti ci saranno messaggi duplicati in server:log (log dei messaggi e messaggi della console).

Se il consumer smette di funzionare a causa di un errore (memory limit, bug, ecc.), verrà riavviato automaticamente. Invece, nel caso in cui questo smetta di funzionare troppo velocemente, la CLI di Symfony smetterà di riavviarlo.

I log possono essere mostrati eseguendo il comando symfony server:log visualizzando così anche tutti gli altri log provenienti da PHP, server web e applicazione:

1
$ symfony server:log

Utilizzare il comando server:status per visualizzare tutti i worker gestiti in background per questo progetto:

1
2
3
4
$ symfony server:status

Web server listening on https://127.0.0.1:8000
  Command symfony console messenger:consume async running with PID 15774 (watching config/, src/, templates/)

Per fermare un worker occorre fermare il server web, oppure eseguire il comando di sistema «kill» seguito dal suo PID, che si può recuperare tramite il comando server:status:

1
$ kill 15774

Riprovare con i messaggi falliti

E se le API di Akismet non fossero disponibili mentre viene consumato un messaggio? Questo non farà alcuna differenza per l’utente che invia un commento, ma il messaggio andrà perso, e non ci sarà alcun controllo sulla presenza di spam.

Messenger ha un meccanismo di «retry» per i casi in cui si verifichi un’eccezione durante la gestione di un messaggio. Configuriamolo:

patch_file
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
--- a/config/packages/messenger.yaml
+++ b/config/packages/messenger.yaml
@@ -5,10 +5,17 @@ framework:

         transports:
             # https://symfony.com/doc/current/messenger.html#transport-configuration
-            async: '%env(RABBITMQ_DSN)%'
-            # failed: 'doctrine://default?queue_name=failed'
+            async:
+                dsn: '%env(RABBITMQ_DSN)%'
+                retry_strategy:
+                    max_retries: 3
+                    multiplier: 2
+
+            failed: 'doctrine://default?queue_name=failed'
             # sync: 'sync://'

+        failure_transport: failed
+
         routing:
             # Route your messages to the transports
             App\Message\CommentMessage: async

Se si verifica un errore durante la gestione di un messaggio, il consumer riproverà tre volte prima rinunciare. Ma invece di scartare il messaggio, lo memorizzerà in un luogo più sicuro: la coda failed, che usa il database configurato su Doctrine.

Ispezionare i messaggi che sono falliti e provare a gestirli di nuovo con i seguenti comandi:

1
2
3
$ symfony console messenger:failed:show

$ symfony console messenger:failed:retry

Eseguire il deploy di RabbitMQ

L’aggiunta di RabbitMQ ai server di produzione può essere fatta nell’elenco dei servizi:

patch_file
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
--- a/.symfony/services.yaml
+++ b/.symfony/services.yaml
@@ -5,3 +5,8 @@ db:

 rediscache:
     type: redis:5.0
+
+queue:
+    type: rabbitmq:3.7
+    disk: 1024
+    size: S

Farvi riferimento nella configurazione principale del container e abilitare l’estensione amqp di PHP:

patch_file
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
--- a/.symfony.cloud.yaml
+++ b/.symfony.cloud.yaml
@@ -4,6 +4,7 @@ type: php:7.4

 runtime:
     extensions:
+        - amqp
         - pdo_pgsql
         - apcu
         - mbstring
@@ -22,6 +23,7 @@ variables:
 relationships:
     database: "db:postgresql"
     redis: "rediscache:redis"
+    rabbitmq: "queue:rabbitmq"

 web:
     locations:

Quando il servizio RabbitMQ è installato su un progetto, è possibile accedere alla sua interfaccia web aprendo prima un tunnel:

1
2
3
4
5
$ symfony tunnel:open
$ symfony open:remote:rabbitmq

# when done
$ symfony tunnel:close

Eseguire i worker su SymfonyCloud

Per consumare i messaggi da RabbitMQ, dobbiamo eseguire il comando messenger:consume. Su SymfonyCloud, questo è il ruolo di un worker:

patch_file
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
--- a/.symfony.cloud.yaml
+++ b/.symfony.cloud.yaml
@@ -46,3 +46,12 @@ hooks:
         set -x -e

         (>&2 symfony-deploy)
+
+workers:
+    messages:
+        commands:
+            start: |
+                set -x -e
+
+                (>&2 symfony-deploy)
+                php bin/console messenger:consume async -vv --time-limit=3600 --memory-limit=128M

Come per la CLI di Symfony, SymfonyCloud gestisce riavvii e log.

Per mostrare i log di un worker, utilizzare:

1
$ symfony logs --worker=messages all

This work, including the code samples, is licensed under a Creative Commons BY-NC-SA 4.0 license.