Pas 18: Procesând asincron

5.0 version
Maintained

Procesând asincron

Verificarea spamului în timpul procesării expedierii formularului poate genera unele probleme. Dacă API-ul Akismet devine lent, site-ul nostru va fi de asemenea lent pentru utilizatori. Dar și mai rău, dacă atingem un interval de timp sau dacă API-ul Akismet nu este disponibil, am putea pierde comentarii.

În mod ideal, ar trebui să stocăm datele expediate fără să le publicăm și să returnăm un răspuns imediat. Verificarea spamului poate fi făcut independent.

Marcarea comentariilor

Trebuie să introducem un state pentru comentarii: submitted, spam și published.

Adaugă proprietatea state la clasa Comment:

1
$ symfony console make:entity Comment

Creează o migrare a bazei de date:

1
$ symfony console make:migration

Modifică migrarea pentru a actualiza toate comentariile existente cu statutul published în mod implicit:

patch_file
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
--- a/migrations/Version00000000000000.php
+++ b/migrations/Version00000000000000.php
@@ -20,7 +20,9 @@ final class Version20200714155905 extends AbstractMigration
     public function up(Schema $schema) : void
     {
         // this up() migration is auto-generated, please modify it to your needs
-        $this->addSql('ALTER TABLE comment ADD state VARCHAR(255) NOT NULL');
+        $this->addSql('ALTER TABLE comment ADD state VARCHAR(255)');
+        $this->addSql("UPDATE comment SET state='published'");
+        $this->addSql('ALTER TABLE comment ALTER COLUMN state SET NOT NULL');
     }

     public function down(Schema $schema) : void

Migrează baza de date:

1
$ symfony console doctrine:migrations:migrate

De asemenea, ar trebui să ne asigurăm că, în mod implicit, state este setată pe submitted:

patch_file
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
--- a/src/Entity/Comment.php
+++ b/src/Entity/Comment.php
@@ -49,9 +49,9 @@ class Comment
     private $photoFilename;

     /**
-     * @ORM\Column(type="string", length=255)
+     * @ORM\Column(type="string", length=255, options={"default": "submitted"})
      */
-    private $state;
+    private $state = 'submitted';

     public function __toString(): string
     {

Actualizează configurația EasyAdmin pentru a putea vedea starea comentariului:

patch_file
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
--- a/config/packages/easy_admin.yaml
+++ b/config/packages/easy_admin.yaml
@@ -18,6 +18,7 @@ easy_admin:
                     - author
                     - { property: 'email', type: 'email' }
                     - { property: 'photoFilename', type: 'image', 'base_path': "/uploads/photos", label: 'Photo' }
+                    - state
                     - { property: 'createdAt', type: 'datetime' }
                 sort: ['createdAt', 'ASC']
                 filters: ['conference']
@@ -26,5 +27,6 @@ easy_admin:
                     - { property: 'conference' }
                     - { property: 'createdAt', type: datetime, type_options: { attr: { readonly: true } } }
                     - 'author'
+                    - { property: 'state' }
                     - { property: 'email', type: 'email' }
                     - text

Nu uita să actualizezi și testele prin setarea parametrului state a datelor de test:

patch_file
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
--- a/src/DataFixtures/AppFixtures.php
+++ b/src/DataFixtures/AppFixtures.php
@@ -37,8 +37,16 @@ class AppFixtures extends Fixture
         $comment1->setAuthor('Fabien');
         $comment1->setEmail('[email protected]');
         $comment1->setText('This was a great conference.');
+        $comment1->setState('published');
         $manager->persist($comment1);

+        $comment2 = new Comment();
+        $comment2->setConference($amsterdam);
+        $comment2->setAuthor('Lucas');
+        $comment2->setEmail('[email protected]');
+        $comment2->setText('I think this one is going to be moderated.');
+        $manager->persist($comment2);
+
         $admin = new Admin();
         $admin->setRoles(['ROLE_ADMIN']);
         $admin->setUsername('admin');

Pentru testele controlerului, simulează validarea:

patch_file
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
--- a/tests/Controller/ConferenceControllerTest.php
+++ b/tests/Controller/ConferenceControllerTest.php
@@ -2,6 +2,8 @@

 namespace App\Tests\Controller;

+use App\Repository\CommentRepository;
+use Doctrine\ORM\EntityManagerInterface;
 use Symfony\Bundle\FrameworkBundle\Test\WebTestCase;

 class ConferenceControllerTest extends WebTestCase
@@ -22,11 +24,17 @@ class ConferenceControllerTest extends WebTestCase
         $client->submitForm('Submit', [
             'comment_form[author]' => 'Fabien',
             'comment_form[text]' => 'Some feedback from an automated functional test',
-            'comment_form[email]' => '[email protected]',
+            'comment_form[email]' => $email = '[email protected]',
             'comment_form[photo]' => dirname(__DIR__, 2).'/public/images/under-construction.gif',
         ]);
         $this->assertResponseRedirects();
+
+        // simulate comment validation
+        $comment = self::$container->get(CommentRepository::class)->findOneByEmail($email);
+        $comment->setState('published');
+        self::$container->get(EntityManagerInterface::class)->flush();
+
         $client->followRedirect();
         $this->assertSelectorExists('div:contains("There are 2 comments")');
     }

