Passo 18: Tornando Assíncrono

5.0 version
Maintained

Tornando Assíncrono

Verificar a existência de spam durante o processamento da submissão do formulário pode levar a alguns problemas. Se a API do Akismet ficar lenta, nosso site também ficará lento para os usuários. Pior ainda, se atingirmos um tempo limite ou se a API do Akismet estiver indisponível, poderemos perder comentários.

Idealmente, devemos armazenar os dados enviados sem publicá-los e retornar imediatamente uma resposta. A verificação de spam pode então ser feita em outro momento.

Adicionando Flags aos Comentários

Precisamos introduzir um state para os comentários: submitted, spam e published.

Adicione uma propriedade state na classe Comment:

1
$ symfony console make:entity Comment

Crie uma migração do banco de dados:

1
$ symfony console make:migration

Modifique a migração para atualizar todos os comentários existentes para serem published por padrão:

patch_file
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
--- a/migrations/Version00000000000000.php
+++ b/migrations/Version00000000000000.php
@@ -20,7 +20,9 @@ final class Version20200714155905 extends AbstractMigration
     public function up(Schema $schema) : void
     {
         // this up() migration is auto-generated, please modify it to your needs
-        $this->addSql('ALTER TABLE comment ADD state VARCHAR(255) NOT NULL');
+        $this->addSql('ALTER TABLE comment ADD state VARCHAR(255)');
+        $this->addSql("UPDATE comment SET state='published'");
+        $this->addSql('ALTER TABLE comment ALTER COLUMN state SET NOT NULL');
     }

     public function down(Schema $schema) : void

Migre o banco de dados:

1
$ symfony console doctrine:migrations:migrate

Também devemos garantir que, por padrão, o state seja definido como submitted:

patch_file
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
--- a/src/Entity/Comment.php
+++ b/src/Entity/Comment.php
@@ -49,9 +49,9 @@ class Comment
     private $photoFilename;

     /**
-     * @ORM\Column(type="string", length=255)
+     * @ORM\Column(type="string", length=255, options={"default": "submitted"})
      */
-    private $state;
+    private $state = 'submitted';

     public function __toString(): string
     {

Atualize a configuração do EasyAdmin para poder ver o state do comentário:

patch_file
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
--- a/config/packages/easy_admin.yaml
+++ b/config/packages/easy_admin.yaml
@@ -18,6 +18,7 @@ easy_admin:
                     - author
                     - { property: 'email', type: 'email' }
                     - { property: 'photoFilename', type: 'image', 'base_path': "/uploads/photos", label: 'Photo' }
+                    - state
                     - { property: 'createdAt', type: 'datetime' }
                 sort: ['createdAt', 'ASC']
                 filters: ['conference']
@@ -26,5 +27,6 @@ easy_admin:
                     - { property: 'conference' }
                     - { property: 'createdAt', type: datetime, type_options: { attr: { readonly: true } } }
                     - 'author'
+                    - { property: 'state' }
                     - { property: 'email', type: 'email' }
                     - text

Não se esqueça de atualizar também os testes, definindo state nas fixtures:

patch_file
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
--- a/src/DataFixtures/AppFixtures.php
+++ b/src/DataFixtures/AppFixtures.php
@@ -37,8 +37,16 @@ class AppFixtures extends Fixture
         $comment1->setAuthor('Fabien');
         $comment1->setEmail('[email protected]');
         $comment1->setText('This was a great conference.');
+        $comment1->setState('published');
         $manager->persist($comment1);

+        $comment2 = new Comment();
+        $comment2->setConference($amsterdam);
+        $comment2->setAuthor('Lucas');
+        $comment2->setEmail('[email protected]');
+        $comment2->setText('I think this one is going to be moderated.');
+        $manager->persist($comment2);
+
         $admin = new Admin();
         $admin->setRoles(['ROLE_ADMIN']);
         $admin->setUsername('admin');

Para os testes do controlador, simule a validação:

patch_file
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
--- a/tests/Controller/ConferenceControllerTest.php
+++ b/tests/Controller/ConferenceControllerTest.php
@@ -2,6 +2,8 @@

 namespace App\Tests\Controller;

+use App\Repository\CommentRepository;
+use Doctrine\ORM\EntityManagerInterface;
 use Symfony\Bundle\FrameworkBundle\Test\WebTestCase;

 class ConferenceControllerTest extends WebTestCase
@@ -22,11 +24,17 @@ class ConferenceControllerTest extends WebTestCase
         $client->submitForm('Submit', [
             'comment_form[author]' => 'Fabien',
             'comment_form[text]' => 'Some feedback from an automated functional test',
-            'comment_form[email]' => '[email protected]',
+            'comment_form[email]' => $email = '[email protected]',
             'comment_form[photo]' => dirname(__DIR__, 2).'/public/images/under-construction.gif',
         ]);
         $this->assertResponseRedirects();
+
+        // simulate comment validation
+        $comment = self::$container->get(CommentRepository::class)->findOneByEmail($email);
+        $comment->setState('published');
+        self::$container->get(EntityManagerInterface::class)->flush();
+
         $client->followRedirect();
         $this->assertSelectorExists('div:contains("There are 2 comments")');
     }

