SymfonyWorld Online 2020
100% online
30+ talks + workshops
Live + Replay watch talks later

Paso 18: Volviéndonos asíncronos

5.0 version
Maintained

Volviéndonos asíncronos

La comprobación del correo no deseado durante la gestión del envío del formulario puede dar lugar a algunos problemas. Si la API de Akismet se vuelve lenta, nuestro sitio web también lo será para los usuarios. Pero lo que es peor, si tenemos un timeout o si la API de Akismet no está disponible, podemos perder comentarios.

Lo ideal sería que almacenáramos los datos enviados sin publicarlos y devolviéramos inmediatamente una respuesta. La comprobación de spam se puede hacer de forma separada.

Marcando comentarios

Tenemos que introducir un state para los comentarios: submitted, spam y published.

Agrega la propiedad state a la clase Comment:

1
$ symfony console make:entity Comment

Crea una migración de base de datos:

1
$ symfony console make:migration

Modifica la migración para actualizar todos los comentarios existentes para que sean published de forma predeterminada:

patch_file
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
--- a/migrations/Version00000000000000.php
+++ b/migrations/Version00000000000000.php
@@ -20,7 +20,9 @@ final class Version20200714155905 extends AbstractMigration
     public function up(Schema $schema) : void
     {
         // this up() migration is auto-generated, please modify it to your needs
-        $this->addSql('ALTER TABLE comment ADD state VARCHAR(255) NOT NULL');
+        $this->addSql('ALTER TABLE comment ADD state VARCHAR(255)');
+        $this->addSql("UPDATE comment SET state='published'");
+        $this->addSql('ALTER TABLE comment ALTER COLUMN state SET NOT NULL');
     }

     public function down(Schema $schema) : void

Migra la base de datos:

1
$ symfony console doctrine:migrations:migrate

También debemos asegurarnos de que, por defecto, el valor de state sea submitted:

patch_file
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
--- a/src/Entity/Comment.php
+++ b/src/Entity/Comment.php
@@ -49,9 +49,9 @@ class Comment
     private $photoFilename;

     /**
-     * @ORM\Column(type="string", length=255)
+     * @ORM\Column(type="string", length=255, options={"default": "submitted"})
      */
-    private $state;
+    private $state = 'submitted';

     public function __toString(): string
     {

Actualiza la configuración de EasyAdmin para poder ver el estado del comentario:

patch_file
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
--- a/config/packages/easy_admin.yaml
+++ b/config/packages/easy_admin.yaml
@@ -18,6 +18,7 @@ easy_admin:
                     - author
                     - { property: 'email', type: 'email' }
                     - { property: 'photoFilename', type: 'image', 'base_path': "/uploads/photos", label: 'Photo' }
+                    - state
                     - { property: 'createdAt', type: 'datetime' }
                 sort: ['createdAt', 'ASC']
                 filters: ['conference']
@@ -26,5 +27,6 @@ easy_admin:
                     - { property: 'conference' }
                     - { property: 'createdAt', type: datetime, type_options: { attr: { readonly: true } } }
                     - 'author'
+                    - { property: 'state' }
                     - { property: 'email', type: 'email' }
                     - text

No olvides también actualizar las pruebas configurando el state de los fixtures:

patch_file
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
--- a/src/DataFixtures/AppFixtures.php
+++ b/src/DataFixtures/AppFixtures.php
@@ -37,8 +37,16 @@ class AppFixtures extends Fixture
         $comment1->setAuthor('Fabien');
         $comment1->setEmail('[email protected]');
         $comment1->setText('This was a great conference.');
+        $comment1->setState('published');
         $manager->persist($comment1);

+        $comment2 = new Comment();
+        $comment2->setConference($amsterdam);
+        $comment2->setAuthor('Lucas');
+        $comment2->setEmail('[email protected]');
+        $comment2->setText('I think this one is going to be moderated.');
+        $manager->persist($comment2);
+
         $admin = new Admin();
         $admin->setRoles(['ROLE_ADMIN']);
         $admin->setUsername('admin');

Para las pruebas del controlador, simula la validación:

patch_file
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
--- a/tests/Controller/ConferenceControllerTest.php
+++ b/tests/Controller/ConferenceControllerTest.php
@@ -2,6 +2,8 @@

 namespace App\Tests\Controller;

+use App\Repository\CommentRepository;
+use Doctrine\ORM\EntityManagerInterface;
 use Symfony\Bundle\FrameworkBundle\Test\WebTestCase;

 class ConferenceControllerTest extends WebTestCase
@@ -22,11 +24,17 @@ class ConferenceControllerTest extends WebTestCase
         $client->submitForm('Submit', [
             'comment_form[author]' => 'Fabien',
             'comment_form[text]' => 'Some feedback from an automated functional test',
-            'comment_form[email]' => '[email protected]',
+            'comment_form[email]' => $email = '[email protected]',
             'comment_form[photo]' => dirname(__DIR__, 2).'/public/images/under-construction.gif',
         ]);
         $this->assertResponseRedirects();
+
+        // simulate comment validation
+        $comment = self::$container->get(CommentRepository::class)->findOneByEmail($email);
+        $comment->setState('published');
+        self::$container->get(EntityManagerInterface::class)->flush();
+
         $client->followRedirect();
         $this->assertSelectorExists('div:contains("There are 2 comments")');
     }