De la un test PHPUnit, poți obține orice serviciu din container prin self::$container->get(); aceasta oferă, de asemenea, acces la servicii non-publice.

Componenta Messenger

Gestionarea codului asincron cu Symfony este sarcina componentei Messenger:

1
$ symfony composer req messenger

Când o anumită logică ar trebui să fie executată în mod asincron, expediază un mesaj la un bus messenger. Bus-ul stochează mesajul într-o coadă de mesaje (queue) și încetează execuția imediat pentru a permite reluarea fluxului de operații cât mai repede posibil.

Un consumator rulează continuu în fundal pentru a citi mesaje noi din coada de mesaje și pentru a executa logica asociată. Consumatorul poate rula pe același server ca și aplicația web sau pe unul separat.

Este foarte similar cu modul în care sunt gestionate cererile HTTP, cu excepția faptului că nu avem răspunsuri.

Dezvoltarea unui Handler Messenger

Un mesaj este obiectul cu date al unei clase care nu ar trebui să dețină nicio logică. Acesta va fi serializat pentru a fi stocat într-o coadă de mesaje, astfel încât să stocăm doar date simple serializabile.

Creează clasa CommentMessage:

src/Message/CommentMessage.php
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
namespace App\Message;

class CommentMessage
{
    private $id;
    private $context;

    public function __construct(int $id, array $context = [])
    {
        $this->id = $id;
        $this->context = $context;
    }

    public function getId(): int
    {
        return $this->id;
    }

    public function getContext(): array
    {
        return $this->context;
    }
}

În lumea Messenger, nu avem controlere, ci handlere de mesaje.

Creează o clasă CommentMessageHandler sub un nou spațiu de nume App\MessageHandler care știe să gestioneze mesajele CommentMessage:

src/MessageHandler/CommentMessageHandler.php
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
namespace App\MessageHandler;

use App\Message\CommentMessage;
use App\Repository\CommentRepository;
use App\SpamChecker;
use Doctrine\ORM\EntityManagerInterface;
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;

class CommentMessageHandler implements MessageHandlerInterface
{
    private $spamChecker;
    private $entityManager;
    private $commentRepository;

    public function __construct(EntityManagerInterface $entityManager, SpamChecker $spamChecker, CommentRepository $commentRepository)
    {
        $this->entityManager = $entityManager;
        $this->spamChecker = $spamChecker;
        $this->commentRepository = $commentRepository;
    }

    public function __invoke(CommentMessage $message)
    {
        $comment = $this->commentRepository->find($message->getId());
        if (!$comment) {
            return;
        }

        if (2 === $this->spamChecker->getSpamScore($comment, $message->getContext())) {
            $comment->setState('spam');
        } else {
            $comment->setState('published');
        }

        $this->entityManager->flush();
    }
}

MessageHandlerInterface este o interfață marker. Aceasta ajută Symfony la înregistrarea și la configurarea automată a clasei ca un handler (manipulator) de mesagerie. Prin convenție, logica unui handler este implementată într-o metodă numită __invoke(). Indicația de tip CommentMessage pe argumentul acestei metode indică Messenger-ului clasa care va manipula acest mesaj.

Actualizează controlerul pentru a utiliza noul sistem:

patch_file
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
--- a/src/Controller/ConferenceController.php
+++ b/src/Controller/ConferenceController.php
@@ -5,14 +5,15 @@ namespace App\Controller;
 use App\Entity\Comment;
 use App\Entity\Conference;
 use App\Form\CommentFormType;
+use App\Message\CommentMessage;
 use App\Repository\CommentRepository;
 use App\Repository\ConferenceRepository;
-use App\SpamChecker;
 use Doctrine\ORM\EntityManagerInterface;
 use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
 use Symfony\Component\HttpFoundation\File\Exception\FileException;
 use Symfony\Component\HttpFoundation\Request;
 use Symfony\Component\HttpFoundation\Response;
+use Symfony\Component\Messenger\MessageBusInterface;
 use Symfony\Component\Routing\Annotation\Route;
 use Twig\Environment;

@@ -20,11 +21,13 @@ class ConferenceController extends AbstractController
 {
     private $twig;
     private $entityManager;
+    private $bus;

-    public function __construct(Environment $twig, EntityManagerInterface $entityManager)
+    public function __construct(Environment $twig, EntityManagerInterface $entityManager, MessageBusInterface $bus)
     {
         $this->twig = $twig;
         $this->entityManager = $entityManager;
+        $this->bus = $bus;
     }

     /**
@@ -40,7 +43,7 @@ class ConferenceController extends AbstractController
     /**
      * @Route("/conference/{slug}", name="conference")
      */
-    public function show(Request $request, Conference $conference, CommentRepository $commentRepository, SpamChecker $spamChecker, string $photoDir)
+    public function show(Request $request, Conference $conference, CommentRepository $commentRepository, string $photoDir)
     {
         $comment = new Comment();
         $form = $this->createForm(CommentFormType::class, $comment);
@@ -59,6 +62,7 @@ class ConferenceController extends AbstractController
             }

             $this->entityManager->persist($comment);
+            $this->entityManager->flush();

             $context = [
                 'user_ip' => $request->getClientIp(),
@@ -66,11 +70,8 @@ class ConferenceController extends AbstractController
                 'referrer' => $request->headers->get('referer'),
                 'permalink' => $request->getUri(),
             ];
-            if (2 === $spamChecker->getSpamScore($comment, $context)) {
-                throw new \RuntimeException('Blatant spam, go away!');
-            }

-            $this->entityManager->flush();
+            $this->bus->dispatch(new CommentMessage($comment->getId(), $context));

             return $this->redirectToRoute('conference', ['slug' => $conference->getSlug()]);
         }

În loc să depindem de Spam Checker, expediem un mesaj în bus. Handler-ul decide apoi ce să facă cu acesta.

Am obținut ceva neașteptat. Am decuplat controlerul nostru de la Spam Checker și am mutat logica într-o clasă nouă: handler. Este un caz perfect pentru utilizarea bus-ului. Testează codul, pur și simplu funcționează. Totul este încă făcut în mod sincron, dar probabil codul este deja „mai bun”.

Restricționarea comentariilor afișate

Actualizează logica afișajului pentru a evita apariția comentariilor nepublicate pe frontend:

patch_file
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
--- a/src/Repository/CommentRepository.php
+++ b/src/Repository/CommentRepository.php
@@ -25,7 +25,9 @@ class CommentRepository extends ServiceEntityRepository
     {
         return $this->createQueryBuilder('c')
             ->andWhere('c.conference = :conference')
+            ->andWhere('c.state = :state')
             ->setParameter('conference', $conference)
+            ->setParameter('state', 'published')
             ->orderBy('c.createdAt', 'DESC')
             ->setMaxResults($limit)
             ->setFirstResult($offset)

Devenind cu adevărat asincron

În mod implicit, manipulatorii sunt apelați în mod sincron. Pentru a-i face să execute în mod asincron, trebuie să configurezi explicit ce coadă de mesaje să fie folosită pentru fiecare handler din fișierul de configurare config/packages/messenger.yaml:

patch_file
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
--- a/config/packages/messenger.yaml
+++ b/config/packages/messenger.yaml
@@ -5,10 +5,10 @@ framework:

         transports:
             # https://symfony.com/doc/current/messenger.html#transport-configuration
-            # async: '%env(MESSENGER_TRANSPORT_DSN)%'
+            async: '%env(RABBITMQ_DSN)%'
             # failed: 'doctrine://default?queue_name=failed'
             # sync: 'sync://'

         routing:
             # Route your messages to the transports
-            # 'App\Message\YourMessage': async
+            App\Message\CommentMessage: async

Configurația spune bus-ului să expedieze instanțe de App\Message\CommentMessage la coada de mesaje async, care este definită de un DSN, stocată în variabila de mediu RABBITMQ_DSN.

Adăugarea RabbitMQ la Docker Stack

După cum ai putut ghici, vom folosi RabbitMQ:

patch_file
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
--- a/docker-compose.yaml
+++ b/docker-compose.yaml
@@ -12,3 +12,7 @@ services:
     redis:
         image: redis:5-alpine
         ports: [6379]
+
+    rabbitmq:
+        image: rabbitmq:3.7-management
+        ports: [5672, 15672]

Repornind serviciile Docker

Pentru a forța Docker Composer să țină cont de container-ul RabbitMQ, repornește containerele:

1
2
$ docker-compose stop
$ docker-compose up -d
1
$ sleep 10

Consumând mesajele

Dacă încerci să expediezi un nou comentariu, verificatorul de spam nu va mai fi apelat. Adaugă un apel error_log() în metoda getSpamScore() pentru a confirma asta. În schimb, un mesaj este în așteptare în RabbitMQ, gata să fie consumat de unele procese.

După cum îți poți imagina, Symfony vine cu o comandă de consumator. Ruleaz-o acum:

1
$ symfony console messenger:consume async -vv

Acesta ar trebui să consume imediat mesajul trimis pentru comentariul expediat:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
 [OK] Consuming messages from transports "async".

 // The worker will automatically exit once it has received a stop signal via the messenger:stop-workers command.

 // Quit the worker with CONTROL-C.

11:30:20 INFO      [messenger] Received message App\Message\CommentMessage ["message" => App\Message\CommentMessage^ { …},"class" => "App\Message\CommentMessage"]
11:30:20 INFO      [http_client] Request: "POST https://80cea32be1f6.rest.akismet.com/1.1/comment-check"
11:30:20 INFO      [http_client] Response: "200 https://80cea32be1f6.rest.akismet.com/1.1/comment-check"
11:30:20 INFO      [messenger] Message App\Message\CommentMessage handled by App\MessageHandler\CommentMessageHandler::__invoke ["message" => App\Message\CommentMessage^ { …},"class" => "App\Message\CommentMessage","handler" => "App\MessageHandler\CommentMessageHandler::__invoke"]
11:30:20 INFO      [messenger] App\Message\CommentMessage was handled successfully (acknowledging to transport). ["message" => App\Message\CommentMessage^ { …},"class" => "App\Message\CommentMessage"]

Activitatea consumatorului de mesaje este înregistrată, dar primești feedback instantaneu pe consolă utilizând opțiunea -vv. Ar trebui să fii în măsură să detectezi apelul către API-ul Akismet.

Pentru a opri consumatorul, apasă Ctrl + C.

Explorarea interfeței de gestionare web RabbitMQ

Dacă dorești să vezi cozile de mesaje și mesajele care sunt procesate de RabbitMQ, deschide interfața sa de gestionare web:

1
$ symfony open:local:rabbitmq

Sau din bara de instrumente de depanare web:

Utilizați guest/guest pentru a te autentifica la interfața de gestionare RabbitMQ:

Executarea comenzilor în fundal

În loc să lansăm consumatorul de fiecare dată când postăm un comentariu și îl oprim imediat după aceea, preferăm să-l rulăm continuu, fără a avea prea multe ferestre sau file deschise.

Symfony CLI poate gestiona astfel de comenzi de fundal sau workers folosind opțiunea (-d) cu comanda run.

Pornește din nou consumatorul de mesaje, dar trimite-l în fundal:

1
$ symfony run -d --watch=config,src,templates,vendor symfony console messenger:consume async

Opțiunea --watch indică Symfony că comanda trebuie repornită ori de câte ori există o schimbare a sistemului de fișiere în config/, src/, templates / sau vendor/ directoare.

Notă

Nu folosi -vv deoarece vei duplica mesajele în server:log (mesaje logate și mesaje de consolă).

Dacă consumatorul nu mai funcționează din anumite motive (limită de memorie, eroare, …), acesta va fi repornit automat. Și dacă consumatorul eșuează prea repede, Symfony CLI se va opri.

Jurnalele sunt transmise prin server symfony:log cu toate celelalte provenind de la PHP, serverul web și aplicație:

1
$ symfony server:log

Folosește comanda server: status pentru a enumera toate comenzile de fundal gestionate pentru proiectul curent:

1
2
3
4
$ symfony server:status

Web server listening on https://127.0.0.1:8000
  Command symfony console messenger:consume async running with PID 15774 (watching config/, src/, templates/)

Pentru a opri un worker, oprește serverul web sau distruge PID-ul dat de comanda server:status:

1
$ kill 15774

Reîncearcă expedierea mesajelor eșuate

Ce se întâmplă dacă Akismet nu răspunde în timp ce consumă un mesaj? Nu există niciun impact pentru persoanele care trimit comentarii, dar mesajul este pierdut și spamul nu este verificat.

Messenger are un mecanism de reîncercare pentru situația când apare o excepție în timpul manipulării unui mesaj. Să-l configurăm:

patch_file
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
--- a/config/packages/messenger.yaml
+++ b/config/packages/messenger.yaml
@@ -5,10 +5,17 @@ framework:

         transports:
             # https://symfony.com/doc/current/messenger.html#transport-configuration
-            async: '%env(RABBITMQ_DSN)%'
-            # failed: 'doctrine://default?queue_name=failed'
+            async:
+                dsn: '%env(RABBITMQ_DSN)%'
+                retry_strategy:
+                    max_retries: 3
+                    multiplier: 2
+
+            failed: 'doctrine://default?queue_name=failed'
             # sync: 'sync://'

+        failure_transport: failed
+
         routing:
             # Route your messages to the transports
             App\Message\CommentMessage: async

Dacă apare o problemă în timpul manipulării unui mesaj, consumatorul va încerca de trei ori înainte de a renunța. Dar, în loc să elimine mesajul, îl va stoca într-un spațiu de stocare permanent, coada de mesaje failed, care folosește baza de date Doctrine.

Verifică mesajele eșuate și încearcă-le din nou prin următoarele comenzi:

1
2
3
$ symfony console messenger:failed:show

$ symfony console messenger:failed:retry

Lansarea RabbitMQ

Adăugarea RabbitMQ la serverele de producție se poate face adăugându-l la lista de servicii:

patch_file
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
--- a/.symfony/services.yaml
+++ b/.symfony/services.yaml
@@ -5,3 +5,8 @@ db:

 rediscache:
     type: redis:5.0
+
+queue:
+    type: rabbitmq:3.7
+    disk: 1024
+    size: S

Adaugă-l în configurația containerului web și activează extensia PHP amqp:

patch_file
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
--- a/.symfony.cloud.yaml
+++ b/.symfony.cloud.yaml
@@ -4,6 +4,7 @@ type: php:7.4

 runtime:
     extensions:
+        - amqp
         - pdo_pgsql
         - apcu
         - mbstring
@@ -22,6 +23,7 @@ variables:
 relationships:
     database: "db:postgresql"
     redis: "rediscache:redis"
+    rabbitmq: "queue:rabbitmq"

 web:
     locations:

Când serviciul RabbitMQ este instalat pe un proiect, poți accesa interfața sa de gestionare web prin deschiderea mai întâi a tunelului:

1
2
3
4
5
$ symfony tunnel:open
$ symfony open:remote:rabbitmq

# when done
$ symfony tunnel:close

Executarea lucrătorilor pe SymfonyCloud

Pentru a consuma mesajele din RabbitMQ, trebuie să executăm comanda messenger:consume continuu. Pe SymfonyCloud, acesta este rolul unui worker:

patch_file
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
--- a/.symfony.cloud.yaml
+++ b/.symfony.cloud.yaml
@@ -46,3 +46,12 @@ hooks:
         set -x -e

         (>&2 symfony-deploy)
+
+workers:
+    messages:
+        commands:
+            start: |
+                set -x -e
+
+                (>&2 symfony-deploy)
+                php bin/console messenger:consume async -vv --time-limit=3600 --memory-limit=128M

Ca și pentru Symfony CLI, SymfonyCloud gestionează repornirile și jurnalele.

Pentru a obține jurnalul unui worker, utilizează:

1
$ symfony logs --worker=messages all

This work, including the code samples, is licensed under a Creative Commons BY-NC-SA 4.0 license.