A partir de um teste do PHPUnit, você pode obter qualquer serviço do container via self::$container->get(); ele também dá acesso a serviços não públicos.

Entendendo o Messenger

Gerenciar código assíncrono com o Symfony é tarefa do Componente Messenger:

1
$ symfony composer req messenger

Quando alguma lógica deve ser executada de forma assíncrona, envie uma mensagem para um barramento de mensagens. O barramento armazena a mensagem em uma fila e retorna imediatamente para permitir que o fluxo de operações seja retomado o mais rápido possível.

Um consumidor é executado continuamente em segundo plano para ler novas mensagens na fila e executar a lógica associada. O consumidor pode executar no mesmo servidor que a aplicação web ou em um servidor separado.

É muito semelhante à forma como as requisições HTTP são tratadas, exceto que não temos respostas.

Programando um Manipulador de Mensagens

Uma mensagem é uma classe de objeto de dados que não deve conter nenhuma lógica. Ela será serializada para ser armazenada em uma fila, portanto, armazene somente dados serializáveis “simples”.

Crie a classe CommentMessage:

src/Message/CommentMessage.php
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
namespace App\Message;

class CommentMessage
{
    private $id;
    private $context;

    public function __construct(int $id, array $context = [])
    {
        $this->id = $id;
        $this->context = $context;
    }

    public function getId(): int
    {
        return $this->id;
    }

    public function getContext(): array
    {
        return $this->context;
    }
}

No mundo do Messenger, não temos controladores, mas manipuladores de mensagens.

Crie uma classe CommentMessageHandler sob um novo namespace App\MessageHandler que saiba como manipular mensagens CommentMessage:

src/MessageHandler/CommentMessageHandler.php
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
namespace App\MessageHandler;

use App\Message\CommentMessage;
use App\Repository\CommentRepository;
use App\SpamChecker;
use Doctrine\ORM\EntityManagerInterface;
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;

class CommentMessageHandler implements MessageHandlerInterface
{
    private $spamChecker;
    private $entityManager;
    private $commentRepository;

    public function __construct(EntityManagerInterface $entityManager, SpamChecker $spamChecker, CommentRepository $commentRepository)
    {
        $this->entityManager = $entityManager;
        $this->spamChecker = $spamChecker;
        $this->commentRepository = $commentRepository;
    }

    public function __invoke(CommentMessage $message)
    {
        $comment = $this->commentRepository->find($message->getId());
        if (!$comment) {
            return;
        }

        if (2 === $this->spamChecker->getSpamScore($comment, $message->getContext())) {
            $comment->setState('spam');
        } else {
            $comment->setState('published');
        }

        $this->entityManager->flush();
    }
}

MessageHandlerInterface é uma interface marcadora. Ela só ajuda o Symfony a auto-registrar e auto-configurar a classe como um manipulador de mensagens. Por convenção, a lógica de um manipulador reside em um método chamado __invoke(). A declaração de tipo CommentMessage no único argumento desse método diz ao Messenger qual classe ele irá manipular.

Atualize o controlador para usar o novo sistema:

patch_file
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
--- a/src/Controller/ConferenceController.php
+++ b/src/Controller/ConferenceController.php
@@ -5,14 +5,15 @@ namespace App\Controller;
 use App\Entity\Comment;
 use App\Entity\Conference;
 use App\Form\CommentFormType;
