Passo 18: Tornando Assíncrono
Tornando Assíncrono¶
Verificar a existência de spam durante o processamento da submissão do formulário pode levar a alguns problemas. Se a API do Akismet ficar lenta, nosso site também ficará lento para os usuários. Pior ainda, se atingirmos um tempo limite ou se a API do Akismet estiver indisponível, poderemos perder comentários.
Idealmente, devemos armazenar os dados enviados sem publicá-los e retornar imediatamente uma resposta. A verificação de spam pode então ser feita em outro momento.
Adicionando Flags aos Comentários¶
Precisamos introduzir um state
para os comentários: submitted
, spam
e published
.
Adicione uma propriedade state
na classe Comment
:
1 | $ symfony console make:entity Comment
|
Crie uma migração do banco de dados:
1 | $ symfony console make:migration
|
Modifique a migração para atualizar todos os comentários existentes para serem published
por padrão:
1 2 3 4 5 6 7 8 9 10 11 12 13 | --- a/migrations/Version00000000000000.php
+++ b/migrations/Version00000000000000.php
@@ -20,7 +20,9 @@ final class Version20200714155905 extends AbstractMigration
public function up(Schema $schema) : void
{
// this up() migration is auto-generated, please modify it to your needs
- $this->addSql('ALTER TABLE comment ADD state VARCHAR(255) NOT NULL');
+ $this->addSql('ALTER TABLE comment ADD state VARCHAR(255)');
+ $this->addSql("UPDATE comment SET state='published'");
+ $this->addSql('ALTER TABLE comment ALTER COLUMN state SET NOT NULL');
}
public function down(Schema $schema) : void
|
Migre o banco de dados:
1 | $ symfony console doctrine:migrations:migrate
|
Também devemos garantir que, por padrão, o state
seja definido como submitted
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 | --- a/src/Entity/Comment.php
+++ b/src/Entity/Comment.php
@@ -55,9 +55,9 @@ class Comment
private $photoFilename;
/**
- * @ORM\Column(type="string", length=255)
+ * @ORM\Column(type="string", length=255, options={"default": "submitted"})
*/
- private $state;
+ private $state = 'submitted';
public function __toString(): string
{
|
Atualize a configuração do EasyAdmin para poder ver o state do comentário:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | --- a/config/packages/easy_admin.yaml
+++ b/config/packages/easy_admin.yaml
@@ -18,6 +18,7 @@ easy_admin:
- author
- { property: 'email', type: 'email' }
- { property: 'photoFilename', type: 'image', 'base_path': "/uploads/photos", label: 'Photo' }
+ - state
- { property: 'createdAt', type: 'datetime' }
sort: ['createdAt', 'ASC']
filters: ['conference']
@@ -26,5 +27,6 @@ easy_admin:
- { property: 'conference' }
- { property: 'createdAt', type: datetime, type_options: { disabled: true } }
- 'author'
+ - { property: 'state' }
- { property: 'email', type: 'email' }
- text
|
Não se esqueça de atualizar também os testes, definindo state
nas fixtures:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 | --- a/src/DataFixtures/AppFixtures.php
+++ b/src/DataFixtures/AppFixtures.php
@@ -37,8 +37,16 @@ class AppFixtures extends Fixture
$comment1->setAuthor('Fabien');
$comment1->setEmail('[email protected]');
$comment1->setText('This was a great conference.');
+ $comment1->setState('published');
$manager->persist($comment1);
+ $comment2 = new Comment();
+ $comment2->setConference($amsterdam);
+ $comment2->setAuthor('Lucas');
+ $comment2->setEmail('[email protected]');
+ $comment2->setText('I think this one is going to be moderated.');
+ $manager->persist($comment2);
+
$admin = new Admin();
$admin->setRoles(['ROLE_ADMIN']);
$admin->setUsername('admin');
|
Para os testes do controlador, simule a validação:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 | --- a/tests/Controller/ConferenceControllerTest.php
+++ b/tests/Controller/ConferenceControllerTest.php
@@ -2,6 +2,8 @@
namespace App\Tests\Controller;
+use App\Repository\CommentRepository;
+use Doctrine\ORM\EntityManagerInterface;
use Symfony\Bundle\FrameworkBundle\Test\WebTestCase;
class ConferenceControllerTest extends WebTestCase
@@ -22,10 +24,16 @@ class ConferenceControllerTest extends WebTestCase
$client->submitForm('Submit', [
'comment_form[author]' => 'Fabien',
'comment_form[text]' => 'Some feedback from an automated functional test',
- 'comment_form[email]' => '[email protected]',
+ 'comment_form[email]' => $email = '[email protected]',
'comment_form[photo]' => dirname(__DIR__, 2).'/public/images/under-construction.gif',
]);
$this->assertResponseRedirects();
+
+ // simulate comment validation
+ $comment = self::$container->get(CommentRepository::class)->findOneByEmail($email);
+ $comment->setState('published');
+ self::$container->get(EntityManagerInterface::class)->flush();
+
$client->followRedirect();
$this->assertSelectorExists('div:contains("There are 2 comments")');
}
|
A partir de um teste do PHPUnit, você pode obter qualquer serviço do container via self::$container->get()
; ele também dá acesso a serviços não públicos.
Entendendo o Messenger¶
Gerenciar código assíncrono com o Symfony é tarefa do Componente Messenger:
1 | $ symfony composer req messenger
|
Quando alguma lógica deve ser executada de forma assíncrona, envie uma mensagem para um barramento de mensagens. O barramento armazena a mensagem em uma fila e retorna imediatamente para permitir que o fluxo de operações seja retomado o mais rápido possível.
Um consumidor é executado continuamente em segundo plano para ler novas mensagens na fila e executar a lógica associada. O consumidor pode executar no mesmo servidor que a aplicação web ou em um servidor separado.
É muito semelhante à forma como as requisições HTTP são tratadas, exceto que não temos respostas.
Programando um Manipulador de Mensagens¶
Uma mensagem é uma classe de objeto de dados que não deve conter nenhuma lógica. Ela será serializada para ser armazenada em uma fila, portanto, armazene somente dados serializáveis “simples”.
Crie a classe CommentMessage
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 | namespace App\Message;
class CommentMessage
{
private $id;
private $context;
public function __construct(int $id, array $context = [])
{
$this->id = $id;
$this->context = $context;
}
public function getId(): int
{
return $this->id;
}
public function getContext(): array
{
return $this->context;
}
}
|
No mundo do Messenger, não temos controladores, mas manipuladores de mensagens.
Crie uma classe CommentMessageHandler
sob um novo namespace App\MessageHandler
que saiba como manipular mensagens CommentMessage
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 | namespace App\MessageHandler;
use App\Message\CommentMessage;
use App\Repository\CommentRepository;
use App\SpamChecker;
use Doctrine\ORM\EntityManagerInterface;
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;
class CommentMessageHandler implements MessageHandlerInterface
{
private $spamChecker;
private $entityManager;
private $commentRepository;
public function __construct(EntityManagerInterface $entityManager, SpamChecker $spamChecker, CommentRepository $commentRepository)
{
$this->entityManager = $entityManager;
$this->spamChecker = $spamChecker;
$this->commentRepository = $commentRepository;
}
public function __invoke(CommentMessage $message)
{
$comment = $this->commentRepository->find($message->getId());
if (!$comment) {
return;
}
if (2 === $this->spamChecker->getSpamScore($comment, $message->getContext())) {
$comment->setState('spam');
} else {
$comment->setState('published');
}
$this->entityManager->flush();
}
}
|
MessageHandlerInterface
é uma interface marcadora. Ela só ajuda o Symfony a auto-registrar e auto-configurar a classe como um manipulador de mensagens. Por convenção, a lógica de um manipulador reside em um método chamado __invoke()
. A declaração de tipo CommentMessage
no único argumento desse método diz ao Messenger qual classe ele irá manipular.
Atualize o controlador para usar o novo sistema:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 | --- a/src/Controller/ConferenceController.php
+++ b/src/Controller/ConferenceController.php
@@ -5,14 +5,15 @@ namespace App\Controller;
use App\Entity\Comment;
use App\Entity\Conference;
use App\Form\CommentFormType;
+use App\Message\CommentMessage;
use App\Repository\CommentRepository;
use App\Repository\ConferenceRepository;
-use App\SpamChecker;
use Doctrine\ORM\EntityManagerInterface;
use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
use Symfony\Component\HttpFoundation\File\Exception\FileException;
use Symfony\Component\HttpFoundation\Request;
use Symfony\Component\HttpFoundation\Response;
+use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Routing\Annotation\Route;
use Twig\Environment;
@@ -20,11 +21,13 @@ class ConferenceController extends AbstractController
{
private $twig;
private $entityManager;
+ private $bus;
- public function __construct(Environment $twig, EntityManagerInterface $entityManager)
+ public function __construct(Environment $twig, EntityManagerInterface $entityManager, MessageBusInterface $bus)
{
$this->twig = $twig;
$this->entityManager = $entityManager;
+ $this->bus = $bus;
}
/**
@@ -40,7 +43,7 @@ class ConferenceController extends AbstractController
/**
* @Route("/conference/{slug}", name="conference")
*/
- public function show(Request $request, Conference $conference, CommentRepository $commentRepository, SpamChecker $spamChecker, string $photoDir): Response
+ public function show(Request $request, Conference $conference, CommentRepository $commentRepository, string $photoDir): Response
{
$comment = new Comment();
$form = $this->createForm(CommentFormType::class, $comment);
@@ -58,6 +61,7 @@ class ConferenceController extends AbstractController
}
$this->entityManager->persist($comment);
+ $this->entityManager->flush();
$context = [
'user_ip' => $request->getClientIp(),
@@ -65,11 +69,8 @@ class ConferenceController extends AbstractController
'referrer' => $request->headers->get('referer'),
'permalink' => $request->getUri(),
];
- if (2 === $spamChecker->getSpamScore($comment, $context)) {
- throw new \RuntimeException('Blatant spam, go away!');
- }
- $this->entityManager->flush();
+ $this->bus->dispatch(new CommentMessage($comment->getId(), $context));
return $this->redirectToRoute('conference', ['slug' => $conference->getSlug()]);
}
|
Em vez de depender do Verificador de Spam, despachamos agora uma mensagem ao barramento. O manipulador decide então o que fazer com ela.
Conseguimos algo inesperado. Desacoplamos nosso controlador do Verificador de Spam e movemos a lógica para uma nova classe, o manipulador. É um caso de uso perfeito para o barramento. Teste o código, ele funciona. Tudo ainda é feito de forma síncrona, mas o código provavelmente já está “melhor”.
Restringindo os Comentários Exibidos¶
Atualize a lógica de exibição para evitar que os comentários não publicados apareçam no frontend:
1 2 3 4 5 6 7 8 9 10 11 12 | --- a/src/Repository/CommentRepository.php
+++ b/src/Repository/CommentRepository.php
@@ -27,7 +27,9 @@ class CommentRepository extends ServiceEntityRepository
{
$query = $this->createQueryBuilder('c')
->andWhere('c.conference = :conference')
+ ->andWhere('c.state = :state')
->setParameter('conference', $conference)
+ ->setParameter('state', 'published')
->orderBy('c.createdAt', 'DESC')
->setMaxResults(self::PAGINATOR_PER_PAGE)
->setFirstResult($offset)
|
Tornando Realmente Assíncrono¶
Por padrão, os manipuladores são chamados de forma síncrona. Para tornar assíncrono, você precisa configurar explicitamente qual fila usar para cada manipulador no arquivo de configuração config/packages/messenger.yaml
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | --- a/config/packages/messenger.yaml
+++ b/config/packages/messenger.yaml
@@ -5,10 +5,10 @@ framework:
transports:
# https://symfony.com/doc/current/messenger.html#transport-configuration
- # async: '%env(MESSENGER_TRANSPORT_DSN)%'
+ async: '%env(RABBITMQ_DSN)%'
# failed: 'doctrine://default?queue_name=failed'
# sync: 'sync://'
routing:
# Route your messages to the transports
- # 'App\Message\YourMessage': async
+ App\Message\CommentMessage: async
|
A configuração diz ao barramento para enviar instâncias de App\Message\CommentMessage
para a fila async
, que é definida por um DSN, armazenado na variável de ambiente RABBITMQ_DSN
.
Adicionando o RabbitMQ à Stack do Docker¶
Como você deve ter adivinhado, vamos usar o RabbitMQ:
1 2 3 4 5 6 7 8 9 10 | --- a/docker-compose.yaml
+++ b/docker-compose.yaml
@@ -12,3 +12,7 @@ services:
redis:
image: redis:5-alpine
ports: [6379]
+
+ rabbitmq:
+ image: rabbitmq:3.7-management
+ ports: [5672, 15672]
|
Reiniciando os Serviços do Docker¶
Para forçar o Docker Compose a levar o container do RabbitMQ em consideração, pare e reinicie os containers:
1 2 | $ docker-compose stop
$ docker-compose up -d
|
1 | $ sleep 10
|
Consumindo Mensagens¶
Se você tentar enviar um novo comentário, o verificador de spam não será mais chamado. Adicione uma chamada error_log()
no método getSpamScore()
para confirmar. Em vez disso, uma mensagem está esperando no RabbitMQ, pronta para ser consumida por alguns processos.
Como você pode imaginar, o Symfony vem com um comando consumidor. Execute-o agora:
1 | $ symfony console messenger:consume async -vv
|
Ele deve consumir imediatamente a mensagem despachada quando o comentário foi submetido:
1 2 3 4 5 6 7 8 9 10 11 | [OK] Consuming messages from transports "async".
// The worker will automatically exit once it has received a stop signal via the messenger:stop-workers command.
// Quit the worker with CONTROL-C.
11:30:20 INFO [messenger] Received message App\Message\CommentMessage ["message" => App\Message\CommentMessage^ { …},"class" => "App\Message\CommentMessage"]
11:30:20 INFO [http_client] Request: "POST https://80cea32be1f6.rest.akismet.com/1.1/comment-check"
11:30:20 INFO [http_client] Response: "200 https://80cea32be1f6.rest.akismet.com/1.1/comment-check"
11:30:20 INFO [messenger] Message App\Message\CommentMessage handled by App\MessageHandler\CommentMessageHandler::__invoke ["message" => App\Message\CommentMessage^ { …},"class" => "App\Message\CommentMessage","handler" => "App\MessageHandler\CommentMessageHandler::__invoke"]
11:30:20 INFO [messenger] App\Message\CommentMessage was handled successfully (acknowledging to transport). ["message" => App\Message\CommentMessage^ { …},"class" => "App\Message\CommentMessage"]
|
A atividade do consumidor da mensagem é registrada no log, mas você recebe feedback instantâneo no console, passando a flag -vv
. Você deve ser capaz até de identificar a chamada à API do Akismet.
Para parar o consumidor, pressione Ctrl+C
.
Explorando a Interface de Gerenciamento Web do RabbitMQ¶
Se você quiser ver filas e mensagens fluindo através do RabbitMQ, abra sua interface de gerenciamento web:
1 | $ symfony open:local:rabbitmq
|
Ou a partir da barra de ferramentas para depuração web:

Use guest
/guest
para fazer login na interface de gerenciamento do RabbitMQ:

Executando Workers em Segundo Plano¶
Em vez de iniciar o consumidor toda vez que publicarmos um comentário e o pararmos logo depois, queremos executá-lo continuamente sem ter muitas janelas ou abas do terminal abertas.
A CLI do Symfony pode gerenciar esses comandos em segundo plano ou workers usando a flag daemon (-d
) no comando run
.
Execute o consumidor de mensagem novamente, mas envie-o em segundo plano:
1 | $ symfony run -d --watch=config,src,templates,vendor symfony console messenger:consume async
|
A opção --watch
diz ao Symfony que o comando deve ser reiniciado sempre que houver uma alteração no sistema de arquivos nos diretórios config/
, src/
, templates/
ou vendor/
.
Nota
Não use -vv
, pois você duplicaria as mensagens em server:log
(mensagens registradas no log e mensagens do console).
Se o consumidor parar de funcionar por alguma razão (limite de memória, bug, …), ele será reiniciado automaticamente. E se o consumidor falhar muito rápido, a CLI do Symfony irá desistir.
Os logs são transmitidos através do symfony server:log
junto com todos os outros logs provenientes do PHP, do servidor web e da aplicação:
1 | $ symfony server:log
|
Use o comando server:status
para listar todos os workers em segundo plano gerenciados para o projeto atual:
1 2 3 4 | $ symfony server:status
Web server listening on https://127.0.0.1:8000
Command symfony console messenger:consume async running with PID 15774 (watching config/, src/, templates/)
|
Para parar um worker, pare o servidor web ou mate o processo com o PID fornecido pelo comando server:status
:
1 | $ kill 15774
|
Repetindo Mensagens com Falha¶
E se o Akismet estiver inacessível enquanto consumimos uma mensagem? Não há impacto para as pessoas que enviam comentários, mas a mensagem é perdida e o spam não é verificado.
O Messenger tem um mecanismo para tentar novamente quando uma exceção ocorre durante a manipulação de uma mensagem. Vamos configurá-lo:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 | --- a/config/packages/messenger.yaml
+++ b/config/packages/messenger.yaml
@@ -5,10 +5,17 @@ framework:
transports:
# https://symfony.com/doc/current/messenger.html#transport-configuration
- async: '%env(RABBITMQ_DSN)%'
- # failed: 'doctrine://default?queue_name=failed'
+ async:
+ dsn: '%env(RABBITMQ_DSN)%'
+ retry_strategy:
+ max_retries: 3
+ multiplier: 2
+
+ failed: 'doctrine://default?queue_name=failed'
# sync: 'sync://'
+ failure_transport: failed
+
routing:
# Route your messages to the transports
App\Message\CommentMessage: async
|
Se ocorrer um problema durante a manipulação de uma mensagem, o consumidor tentará novamente 3 vezes antes de desistir. Mas, ao invés de descartar a mensagem, ele irá armazená-la em um armazenamento mais permanente, a fila failed
, que usa o banco de dados Doctrine.
Inspecione as mensagens com falha e tente novamente através dos seguintes comandos:
1 2 3 | $ symfony console messenger:failed:show
$ symfony console messenger:failed:retry
|
Implantando o RabbitMQ¶
Adicionar o RabbitMQ aos servidores de produção pode ser feito adicionando-o à lista de serviços:
1 2 3 4 5 6 7 8 9 10 11 | --- a/.symfony/services.yaml
+++ b/.symfony/services.yaml
@@ -5,3 +5,8 @@ db:
rediscache:
type: redis:5.0
+
+queue:
+ type: rabbitmq:3.7
+ disk: 1024
+ size: S
|
Referencie-o também na configuração do container web e habilite a extensão amqp
do PHP:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 | --- a/.symfony.cloud.yaml
+++ b/.symfony.cloud.yaml
@@ -4,6 +4,7 @@ type: php:7.4
runtime:
extensions:
+ - amqp
- redis
- pdo_pgsql
- apcu
@@ -26,6 +27,7 @@ disk: 512
relationships:
database: "db:postgresql"
redis: "rediscache:redis"
+ rabbitmq: "queue:rabbitmq"
web:
locations:
|
Quando o serviço RabbitMQ é instalado em um projeto, você pode acessar sua interface de gerenciamento web abrindo o túnel primeiro:
1 2 3 4 5 | $ symfony tunnel:open
$ symfony open:remote:rabbitmq
# when done
$ symfony tunnel:close
|
Executando Workers na SymfonyCloud¶
Para consumir mensagens do RabbitMQ, precisamos executar o comando messenger:consume
continuamente. Na SymfonyCloud, esse é o papel de um worker:
1 2 3 4 5 6 7 8 9 10 11 | --- a/.symfony.cloud.yaml
+++ b/.symfony.cloud.yaml
@@ -54,3 +54,8 @@ hooks:
set -x -e
(>&2 symfony-deploy)
+
+workers:
+ messages:
+ commands:
+ start: symfony console messenger:consume async -vv --time-limit=3600 --memory-limit=128M
|
Assim como na CLI do Symfony, a SymfonyCloud gerencia reinicializações e logs.
Para obter os logs de um worker, use:
1 | $ symfony logs --worker=messages all
|
- « Previous Passo 17: Testes
- Next » Passo 19: Tomando Decisões com um Workflow
This work, including the code samples, is licensed under a Creative Commons BY-NC-SA 4.0 license.