Skip to content

Esecuzione asincrona

Controllare la presenza di spam durante la gestione dell'invio del form potrebbe portare ad alcuni problemi. Se le API di Akismet diventano lente, il nostro sito web lo sarà anche per gli utenti. Ma peggio ancora, se si verifica un timeout o se le API di Akismet sono temporaneamente non disponibili, potremmo perdere dei commenti.

Idealmente, dovremmo salvare i dati inviati senza pubblicarli e restituire immediatamente una risposta. Lo spam può essere controllato in un secondo momento.

Marcare i commenti

Dobbiamo introdurre uno stato (state) per i commenti: submitted, spam e published.

Aggiungiamo la proprietà state alla classe Comment:

1
$ symfony console make:entity Comment

Dovremmo anche assicurarci che il valore predefinito di state sia submitted:

1
2
3
4
5
6
7
8
9
10
11
12
13
--- a/src/Entity/Comment.php
+++ b/src/Entity/Comment.php
@@ -38,8 +38,8 @@ class Comment
     #[ORM\Column(type: 'string', length: 255, nullable: true)]
     private $photoFilename;

-    #[ORM\Column(type: 'string', length: 255)]
-    private $state;
+    #[ORM\Column(type: 'string', length: 255, options: ["default" => "submitted"])]
+    private $state = 'submitted';

     public function __toString(): string
     {

Creare una migration per il database:

1
$ symfony console make:migration

Modificare la migration per aggiornare tutti i commenti esistenti, impostando il loro stato predefinito a published:

1
2
3
4
5
6
7
8
9
10
--- a/migrations/Version00000000000000.php
+++ b/migrations/Version00000000000000.php
@@ -21,6 +21,7 @@ final class Version00000000000000 extends AbstractMigration
     {
         // this up() migration is auto-generated, please modify it to your needs
         $this->addSql('ALTER TABLE comment ADD state VARCHAR(255) DEFAULT \'submitted\' NOT NULL');
+        $this->addSql("UPDATE comment SET state='published'");
     }

     public function down(Schema $schema): void

Migrazione del database:

1
$ symfony console doctrine:migrations:migrate

Aggiornare la logica di visualizzazione per evitare che i commenti non pubblicati siano visibili sul frontend:

1
2
3
4
5
6
7
8
9
10
11
12
--- a/src/Repository/CommentRepository.php
+++ b/src/Repository/CommentRepository.php
@@ -27,7 +27,9 @@ class CommentRepository extends ServiceEntityRepository
     {
         $query = $this->createQueryBuilder('c')
             ->andWhere('c.conference = :conference')
+            ->andWhere('c.state = :state')
             ->setParameter('conference', $conference)
+            ->setParameter('state', 'published')
             ->orderBy('c.createdAt', 'DESC')
             ->setMaxResults(self::PAGINATOR_PER_PAGE)
             ->setFirstResult($offset)

Aggiornare la configurazione di EasyAdmin per poter vedere lo stato del commento:

1
2
3
4
5
6
7
8
9
10
--- a/src/Controller/Admin/CommentCrudController.php
+++ b/src/Controller/Admin/CommentCrudController.php
@@ -51,6 +51,7 @@ class CommentCrudController extends AbstractCrudController
             ->setLabel('Photo')
             ->onlyOnIndex()
         ;
+        yield TextField::new('state');

         $createdAt = DateTimeField::new('createdAt')->setFormTypeOptions([
             'html5' => true,

Non dimentichiamo di aggiornare anche i test impostando lo state nelle fixture:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
--- a/src/DataFixtures/AppFixtures.php
+++ b/src/DataFixtures/AppFixtures.php
@@ -37,8 +37,16 @@ class AppFixtures extends Fixture
         $comment1->setAuthor('Fabien');
         $comment1->setEmail('fabien@example.com');
         $comment1->setText('This was a great conference.');
+        $comment1->setState('published');
         $manager->persist($comment1);

+        $comment2 = new Comment();
+        $comment2->setConference($amsterdam);
+        $comment2->setAuthor('Lucas');
+        $comment2->setEmail('lucas@example.com');
+        $comment2->setText('I think this one is going to be moderated.');
+        $manager->persist($comment2);
+
         $admin = new Admin();
         $admin->setRoles(['ROLE_ADMIN']);
         $admin->setUsername('admin');

Per i test dei controller, simulare la validazione:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
--- a/tests/Controller/ConferenceControllerTest.php
+++ b/tests/Controller/ConferenceControllerTest.php
@@ -2,6 +2,8 @@

 namespace App\Tests\Controller;

+use App\Repository\CommentRepository;
+use Doctrine\ORM\EntityManagerInterface;
 use Symfony\Bundle\FrameworkBundle\Test\WebTestCase;

 class ConferenceControllerTest extends WebTestCase
@@ -22,10 +24,16 @@ class ConferenceControllerTest extends WebTestCase
         $client->submitForm('Submit', [
             'comment_form[author]' => 'Fabien',
             'comment_form[text]' => 'Some feedback from an automated functional test',
-            'comment_form[email]' => 'me@automat.ed',
+            'comment_form[email]' => $email = 'me@automat.ed',
             'comment_form[photo]' => dirname(__DIR__, 2).'/public/images/under-construction.gif',
         ]);
         $this->assertResponseRedirects();
+
+        // simulate comment validation
+        $comment = self::getContainer()->get(CommentRepository::class)->findOneByEmail($email);
+        $comment->setState('published');
+        self::getContainer()->get(EntityManagerInterface::class)->flush();
+
         $client->followRedirect();
         $this->assertSelectorExists('div:contains("There are 2 comments")');
     }

In un test PHPUnit, è possibile ottenere qualsiasi servizio dal container tramite self::$container->get(); oltretutto offre anche accesso ai servizi non pubblici.

Comprendere Messenger

Gestire codice asincrono con Symfony è il compito del componente Messenger:

1
$ symfony composer req doctrine-messenger

Quando una logica deve essere eseguita in maniera asincrona, inviare un messaggio ad un messenger bus. Questo memorizza il messaggio in una coda e restituisce immediatamente il controllo per far ripartire il flusso delle operazioni il più velocemente possibile.

Un consumer è eseguito costantemente in background in modo da leggere nuovi messaggi dalla coda ed eseguire la logica associata. Un consumer può essere eseguito sullo stesso server dell'applicazione web oppure su uno separato.

È molto simile al modo in cui vengono gestite le richieste HTTP, tranne per il fatto che non abbiamo risposte.

Scrivere un message handler

Un messaggio è un oggetto che non dovrebbe contenere alcuna logica, in quanto sarà serializzato per essere memorizzato in una coda. Pertanto utilizzate solo dati "semplici" e serializzabili.

Creare la classe CommentMessage:

src/Message/CommentMessage.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
namespace App\Message;

class CommentMessage
{
    private $id;
    private $context;

    public function __construct(int $id, array $context = [])
    {
        $this->id = $id;
        $this->context = $context;
    }

    public function getId(): int
    {
        return $this->id;
    }

    public function getContext(): array
    {
        return $this->context;
    }
}

Nel mondo di Messenger non abbiamo controller, ma message handler (gestori di messaggi).

All'interno di un nuovo namespace chiamato App\MessageHandler, creare la classe CommentMessageHandler, che saprà gestire i messaggi di tipo CommentMessage:

src/MessageHandler/CommentMessageHandler.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
namespace App\MessageHandler;

use App\Message\CommentMessage;
use App\Repository\CommentRepository;
use App\SpamChecker;
use Doctrine\ORM\EntityManagerInterface;
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;

class CommentMessageHandler implements MessageHandlerInterface
{
    private $spamChecker;
    private $entityManager;
    private $commentRepository;

    public function __construct(EntityManagerInterface $entityManager, SpamChecker $spamChecker, CommentRepository $commentRepository)
    {
        $this->entityManager = $entityManager;
        $this->spamChecker = $spamChecker;
        $this->commentRepository = $commentRepository;
    }

    public function __invoke(CommentMessage $message)
    {
        $comment = $this->commentRepository->find($message->getId());
        if (!$comment) {
            return;
        }

        if (2 === $this->spamChecker->getSpamScore($comment, $message->getContext())) {
            $comment->setState('spam');
        } else {
            $comment->setState('published');
        }

        $this->entityManager->flush();
    }
}

MessageHandlerInterface è un'interfaccia marker. Aiuta solamente Symfony ad auto-registrare e auto-configurare la classe come Messenger handler. Per convenzione, la logica di gestione risiede in un metodo chiamato __invoke(). Il tipo CommentMessage sul parametro di questo metodo dice a Messenger quale classe sarà in grado di gestire.

Aggiornare il controller per utilizzare il nuovo sistema:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
--- a/src/Controller/ConferenceController.php
+++ b/src/Controller/ConferenceController.php
@@ -5,14 +5,15 @@ namespace App\Controller;
 use App\Entity\Comment;
 use App\Entity\Conference;
 use App\Form\CommentFormType;
+use App\Message\CommentMessage;
 use App\Repository\CommentRepository;
 use App\Repository\ConferenceRepository;
-use App\SpamChecker;
 use Doctrine\ORM\EntityManagerInterface;
 use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
 use Symfony\Component\HttpFoundation\File\Exception\FileException;
 use Symfony\Component\HttpFoundation\Request;
 use Symfony\Component\HttpFoundation\Response;
+use Symfony\Component\Messenger\MessageBusInterface;
 use Symfony\Component\Routing\Annotation\Route;
 use Twig\Environment;

@@ -20,11 +21,13 @@ class ConferenceController extends AbstractController
 {
     private $twig;
     private $entityManager;
+    private $bus;

-    public function __construct(Environment $twig, EntityManagerInterface $entityManager)
+    public function __construct(Environment $twig, EntityManagerInterface $entityManager, MessageBusInterface $bus)
     {
         $this->twig = $twig;
         $this->entityManager = $entityManager;
+        $this->bus = $bus;
     }

     #[Route('/', name: 'homepage')]
@@ -36,7 +39,7 @@ class ConferenceController extends AbstractController
     }

     #[Route('/conference/{slug}', name: 'conference')]
-    public function show(Request $request, Conference $conference, CommentRepository $commentRepository, SpamChecker $spamChecker, string $photoDir): Response
+    public function show(Request $request, Conference $conference, CommentRepository $commentRepository, string $photoDir): Response
     {
         $comment = new Comment();
         $form = $this->createForm(CommentFormType::class, $comment);
@@ -54,6 +57,7 @@ class ConferenceController extends AbstractController
             }

             $this->entityManager->persist($comment);
+            $this->entityManager->flush();

             $context = [
                 'user_ip' => $request->getClientIp(),
@@ -61,11 +65,8 @@ class ConferenceController extends AbstractController
                 'referrer' => $request->headers->get('referer'),
                 'permalink' => $request->getUri(),
             ];
-            if (2 === $spamChecker->getSpamScore($comment, $context)) {
-                throw new \RuntimeException('Blatant spam, go away!');
-            }

-            $this->entityManager->flush();
+            $this->bus->dispatch(new CommentMessage($comment->getId(), $context));

             return $this->redirectToRoute('conference', ['slug' => $conference->getSlug()]);
         }

Invece di dipendere dallo Spam Checker, ora inviamo un messaggio alla coda, e il gestore (handler) in un secondo momento deciderà cosa farne.

Abbiamo ottenuto qualcosa di inaspettato. Abbiamo disaccoppiato il nostro controller dallo Spam Checker e spostato la logica in una nuova classe: l'handler (il nostro gestore). Questo infatti è un perfetto caso d'uso per una coda. Testiamo il codice. Tutto è ancora eseguito in maniera sincrona, ma il codice è probabilmente già "migliore".

Eseguiamolo in maniera asincrona

Per impostazione predefinita, gli handler (i gestori) sono chiamati in modo sincrono. Per essere eseguiti in maniera asincrona, è necessario configurare esplicitamente la coda da usare per ognuno di essi, nel file di configurazione config/packages/messenger.yaml:

1
2
3
4
5
6
7
8
--- a/config/packages/messenger.yaml
+++ b/config/packages/messenger.yaml
@@ -21,4 +21,4 @@ framework:
             Symfony\Component\Notifier\Message\SmsMessage: async

             # Route your messages to the transports
-            # 'App\Message\YourMessage': async
+            App\Message\CommentMessage: async

La configurazione indica al bus di inviare istanze di tipo App\Message\CommentMessage nella coda di tipo async, definita da un DSN (MESSENGER_TRANSPORT_DSN), che punta a Doctrine come configurato in .env. In linguaggio naturale diremmo che stiamo usando PostgreSQL come coda per i nostri messaggi.

Impostare tabelle e trigger PostgreSQL:

1
$ symfony console make:migration

Ed eseguire le migrazioni sul database:

1
$ symfony console doctrine:migrations:migrate

Tip

Dietro le quinte, Symfony utilizza il sistema interno pub/sub (LISTEN/NOTIFY) di PostgreSQL, che è performante, scalabile e transazionale. Potete leggere il capitolo RabbitMQ se volete utilizzare Rabbit come message broker invece di PostgreSQL.

Consumare i messaggi

Se si tenta di inviare un nuovo commento, lo Spam Checker non verrà più chiamato. Chiamare error_log() nel metodo getSpamScore() per averne conferma. Se controlliamo, un messaggio è invece in attesa nella coda, pronto per essere consumato da qualche processo.

In Symfony è presente un comando per gestire i consumer. Eseguiamolo:

1
$ symfony console messenger:consume async -vv

Dovrebbe consumare immediatamente il messaggio inviato, grazie al commento inviato:

1
2
3
4
5
6
7
8
9
10
11
[OK] Consuming messages from transports "async".

 // The worker will automatically exit once it has received a stop signal via the messenger:stop-workers command.

 // Quit the worker with CONTROL-C.

11:30:20 INFO      [messenger] Received message App\Message\CommentMessage ["message" => App\Message\CommentMessage^ { …},"class" => "App\Message\CommentMessage"]
11:30:20 INFO      [http_client] Request: "POST https://80cea32be1f6.rest.akismet.com/1.1/comment-check"
11:30:20 INFO      [http_client] Response: "200 https://80cea32be1f6.rest.akismet.com/1.1/comment-check"
11:30:20 INFO      [messenger] Message App\Message\CommentMessage handled by App\MessageHandler\CommentMessageHandler::__invoke ["message" => App\Message\CommentMessage^ { …},"class" => "App\Message\CommentMessage","handler" => "App\MessageHandler\CommentMessageHandler::__invoke"]
11:30:20 INFO      [messenger] App\Message\CommentMessage was handled successfully (acknowledging to transport). ["message" => App\Message\CommentMessage^ { …},"class" => "App\Message\CommentMessage"]

L'attività di consumo dei messaggi dalla coda viene salvata nei log, ma è possibile ottenere un feedback immediato in console aggiungendo al comando l'opzione -vv. In questo modo si dovrebbe anche poter vedere la chiamata alle API di Akismet.

Per fermare il consumer premere Ctrl+C.

Esecuzione in background dei worker

Invece di eseguire il consumer ogni volta che si pubblica un commento per poi fermarlo subito dopo, vogliamo che sia sempre in esecuzione senza avere troppe finestre del terminale o schede del aperte.

La CLI di Symfony può eseguire questi comandi in background aggiungendo l'opzione demone (-d) al comando run.

Eseguire di nuovo il consumer, ma questa volta in background:

1
$ symfony run -d --watch=config,src,templates,vendor symfony console messenger:consume async -vv

L'opzione --watch dice a Symfony che il comando deve essere riavviato ogni volta che si verifica una modifica al filesystem nelle cartelle config/, src/, templates/ oppure vendor/.

Note

Non utilizzare l'opzione -vv, altrimenti ci saranno messaggi duplicati in server:log (log dei messaggi e messaggi della console).

Se il consumer smette di funzionare a causa di un errore (memory limit, bug, ecc.), verrà riavviato automaticamente. Invece, nel caso in cui questo smetta di funzionare troppo velocemente, la CLI di Symfony smetterà di riavviarlo.

I log possono essere mostrati eseguendo il comando symfony server:log visualizzando così anche tutti gli altri log provenienti da PHP, server web e applicazione:

1
$ symfony server:log

Utilizzare il comando server:status per visualizzare tutti i worker gestiti in background per questo progetto:

1
2
3
4
$ symfony server:status

Web server listening on https://127.0.0.1:8000
  Command symfony console messenger:consume async running with PID 15774 (watching config/, src/, templates/)

Per fermare un worker occorre fermare il server web, oppure eseguire il comando di sistema "kill" seguito dal suo PID, che si può recuperare tramite il comando server:status:

1
$ kill 15774

Riprovare con i messaggi falliti

E se le API di Akismet non fossero disponibili mentre viene consumato un messaggio? Questo non farà alcuna differenza per l'utente che invia un commento, ma il messaggio andrà perso, e non ci sarà alcun controllo sulla presenza di spam.

Messenger ha un meccanismo di "retry" per i casi in cui si verifichi un'eccezione durante la gestione di un messaggio:

config/packages/messenger.yaml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
framework:
    messenger:
        failure_transport: failed

        transports:
            # https://symfony.com/doc/current/messenger.html#transport-configuration
            async:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    use_notify: true
                    check_delayed_interval: 60000
                retry_strategy:
                    max_retries: 3
                    multiplier: 2
            failed: 'doctrine://default?queue_name=failed'
            # sync: 'sync://'

Se si verifica un errore durante la gestione di un messaggio, il consumer riproverà tre volte prima rinunciare. Ma invece di scartare il messaggio, lo memorizzerà permanentemente nella coda failed, che usa un'altra tabella di database.

Ispezionare i messaggi che sono falliti e provare a gestirli di nuovo con i seguenti comandi:

1
2
3
$ symfony console messenger:failed:show

$ symfony console messenger:failed:retry

Eseguire i worker su Platform.sh

Per consumare i messaggi da PostgreSQL, dobbiamo eseguire il comando messenger:consume. Su Platform.sh, questo è il ruolo di un worker:

.platform.app.yaml
1
2
3
4
5
workers:
    messenger:
        commands:
            # Consume "async" messages (as configured in the routing section of config/packages/messenger.yaml)
            start: symfony console --time-limit=3600 --memory-limit=64M messenger:consume async

Come per la CLI di Symfony, Platform.sh gestisce riavvii e log.

Per mostrare i log di un worker, utilizzare:

1
$ symfony cloud:logs --worker=messages all
This work, including the code samples, is licensed under a Creative Commons BY-NC-SA 4.0 license.
TOC
    Version