+use App\Message\CommentMessage;
 use App\Repository\CommentRepository;
 use App\Repository\ConferenceRepository;
-use App\SpamChecker;
 use Doctrine\ORM\EntityManagerInterface;
 use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
 use Symfony\Component\HttpFoundation\File\Exception\FileException;
 use Symfony\Component\HttpFoundation\Request;
 use Symfony\Component\HttpFoundation\Response;
+use Symfony\Component\Messenger\MessageBusInterface;
 use Symfony\Component\Routing\Annotation\Route;
 use Twig\Environment;

@@ -20,11 +21,13 @@ class ConferenceController extends AbstractController
 {
     private $twig;
     private $entityManager;
+    private $bus;

-    public function __construct(Environment $twig, EntityManagerInterface $entityManager)
+    public function __construct(Environment $twig, EntityManagerInterface $entityManager, MessageBusInterface $bus)
     {
         $this->twig = $twig;
         $this->entityManager = $entityManager;
+        $this->bus = $bus;
     }

     /**
@@ -40,7 +43,7 @@ class ConferenceController extends AbstractController
     /**
      * @Route("/conference/{slug}", name="conference")
      */
-    public function show(Request $request, Conference $conference, CommentRepository $commentRepository, SpamChecker $spamChecker, string $photoDir)
+    public function show(Request $request, Conference $conference, CommentRepository $commentRepository, string $photoDir)
     {
         $comment = new Comment();
         $form = $this->createForm(CommentFormType::class, $comment);
@@ -59,6 +62,7 @@ class ConferenceController extends AbstractController
             }

             $this->entityManager->persist($comment);
+            $this->entityManager->flush();

             $context = [
                 'user_ip' => $request->getClientIp(),
@@ -66,11 +70,8 @@ class ConferenceController extends AbstractController
                 'referrer' => $request->headers->get('referer'),
                 'permalink' => $request->getUri(),
             ];
-            if (2 === $spamChecker->getSpamScore($comment, $context)) {
-                throw new \RuntimeException('Blatant spam, go away!');
-            }

-            $this->entityManager->flush();
+            $this->bus->dispatch(new CommentMessage($comment->getId(), $context));

             return $this->redirectToRoute('conference', ['slug' => $conference->getSlug()]);
         }

Em vez de depender do Verificador de Spam, despachamos agora uma mensagem ao barramento. O manipulador decide então o que fazer com ela.

Conseguimos algo inesperado. Desacoplamos nosso controlador do Verificador de Spam e movemos a lógica para uma nova classe, o manipulador. É um caso de uso perfeito para o barramento. Teste o código, ele funciona. Tudo ainda é feito de forma síncrona, mas o código provavelmente já está “melhor”.

Restringindo os Comentários Exibidos

Atualize a lógica de exibição para evitar que os comentários não publicados apareçam no frontend:

patch_file
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
--- a/src/Repository/CommentRepository.php
+++ b/src/Repository/CommentRepository.php
@@ -25,7 +25,9 @@ class CommentRepository extends ServiceEntityRepository
     {
         return $this->createQueryBuilder('c')
             ->andWhere('c.conference = :conference')
+            ->andWhere('c.state = :state')
             ->setParameter('conference', $conference)
+            ->setParameter('state', 'published')
             ->orderBy('c.createdAt', 'DESC')
             ->setMaxResults($limit)
             ->setFirstResult($offset)

Tornando Realmente Assíncrono

Por padrão, os manipuladores são chamados de forma síncrona. Para tornar assíncrono, você precisa configurar explicitamente qual fila usar para cada manipulador no arquivo de configuração config/packages/messenger.yaml:

patch_file
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
--- a/config/packages/messenger.yaml
+++ b/config/packages/messenger.yaml
@@ -5,10 +5,10 @@ framework:

         transports:
             # https://symfony.com/doc/current/messenger.html#transport-configuration
-            # async: '%env(MESSENGER_TRANSPORT_DSN)%'
+            async: '%env(RABBITMQ_DSN)%'
             # failed: 'doctrine://default?queue_name=failed'
             # sync: 'sync://'

         routing:
             # Route your messages to the transports
-            # 'App\Message\YourMessage': async
+            App\Message\CommentMessage: async

A configuração diz ao barramento para enviar instâncias de App\Message\CommentMessage para a fila async, que é definida por um DSN, armazenado na variável de ambiente RABBITMQ_DSN.

Adicionando o RabbitMQ à Stack do Docker

Como você deve ter adivinhado, vamos usar o RabbitMQ:

patch_file
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
--- a/docker-compose.yaml
+++ b/docker-compose.yaml
@@ -12,3 +12,7 @@ services:
     redis:
         image: redis:5-alpine
         ports: [6379]
+
+    rabbitmq:
+        image: rabbitmq:3.7-management
+        ports: [5672, 15672]

Reiniciando os Serviços do Docker

Para forçar o Docker Compose a levar o container do RabbitMQ em consideração, pare e reinicie os containers:

1
2
$ docker-compose stop
$ docker-compose up -d
1
$ sleep 10

Consumindo Mensagens

Se você tentar enviar um novo comentário, o verificador de spam não será mais chamado. Adicione uma chamada error_log() no método getSpamScore() para confirmar. Em vez disso, uma mensagem está esperando no RabbitMQ, pronta para ser consumida por alguns processos.

Como você pode imaginar, o Symfony vem com um comando consumidor. Execute-o agora:

1
$ symfony console messenger:consume async -vv

Ele deve consumir imediatamente a mensagem despachada quando o comentário foi submetido:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
 [OK] Consuming messages from transports "async".

 // The worker will automatically exit once it has received a stop signal via the messenger:stop-workers command.

 // Quit the worker with CONTROL-C.

11:30:20 INFO      [messenger] Received message App\Message\CommentMessage ["message" => App\Message\CommentMessage^ { …},"class" => "App\Message\CommentMessage"]
11:30:20 INFO      [http_client] Request: "POST https://80cea32be1f6.rest.akismet.com/1.1/comment-check"
11:30:20 INFO      [http_client] Response: "200 https://80cea32be1f6.rest.akismet.com/1.1/comment-check"
11:30:20 INFO      [messenger] Message App\Message\CommentMessage handled by App\MessageHandler\CommentMessageHandler::__invoke ["message" => App\Message\CommentMessage^ { …},"class" => "App\Message\CommentMessage","handler" => "App\MessageHandler\CommentMessageHandler::__invoke"]
11:30:20 INFO      [messenger] App\Message\CommentMessage was handled successfully (acknowledging to transport). ["message" => App\Message\CommentMessage^ { …},"class" => "App\Message\CommentMessage"]

A atividade do consumidor da mensagem é registrada no log, mas você recebe feedback instantâneo no console, passando a flag -vv. Você deve ser capaz até de identificar a chamada à API do Akismet.

Para parar o consumidor, pressione Ctrl+C.

Explorando a Interface de Gerenciamento Web do RabbitMQ

Se você quiser ver filas e mensagens fluindo através do RabbitMQ, abra sua interface de gerenciamento web:

1
$ symfony open:local:rabbitmq

Ou a partir da barra de ferramentas para depuração web:

Use guest/guest para fazer login na interface de gerenciamento do RabbitMQ:

Executando Workers em Segundo Plano

Em vez de iniciar o consumidor toda vez que publicarmos um comentário e o pararmos logo depois, queremos executá-lo continuamente sem ter muitas janelas ou abas do terminal abertas.

A CLI do Symfony pode gerenciar esses comandos em segundo plano ou workers usando a flag daemon (-d) no comando run.

Execute o consumidor de mensagem novamente, mas envie-o em segundo plano:

1
$ symfony run -d --watch=config,src,templates,vendor symfony console messenger:consume async

A opção --watch diz ao Symfony que o comando deve ser reiniciado sempre que houver uma alteração no sistema de arquivos nos diretórios config/, src/, templates/ ou vendor/.

Nota

Não use -vv, pois você duplicaria as mensagens em server:log (mensagens registradas no log e mensagens do console).

Se o consumidor parar de funcionar por alguma razão (limite de memória, bug, …), ele será reiniciado automaticamente. E se o consumidor falhar muito rápido, a CLI do Symfony irá desistir.

Os logs são transmitidos através do symfony server:log junto com todos os outros logs provenientes do PHP, do servidor web e da aplicação:

1
$ symfony server:log

Use o comando server:status para listar todos os workers em segundo plano gerenciados para o projeto atual:

1
2
3
4
$ symfony server:status

Web server listening on https://127.0.0.1:8000
  Command symfony console messenger:consume async running with PID 15774 (watching config/, src/, templates/)

Para parar um worker, pare o servidor web ou mate o processo com o PID fornecido pelo comando server:status:

1
$ kill 15774

Repetindo Mensagens com Falha

E se o Akismet estiver inacessível enquanto consumimos uma mensagem? Não há impacto para as pessoas que enviam comentários, mas a mensagem é perdida e o spam não é verificado.

O Messenger tem um mecanismo para tentar novamente quando uma exceção ocorre durante a manipulação de uma mensagem. Vamos configurá-lo:

patch_file
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
--- a/config/packages/messenger.yaml
+++ b/config/packages/messenger.yaml
@@ -5,10 +5,17 @@ framework:

         transports:
             # https://symfony.com/doc/current/messenger.html#transport-configuration
-            async: '%env(RABBITMQ_DSN)%'
-            # failed: 'doctrine://default?queue_name=failed'
+            async:
+                dsn: '%env(RABBITMQ_DSN)%'
+                retry_strategy:
+                    max_retries: 3
+                    multiplier: 2
+
+            failed: 'doctrine://default?queue_name=failed'
             # sync: 'sync://'

+        failure_transport: failed
+
         routing:
             # Route your messages to the transports
             App\Message\CommentMessage: async

Se ocorrer um problema durante a manipulação de uma mensagem, o consumidor tentará novamente 3 vezes antes de desistir. Mas, ao invés de descartar a mensagem, ele irá armazená-la em um armazenamento mais permanente, a fila failed, que usa o banco de dados Doctrine.

Inspecione as mensagens com falha e tente novamente através dos seguintes comandos:

1
2
3
$ symfony console messenger:failed:show

$ symfony console messenger:failed:retry

Implantando o RabbitMQ

Adicionar o RabbitMQ aos servidores de produção pode ser feito adicionando-o à lista de serviços:

patch_file
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
--- a/.symfony/services.yaml
+++ b/.symfony/services.yaml
@@ -5,3 +5,8 @@ db:

 rediscache:
     type: redis:5.0
+
+queue:
+    type: rabbitmq:3.7
+    disk: 1024
+    size: S

Referencie-o também na configuração do container web e habilite a extensão amqp do PHP:

patch_file
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
--- a/.symfony.cloud.yaml
+++ b/.symfony.cloud.yaml
@@ -4,6 +4,7 @@ type: php:7.4

 runtime:
     extensions:
+        - amqp
         - pdo_pgsql
         - apcu
         - mbstring
@@ -22,6 +23,7 @@ variables:
 relationships:
     database: "db:postgresql"
     redis: "rediscache:redis"
+    rabbitmq: "queue:rabbitmq"

 web:
     locations:

Quando o serviço RabbitMQ é instalado em um projeto, você pode acessar sua interface de gerenciamento web abrindo o túnel primeiro:

1
2
3
4
5
$ symfony tunnel:open
$ symfony open:remote:rabbitmq

# when done
$ symfony tunnel:close

Executando Workers na SymfonyCloud

Para consumir mensagens do RabbitMQ, precisamos executar o comando messenger:consume continuamente. Na SymfonyCloud, esse é o papel de um worker:

patch_file
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
--- a/.symfony.cloud.yaml
+++ b/.symfony.cloud.yaml
@@ -46,3 +46,12 @@ hooks:
         set -x -e

         (>&2 symfony-deploy)
+
+workers:
+    messages:
+        commands:
+            start: |
+                set -x -e
+
+                (>&2 symfony-deploy)
+                php bin/console messenger:consume async -vv --time-limit=3600 --memory-limit=128M

Assim como na CLI do Symfony, a SymfonyCloud gerencia reinicializações e logs.

Para obter os logs de um worker, use:

1
$ symfony logs --worker=messages all

This work, including the code samples, is licensed under a Creative Commons BY-NC-SA 4.0 license.