A partir de una prueba de PHPUnit, puedes obtener cualquier servicio del contenedor a través de self::$container->get(); también da acceso a servicios no públicos.

Entendiendo Messenger

El componente Messenger es el encargado de la gestión de código asíncrono cuando usamos Symfony:

1
$ symfony composer req messenger

Cuando alguna lógica deba ser ejecutada de forma asíncrona, se envía un mensaje (message) a un bus de mensajería (message bus). El bus almacena el mensaje en una cola (queue) y vuelve inmediatamente para permitir que el flujo de operaciones se reanude lo más rápido posible.

Un consumidor se ejecuta continuamente en segundo plano para leer nuevos mensajes en la cola y ejecutar la lógica asociada. El consumidor puede ejecutarse en el mismo servidor que la aplicación web o en un servidor separado.

Es muy similar a la forma en que se manejan las peticiones HTTP, excepto que no tenemos respuestas.

Creando un manejador de mensajes (Message handler)

Un mensaje es una clase de objeto de datos que no debe contener ninguna lógica. Será serializado para ser almacenado en una cola, por lo que sólo se almacenarán datos serializables «simples».

Crea la clase CommentMessage:

src/Message/CommentMessage.php
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
namespace App\Message;

class CommentMessage
{
    private $id;
    private $context;

    public function __construct(int $id, array $context = [])
    {
        $this->id = $id;
        $this->context = $context;
    }

    public function getId(): int
    {
        return $this->id;
    }

    public function getContext(): array
    {
        return $this->context;
    }
}

En el mundo de Messenger, no tenemos controladores, sino manejadores de mensajes.

Crea una clase CommentMessageHandler bajo un nuevo espacio de nombres App\MessageHandler que sepa cómo manejar los mensajes CommentMessage:

src/MessageHandler/CommentMessageHandler.php
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
namespace App\MessageHandler;

use App\Message\CommentMessage;
use App\Repository\CommentRepository;
use App\SpamChecker;
use Doctrine\ORM\EntityManagerInterface;
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;

class CommentMessageHandler implements MessageHandlerInterface
{
    private $spamChecker;
    private $entityManager;
    private $commentRepository;

    public function __construct(EntityManagerInterface $entityManager, SpamChecker $spamChecker, CommentRepository $commentRepository)
    {
        $this->entityManager = $entityManager;
        $this->spamChecker = $spamChecker;
        $this->commentRepository = $commentRepository;
    }

    public function __invoke(CommentMessage $message)
    {
        $comment = $this->commentRepository->find($message->getId());
        if (!$comment) {
            return;
        }

        if (2 === $this->spamChecker->getSpamScore($comment, $message->getContext())) {
            $comment->setState('spam');
        } else {
            $comment->setState('published');
        }

        $this->entityManager->flush();
    }
}

MessageHandlerInterface es una interfaz que actúa como marcador. Sólo ayuda a Symfony a auto-registrarse y a auto-configurar la clase como un manejador de Messenger. Por convención, la lógica de un manejador vive en un método llamado __invoke(). El type-hint CommentMessage en el argumento de este método le dice a Messenger qué clase manejará.

Actualiza el controlador para utilizar el nuevo sistema:

patch_file
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
--- a/src/Controller/ConferenceController.php
+++ b/src/Controller/ConferenceController.php
@@ -5,14 +5,15 @@ namespace App\Controller;
 use App\Entity\Comment;
 use App\Entity\Conference;
 use App\Form\CommentFormType;
+use App\Message\CommentMessage;
 use App\Repository\CommentRepository;
 use App\Repository\ConferenceRepository;
-use App\SpamChecker;
 use Doctrine\ORM\EntityManagerInterface;
 use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
 use Symfony\Component\HttpFoundation\File\Exception\FileException;
 use Symfony\Component\HttpFoundation\Request;
 use Symfony\Component\HttpFoundation\Response;
+use Symfony\Component\Messenger\MessageBusInterface;
 use Symfony\Component\Routing\Annotation\Route;
 use Twig\Environment;

@@ -20,11 +21,13 @@ class ConferenceController extends AbstractController
 {
     private $twig;
     private $entityManager;
+    private $bus;

-    public function __construct(Environment $twig, EntityManagerInterface $entityManager)
+    public function __construct(Environment $twig, EntityManagerInterface $entityManager, MessageBusInterface $bus)
     {
         $this->twig = $twig;
         $this->entityManager = $entityManager;
+        $this->bus = $bus;
     }

     /**
@@ -40,7 +43,7 @@ class ConferenceController extends AbstractController
     /**
      * @Route("/conference/{slug}", name="conference")
      */
-    public function show(Request $request, Conference $conference, CommentRepository $commentRepository, SpamChecker $spamChecker, string $photoDir)
+    public function show(Request $request, Conference $conference, CommentRepository $commentRepository, string $photoDir)
     {
         $comment = new Comment();
         $form = $this->createForm(CommentFormType::class, $comment);
@@ -59,6 +62,7 @@ class ConferenceController extends AbstractController
             }

             $this->entityManager->persist($comment);
+            $this->entityManager->flush();

             $context = [
                 'user_ip' => $request->getClientIp(),
@@ -66,11 +70,8 @@ class ConferenceController extends AbstractController
                 'referrer' => $request->headers->get('referer'),
                 'permalink' => $request->getUri(),
             ];
-            if (2 === $spamChecker->getSpamScore($comment, $context)) {
-                throw new \RuntimeException('Blatant spam, go away!');
-            }

-            $this->entityManager->flush();
+            $this->bus->dispatch(new CommentMessage($comment->getId(), $context));

             return $this->redirectToRoute('conference', ['slug' => $conference->getSlug()]);
         }

En lugar de depender del Spam Checker, ahora enviamos un mensaje al bus. El manejador entonces decide qué hacer con él.

Hemos logrado algo inesperado. Hemos desacoplado nuestro controlador del Spam Checker y hemos movido la lógica a una nueva clase, el manejador. Es un caso de uso perfecto para el bus. Prueba el código, funciona. Todo se sigue haciendo sincrónicamente, pero el código probablemente ya sea «mejor».

Restringiendo comentarios visualizados

Actualiza la lógica de visualización para evitar que aparezcan comentarios no publicados en el frontend:

patch_file
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
--- a/src/Repository/CommentRepository.php
+++ b/src/Repository/CommentRepository.php
@@ -25,7 +25,9 @@ class CommentRepository extends ServiceEntityRepository
     {
         return $this->createQueryBuilder('c')
             ->andWhere('c.conference = :conference')
+            ->andWhere('c.state = :state')
             ->setParameter('conference', $conference)
+            ->setParameter('state', 'published')
             ->orderBy('c.createdAt', 'DESC')
             ->setMaxResults($limit)
             ->setFirstResult($offset)

Volviéndonos asíncronos de verdad

Por defecto, los manejadores son llamados sincrónicamente. Para ser asíncronos, es necesario configurar explícitamente qué cola utilizar para cada manejador en el fichero de configuración config/packages/messenger.yaml:

patch_file
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
--- a/config/packages/messenger.yaml
+++ b/config/packages/messenger.yaml
@@ -5,10 +5,10 @@ framework:

         transports:
             # https://symfony.com/doc/current/messenger.html#transport-configuration
-            # async: '%env(MESSENGER_TRANSPORT_DSN)%'
+            async: '%env(RABBITMQ_DSN)%'
             # failed: 'doctrine://default?queue_name=failed'
             # sync: 'sync://'

         routing:
             # Route your messages to the transports
-            # 'App\Message\YourMessage': async
+            App\Message\CommentMessage: async

La configuración le dice al bus que envíe instancias de App\Message\CommentMessage a la cola async, que está definida por un DSN, almacenado en la variable de entorno RABBITMQ_DSN.

Agregando RabbitMQ al Stack de Docker

Como habrás adivinado, vamos a usar RabbitMQ:

patch_file
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
--- a/docker-compose.yaml
+++ b/docker-compose.yaml
@@ -12,3 +12,7 @@ services:
     redis:
         image: redis:5-alpine
         ports: [6379]
+
+    rabbitmq:
+        image: rabbitmq:3.7-management
+        ports: [5672, 15672]

Reiniciando los servicios de Docker

Para obligar a Docker Compose a tener en cuenta el contenedor RabbitMQ, detén los contenedores y reinícialos:

1
2
$ docker-compose stop
$ docker-compose up -d
1
$ sleep 10

Consumiendo mensajes

Si intentas enviar un nuevo comentario, ya no se llamará al verificador de spam. Agrega una llamada error_log() en el método getSpamScore() para confirmar. En su lugar, un mensaje está esperando en RabbitMQ, listo para ser consumido por algunos procesos.

Como te puedes imaginar, Symfony viene con un comando consumer para el consumidor. Ejecútalo ahora:

1
$ symfony console messenger:consume async -vv

Debe consumir inmediatamente el mensaje despachado para el comentario enviado:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
 [OK] Consuming messages from transports "async".

 // The worker will automatically exit once it has received a stop signal via the messenger:stop-workers command.

 // Quit the worker with CONTROL-C.

11:30:20 INFO      [messenger] Received message App\Message\CommentMessage ["message" => App\Message\CommentMessage^ { …},"class" => "App\Message\CommentMessage"]
11:30:20 INFO      [http_client] Request: "POST https://80cea32be1f6.rest.akismet.com/1.1/comment-check"
11:30:20 INFO      [http_client] Response: "200 https://80cea32be1f6.rest.akismet.com/1.1/comment-check"
11:30:20 INFO      [messenger] Message App\Message\CommentMessage handled by App\MessageHandler\CommentMessageHandler::__invoke ["message" => App\Message\CommentMessage^ { …},"class" => "App\Message\CommentMessage","handler" => "App\MessageHandler\CommentMessageHandler::__invoke"]
11:30:20 INFO      [messenger] App\Message\CommentMessage was handled successfully (acknowledging to transport). ["message" => App\Message\CommentMessage^ { …},"class" => "App\Message\CommentMessage"]

La actividad del consumidor de mensajes se registra, pero obtendrás información en tiempo real en la consola si le pasas el parámetro -vv. Incluso deberías ser capaz de detectar la llamada a la API de Akismet.

Para detener al consumidor, pulsa Ctrl+C.

Explorando la interfaz de administración web de RabbitMQ

Si deseas ver colas y mensajes que fluyen a través de RabbitMQ, abre su interfaz de administración web:

1
$ symfony open:local:rabbitmq

O desde la barra de herramientas de depuración web:

Utiliza guest/ guest para iniciar sesión en la interfaz de administración de RabbitMQ:

Ejecutando workers en segundo plano

En lugar de lanzar al consumidor cada vez que publicamos un comentario y detenerlo inmediatamente después, queremos ejecutarlo continuamente sin tener demasiadas ventanas de terminal o pestañas abiertas.

El comando Symfony puede administrar dichos comandos en segundo plano o workers (trabajadores) usando el parámetro de daemon (-d) en el comando run.

Ejecuta de nuevo el consumidor del mensaje, pero envíalo en segundo plano:

1
$ symfony run -d --watch=config,src,templates,vendor symfony console messenger:consume async

La opción --watch le dice a Symfony que el comando debe ser reiniciado siempre que haya un cambio en el sistema de archivos en los directorios config/, src/, templates/ o vendor/.

Nota

No uses -vv ya que obtendrías mensajes duplicados en server:log (los mensajes registrados y los mensajes de consola).

Si el consumidor dejara de trabajar por alguna razón (por quedarse sin memoria, por un fallo…), se reiniciará automáticamente. Y si el consumidor falla demasiado rápido, el comando Symfony desistirá de reiniciarlo.

Usando symfony server:log los registros generados se unirán a todos los demás registros procedentes de PHP, el servidor web y la aplicación:

1
$ symfony server:log

Utiliza el comando server:status para listar todos los workers en segundo plano pertenecientes al proyecto actual:

1
2
3
4
$ symfony server:status

Web server listening on https://127.0.0.1:8000
  Command symfony console messenger:consume async running with PID 15774 (watching config/, src/, templates/)

Para detener a un worker, detén el servidor web o mata el proceso que tiene el PID que se muestra con el comando server:status:

1
$ kill 15774

Reintentando cuando los mensajes fallan

¿Qué pasa si Akismet se cae mientras se consume un mensaje? Las personas que envían comentarios no notarán nada, pero el mensaje se pierde y el spam no se controla.

Messenger tiene un mecanismo de reintento para cuando ocurre una excepción mientras se maneja un mensaje. Vamos a configurarlo:

patch_file
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
--- a/config/packages/messenger.yaml
+++ b/config/packages/messenger.yaml
@@ -5,10 +5,17 @@ framework:

         transports:
             # https://symfony.com/doc/current/messenger.html#transport-configuration
-            async: '%env(RABBITMQ_DSN)%'
-            # failed: 'doctrine://default?queue_name=failed'
+            async:
+                dsn: '%env(RABBITMQ_DSN)%'
+                retry_strategy:
+                    max_retries: 3
+                    multiplier: 2
+
+            failed: 'doctrine://default?queue_name=failed'
             # sync: 'sync://'

+        failure_transport: failed
+
         routing:
             # Route your messages to the transports
             App\Message\CommentMessage: async

Si ocurre un problema mientras se maneja un mensaje, el consumidor volverá a intentarlo 3 veces antes de darse por vencido. Pero en lugar de descartar el mensaje, lo almacenará en un almacenamiento más permanente, la cola failed, que utiliza la base de datos Doctrine.

Inspecciona los mensajes fallidos y vuelve a intentarlo mediante los siguientes comandos:

1
2
3
$ symfony console messenger:failed:show

$ symfony console messenger:failed:retry

Desplegando RabbitMQ

La inclusión de RabbitMQ en los servidores de producción puede hacerse añadiéndolo a la lista de servicios:

patch_file
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
--- a/.symfony/services.yaml
+++ b/.symfony/services.yaml
@@ -5,3 +5,8 @@ db:

 rediscache:
     type: redis:5.0
+
+queue:
+    type: rabbitmq:3.7
+    disk: 1024
+    size: S

Referéncialo también en la configuración del contenedor web y habilita la extensión de PHP amqp:

patch_file
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
--- a/.symfony.cloud.yaml
+++ b/.symfony.cloud.yaml
@@ -4,6 +4,7 @@ type: php:7.4

 runtime:
     extensions:
+        - amqp
         - pdo_pgsql
         - apcu
         - mbstring
@@ -22,6 +23,7 @@ variables:
 relationships:
     database: "db:postgresql"
     redis: "rediscache:redis"
+    rabbitmq: "queue:rabbitmq"

 web:
     locations:

Cuando el servicio RabbitMQ está instalado en un proyecto, puedes acceder a su interfaz de gestión web abriendo primero el túnel:

1
2
3
4
5
$ symfony tunnel:open
$ symfony open:remote:rabbitmq

# when done
$ symfony tunnel:close

Ejecutando workers en SymfonyCloud

Para consumir mensajes de RabbitMQ, necesitamos ejecutar el comando messenger:consume continuamente. En SymfonyCloud, este es el rol de un worker:

patch_file
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
--- a/.symfony.cloud.yaml
+++ b/.symfony.cloud.yaml
@@ -46,3 +46,12 @@ hooks:
         set -x -e

         (>&2 symfony-deploy)
+
+workers:
+    messages:
+        commands:
+            start: |
+                set -x -e
+
+                (>&2 symfony-deploy)
+                php bin/console messenger:consume async -vv --time-limit=3600 --memory-limit=128M

Al igual que hace el comando Symfony, SymfonyCloud también gestiona los reinicios y los registros.

Para obtener logs de un worker, usa:

1
$ symfony logs --worker=messages all

This work, including the code samples, is licensed under a Creative Commons BY-NC-SA 4.0 license.