Etap 18: Idziemy w asynchroniczność

5.0 version
Maintained

Idziemy w asynchroniczność

Sprawdzenie, czy komentarz nie jest spamem, podczas obsługi wysyłania formularza może prowadzić do pewnych problemów. Jeśli interfejs API Akismet stanie się powolny, nasza strona również będzie powolna dla użytkowników. Co gorsza, jeżeli przekroczymy limit czasu lub API Akismet jest niedostępne, możemy stracić komentarze.

Idealnie byłoby, gdybyśmy przechowywali przesłane dane bez ich publikowania i natychmiast zwracali odpowiedź. Sprawdzenie, czy nie ma spamu, może zostać wykonane poza głównym wątkiem.

Oznaczanie komentarzy

Musimy wprowadzić atrybut state określający stan komentarzy, przyjmujący trzy wartości: submitted dla komentarzy wysłanych, published dla opublikowanych oraz spam dla odrzuconych jako spamowe.

Dodaj atrybut state do klasy Comment:

1
$ symfony console make:entity Comment

Utwórz migrację bazy danych:

1
$ symfony console make:migration

Zmodyfikuj migrację tak, by zaktualizowała status wszystkich istniejących komentarzy na published:

patch_file
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
--- a/migrations/Version00000000000000.php
+++ b/migrations/Version00000000000000.php
@@ -20,7 +20,9 @@ final class Version20200714155905 extends AbstractMigration
     public function up(Schema $schema) : void
     {
         // this up() migration is auto-generated, please modify it to your needs
-        $this->addSql('ALTER TABLE comment ADD state VARCHAR(255) NOT NULL');
+        $this->addSql('ALTER TABLE comment ADD state VARCHAR(255)');
+        $this->addSql("UPDATE comment SET state='published'");
+        $this->addSql('ALTER TABLE comment ALTER COLUMN state SET NOT NULL');
     }

     public function down(Schema $schema) : void

Uruchom migrację bazy danych:

1
$ symfony console doctrine:migrations:migrate

Powinniśmy także upewnić się, że domyślna wartość atrybutu state jest ustawiona na submitted:

patch_file
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
--- a/src/Entity/Comment.php
+++ b/src/Entity/Comment.php
@@ -49,9 +49,9 @@ class Comment
     private $photoFilename;

     /**
-     * @ORM\Column(type="string", length=255)
+     * @ORM\Column(type="string", length=255, options={"default": "submitted"})
      */
-    private $state;
+    private $state = 'submitted';

     public function __toString(): string
     {

Zaktualizuj konfigurację EasyAdmin tak, by móc zobaczyć stan komentarza:

patch_file
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
--- a/config/packages/easy_admin.yaml
+++ b/config/packages/easy_admin.yaml
@@ -18,6 +18,7 @@ easy_admin:
                     - author
                     - { property: 'email', type: 'email' }
                     - { property: 'photoFilename', type: 'image', 'base_path': "/uploads/photos", label: 'Photo' }
+                    - state
                     - { property: 'createdAt', type: 'datetime' }
                 sort: ['createdAt', 'ASC']
                 filters: ['conference']
@@ -26,5 +27,6 @@ easy_admin:
                     - { property: 'conference' }
                     - { property: 'createdAt', type: datetime, type_options: { attr: { readonly: true } } }
                     - 'author'
+                    - { property: 'state' }
                     - { property: 'email', type: 'email' }
                     - text

Nie zapomnij również zaktualizować testów poprzez ustawienie atrybutu state w danych testowych (ang. fixtures):

patch_file
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
--- a/src/DataFixtures/AppFixtures.php
+++ b/src/DataFixtures/AppFixtures.php
@@ -37,8 +37,16 @@ class AppFixtures extends Fixture
         $comment1->setAuthor('Fabien');
         $comment1->setEmail('[email protected]');
         $comment1->setText('This was a great conference.');
+        $comment1->setState('published');
         $manager->persist($comment1);

+        $comment2 = new Comment();
+        $comment2->setConference($amsterdam);
+        $comment2->setAuthor('Lucas');
+        $comment2->setEmail('[email protected]');
+        $comment2->setText('I think this one is going to be moderated.');
+        $manager->persist($comment2);
+
         $admin = new Admin();
         $admin->setRoles(['ROLE_ADMIN']);
         $admin->setUsername('admin');

W przypadku testów kontrolera należy przeprowadzić symulację walidacji:

patch_file
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
--- a/tests/Controller/ConferenceControllerTest.php
+++ b/tests/Controller/ConferenceControllerTest.php
@@ -2,6 +2,8 @@

 namespace App\Tests\Controller;

+use App\Repository\CommentRepository;
+use Doctrine\ORM\EntityManagerInterface;
 use Symfony\Bundle\FrameworkBundle\Test\WebTestCase;

 class ConferenceControllerTest extends WebTestCase
@@ -22,11 +24,17 @@ class ConferenceControllerTest extends WebTestCase
         $client->submitForm('Submit', [
             'comment_form[author]' => 'Fabien',
             'comment_form[text]' => 'Some feedback from an automated functional test',
-            'comment_form[email]' => '[email protected]',
+            'comment_form[email]' => $email = '[email protected]',
             'comment_form[photo]' => dirname(__DIR__, 2).'/public/images/under-construction.gif',
         ]);
         $this->assertResponseRedirects();
+
+        // simulate comment validation
+        $comment = self::$container->get(CommentRepository::class)->findOneByEmail($email);
+        $comment->setState('published');
+        self::$container->get(EntityManagerInterface::class)->flush();
+
         $client->followRedirect();
         $this->assertSelectorExists('div:contains("There are 2 comments")');
     }

Z poziomu testu PHPUnit można uzyskać dowolną usługę z kontenera za pośrednictwem self::$container->get(); daje to również dostęp do usług niepublicznych.

Zrozumienie komponentu Messenger

Zarządzanie kodem asynchronicznym w Symfony jest zadaniem komponentu Messenger:

1
$ symfony composer req messenger

Gdy jakieś działania powinny być wykonywane asynchronicznie, wyślij wiadomość (ang. message) do magistrali komunikacyjnej (ang. messenger bus). Magistrala dodaje wiadomość do kolejki (ang. queue) i natychmiast zwraca wynik, aby umożliwić wykonywanie od razu kolejnych operacji.

Konsument (ang. consumer) pracuje stale w tle, czytając nowe wiadomości w kolejce i wykonuje związane z nimi schematy działań. Konsument może działać na tym samym serwerze co aplikacja internetowa lub osobnym.

Jest to bardzo podobne do sposobu obsługi żądań HTTP, z wyjątkiem tego, że nie mamy odpowiedzi.

Kodowanie obsługi wiadomości (ang. message handler)

Wiadomość jest klasą obiektu danych, z którą nie są związane żadne reguły działania. Będzie ona zserializowana, aby być przechowywana w kolejce, więc przechowuj tylko „proste” dane, które można serializować.

Stwórz klasę CommentMessage:

src/Message/CommentMessage.php
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
namespace App\Message;

class CommentMessage
{
    private $id;
    private $context;

    public function __construct(int $id, array $context = [])
    {
        $this->id = $id;
        $this->context = $context;
    }

    public function getId(): int
    {
        return $this->id;
    }

    public function getContext(): array
    {
        return $this->context;
    }
}

W świecie Messengera nie mamy kontrolerów, lecz obiekty obsługi wiadomości (ang. message handlers).

Utwórz klasę CommentMessageHandler w nowej przestrzeni nazw App\MessageHandler, która wie, jak obsługiwać wiadomości CommentMessage:

src/MessageHandler/CommentMessageHandler.php
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
namespace App\MessageHandler;

use App\Message\CommentMessage;
use App\Repository\CommentRepository;
use App\SpamChecker;
use Doctrine\ORM\EntityManagerInterface;
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;

class CommentMessageHandler implements MessageHandlerInterface
{
    private $spamChecker;
    private $entityManager;
    private $commentRepository;

    public function __construct(EntityManagerInterface $entityManager, SpamChecker $spamChecker, CommentRepository $commentRepository)
    {
        $this->entityManager = $entityManager;
        $this->spamChecker = $spamChecker;
        $this->commentRepository = $commentRepository;
    }

    public function __invoke(CommentMessage $message)
    {
        $comment = $this->commentRepository->find($message->getId());
        if (!$comment) {
            return;
        }

        if (2 === $this->spamChecker->getSpamScore($comment, $message->getContext())) {
            $comment->setState('spam');
        } else {
            $comment->setState('published');
        }

        $this->entityManager->flush();
    }
}

MessageHandlerInterface jest interfejsem znacznikowym (ang. marker interface). Pomaga on Symfony tylko w automatycznej rejestracji i konfiguracji klasy do obsługi wiadomości. Zgodnie z konwencją, reguły obsługi znajdują się w metodzie __invoke(). Podpowiedź typu CommentMessage do jednego z argumentów tej metody mówi Messengerowi, którą klasę będzie obsługiwać.

Zaktualizuj kontroler, aby korzystać z nowego systemu:

patch_file
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
--- a/src/Controller/ConferenceController.php
+++ b/src/Controller/ConferenceController.php
@@ -5,14 +5,15 @@ namespace App\Controller;
 use App\Entity\Comment;
 use App\Entity\Conference;
 use App\Form\CommentFormType;
+use App\Message\CommentMessage;
 use App\Repository\CommentRepository;
 use App\Repository\ConferenceRepository;
-use App\SpamChecker;
 use Doctrine\ORM\EntityManagerInterface;
 use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
 use Symfony\Component\HttpFoundation\File\Exception\FileException;
 use Symfony\Component\HttpFoundation\Request;
 use Symfony\Component\HttpFoundation\Response;
+use Symfony\Component\Messenger\MessageBusInterface;
 use Symfony\Component\Routing\Annotation\Route;
 use Twig\Environment;

@@ -20,11 +21,13 @@ class ConferenceController extends AbstractController
 {
     private $twig;
     private $entityManager;
+    private $bus;

-    public function __construct(Environment $twig, EntityManagerInterface $entityManager)
+    public function __construct(Environment $twig, EntityManagerInterface $entityManager, MessageBusInterface $bus)
     {
         $this->twig = $twig;
         $this->entityManager = $entityManager;
+        $this->bus = $bus;
     }

     /**
@@ -40,7 +43,7 @@ class ConferenceController extends AbstractController
     /**
      * @Route("/conference/{slug}", name="conference")
      */
-    public function show(Request $request, Conference $conference, CommentRepository $commentRepository, SpamChecker $spamChecker, string $photoDir)
+    public function show(Request $request, Conference $conference, CommentRepository $commentRepository, string $photoDir)
     {
         $comment = new Comment();
         $form = $this->createForm(CommentFormType::class, $comment);
@@ -59,6 +62,7 @@ class ConferenceController extends AbstractController
             }

             $this->entityManager->persist($comment);
+            $this->entityManager->flush();

             $context = [
                 'user_ip' => $request->getClientIp(),
@@ -66,11 +70,8 @@ class ConferenceController extends AbstractController
                 'referrer' => $request->headers->get('referer'),
                 'permalink' => $request->getUri(),
             ];
-            if (2 === $spamChecker->getSpamScore($comment, $context)) {
-                throw new \RuntimeException('Blatant spam, go away!');
-            }

-            $this->entityManager->flush();
+            $this->bus->dispatch(new CommentMessage($comment->getId(), $context));

             return $this->redirectToRoute('conference', ['slug' => $conference->getSlug()]);
         }

Zamiast polegać na kontrolerze spamu, wysyłamy teraz wiadomość do magistrali. Następnie obiekt obsługi decyduje, co z nią zrobić.

Osiągnęliśmy coś nieoczekiwanego. Odłączyliśmy nasz kontroler od kontrolera spamu i przenieśliśmy schemat działania do nowej klasy – obsługi (ang. handler). Jest to idealne zastosowanie dla magistrali. Przetestuj kod: działa. Wszystko jest nadal wykonywane synchronicznie, ale kod jest już prawdopodobnie „lepszy”.

Ograniczenie wyświetlanych komentarzy

Zaktualizuj reguły wyświetlania, aby uniknąć pojawienia się nieopublikowanych komentarzy na stronie:

patch_file
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
--- a/src/Repository/CommentRepository.php
+++ b/src/Repository/CommentRepository.php
@@ -25,7 +25,9 @@ class CommentRepository extends ServiceEntityRepository
     {
         return $this->createQueryBuilder('c')
             ->andWhere('c.conference = :conference')
+            ->andWhere('c.state = :state')
             ->setParameter('conference', $conference)
+            ->setParameter('state', 'published')
             ->orderBy('c.createdAt', 'DESC')
             ->setMaxResults($limit)
             ->setFirstResult($offset)

Idziemy w prawdziwą asynchroniczność

Domyślnie, obsługa wywoływana jest synchronicznie. Aby przejść do trybu asynchronicznego, należy skonfigurować, której kolejki użyć dla każdego obiektu obsługującego wiadomości (ang. handler) w pliku konfiguracyjnym config/packages/messenger.yaml:

patch_file
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
--- a/config/packages/messenger.yaml
+++ b/config/packages/messenger.yaml
@@ -5,10 +5,10 @@ framework:

         transports:
             # https://symfony.com/doc/current/messenger.html#transport-configuration
-            # async: '%env(MESSENGER_TRANSPORT_DSN)%'
+            async: '%env(RABBITMQ_DSN)%'
             # failed: 'doctrine://default?queue_name=failed'
             # sync: 'sync://'

         routing:
             # Route your messages to the transports
-            # 'App\Message\YourMessage': async
+            App\Message\CommentMessage: async

Konfiguracja każe magistrali wysyłać instancje App\Message\CommentMessage do kolejki async, która jest zdefiniowana przez DSN, przechowywany w zmiennej środowiskowej RABBITMQ_DSN.

Dodawanie RabbitMQ do Docker Stack

Jak możesz się domyślić, użyjemy RabbitMQ:

patch_file
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
--- a/docker-compose.yaml
+++ b/docker-compose.yaml
@@ -12,3 +12,7 @@ services:
     redis:
         image: redis:5-alpine
         ports: [6379]
+
+    rabbitmq:
+        image: rabbitmq:3.7-management
+        ports: [5672, 15672]

Restartowanie usług Dockera

Aby zmusić Docker Compose do wzięcia pod uwagę kontenera RabbitMQ, zatrzymaj kontenery i uruchom je ponownie:

1
2
$ docker-compose stop
$ docker-compose up -d
1
$ sleep 10

Przetwarzanie wiadomości

Jeśli spróbujesz przesłać nowy komentarz, kontroler spamu nie będzie już wywoływany. Dodaj wywołanie error_log() w metodzie getSpamScore(), aby to potwierdzić. Zamiast tego, wiadomość czeka w RabbitMQ, gotowa do skonsumowania przez jakiś proces.

Jak możesz się domyślić, Symfony posiada polecenie przetwarzania wiadomości. Uruchom je teraz:

1
$ symfony console messenger:consume async -vv

Powinno ono natychmiast przetworzyć wiadomość wysłaną w związku z przesłanymi komentarzami:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
 [OK] Consuming messages from transports "async".

 // The worker will automatically exit once it has received a stop signal via the messenger:stop-workers command.

 // Quit the worker with CONTROL-C.

11:30:20 INFO      [messenger] Received message App\Message\CommentMessage ["message" => App\Message\CommentMessage^ { …},"class" => "App\Message\CommentMessage"]
11:30:20 INFO      [http_client] Request: "POST https://80cea32be1f6.rest.akismet.com/1.1/comment-check"
11:30:20 INFO      [http_client] Response: "200 https://80cea32be1f6.rest.akismet.com/1.1/comment-check"
11:30:20 INFO      [messenger] Message App\Message\CommentMessage handled by App\MessageHandler\CommentMessageHandler::__invoke ["message" => App\Message\CommentMessage^ { …},"class" => "App\Message\CommentMessage","handler" => "App\MessageHandler\CommentMessageHandler::__invoke"]
11:30:20 INFO      [messenger] App\Message\CommentMessage was handled successfully (acknowledging to transport). ["message" => App\Message\CommentMessage^ { …},"class" => "App\Message\CommentMessage"]

Aktywność konsumentów jest zapisywana w logach, ale możesz otrzymać natychmiastową informację zwrotną na konsoli, przekazując flagę``-vv``. Możesz dostrzec nawet połączenie do API Akismet.

Aby zatrzymać konsumenta, naciśnij Ctrl+C.

Zapoznanie się z interfejsem zarządzania RabbitMQ

Jeśli chcesz zobaczyć kolejki i wiadomości płynące przez RabbitMQ, otwórz jego interfejs zarządzania za pomocą komendy:

1
$ symfony open:local:rabbitmq

albo z paska narzędzi do debugowania:

Użyj guest/guest aby zalogować się do interfejsu zarządzania RabbitMQ:

Robotnicy (ang. workers) działający w tle

Zamiast uruchamiać konsumenta za każdym razem, gdy zamieszczamy komentarz, i zatrzymywać go natychmiast po tym, chcemy uruchomić go w sposób ciągły, bez otwierania zbyt wielu okien lub zakładek terminala.

Symfony CLI może zarządzać takimi poleceniami w tle (robotnikami) używając flagi demona (-d) na poleceniu run.

Uruchom ponownie konsumenta wiadomości, ale umieść go w tle:

1
$ symfony run -d --watch=config,src,templates,vendor symfony console messenger:consume async

Opcja --watch mówi Symfony, że polecenie musi zostać zrestartowane za każdym razem, gdy dochodzi do zmiany plików w katalogach config/, src/, templates/, lub vendor/.

Informacja

Nie używaj -vv, ponieważ otrzymasz zduplikowane wiadomości w server:log (wiadomości zarówno w logach i w konsoli).

Jeśli konsument przestanie pracować z jakiegoś powodu (limit pamięci, błąd, itp.), to zostanie automatycznie uruchomiony ponownie. Ale jeśli konsument przestanie pracować natychmiast po uruchomieniu – Symfony CLI podda się.

Logi są przesyłane strumieniowo przez symfony server:log wraz z wszystkimi innymi logami pochodzącymi z PHP, serwera WWW i aplikacji:

1
$ symfony server:log

Użyj polecenia server:status, aby wyświetlić listę wszystkich robotników pracujących w tle w bieżącym projekcie:

1
2
3
4
$ symfony server:status

Web server listening on https://127.0.0.1:8000
  Command symfony console messenger:consume async running with PID 15774 (watching config/, src/, templates/)

Aby zatrzymać robotnika, zatrzymaj serwer WWW lub zabij PID podany przez polecenie server:status:

1
$ kill 15774

Ponawianie dostarczenia niedostarczonych wiadomości

A co, jeśli API Akismet nie działa podczas przetwarzania wiadomości? Twórca komentarza nie ma o tym pojęcia, ale wiadomość zostaje utracona i komentarz, którego dotyczyła, nie zostaje sprawdzony pod kątem bycia spamem.

Messenger posiada mechanizm ponawiania przetwarzania wiadomości w przypadku wystąpienia wyjątku podczas jej obsługi. Skonfigurujmy go:

patch_file
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
--- a/config/packages/messenger.yaml
+++ b/config/packages/messenger.yaml
@@ -5,10 +5,17 @@ framework:

         transports:
             # https://symfony.com/doc/current/messenger.html#transport-configuration
-            async: '%env(RABBITMQ_DSN)%'
-            # failed: 'doctrine://default?queue_name=failed'
+            async:
+                dsn: '%env(RABBITMQ_DSN)%'
+                retry_strategy:
+                    max_retries: 3
+                    multiplier: 2
+
+            failed: 'doctrine://default?queue_name=failed'
             # sync: 'sync://'

+        failure_transport: failed
+
         routing:
             # Route your messages to the transports
             App\Message\CommentMessage: async

Jeśli pojawi się problem podczas obsługi wiadomości, konsument spróbuje ponownie trzy razy przed rezygnacją. A potem zamiast odrzucić wiadomość, umieści ją w bardziej trwałym magazynie, w kolejce nieudanych przetworzeń (ang. failed), która wykorzystuje bazę danych.

Przejrzyj nieudane wiadomości, a następnie ponów przetwarzanie za pomocą następujących poleceń:

1
2
3
$ symfony console messenger:failed:show

$ symfony console messenger:failed:retry

Wdrażanie RabbitMQ

RabbitMQ można dodać do serwerów produkcyjnych poprzez dodanie go do listy usług:

patch_file
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
--- a/.symfony/services.yaml
+++ b/.symfony/services.yaml
@@ -5,3 +5,8 @@ db:

 rediscache:
     type: redis:5.0
+
+queue:
+    type: rabbitmq:3.7
+    disk: 1024
+    size: S

Odwołaj się do niego w głównej konfiguracji kontenera sieciowego oraz włącz rozszerzenie PHP amqp:

patch_file
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
--- a/.symfony.cloud.yaml
+++ b/.symfony.cloud.yaml
@@ -4,6 +4,7 @@ type: php:7.4

 runtime:
     extensions:
+        - amqp
         - pdo_pgsql
         - apcu
         - mbstring
@@ -22,6 +23,7 @@ variables:
 relationships:
     database: "db:postgresql"
     redis: "rediscache:redis"
+    rabbitmq: "queue:rabbitmq"

 web:
     locations:

Kiedy usługa RabbitMQ jest zainstalowana w projekcie, możesz uzyskać dostęp do jej interfejsu zarządzania, otwierając tunel:

1
2
3
4
5
$ symfony tunnel:open
$ symfony open:remote:rabbitmq

# when done
$ symfony tunnel:close

Uruchamianie robotników na SymfonyCloud

Aby przetwarzać wiadomości od RabbitMQ, musimy bezustannie uruchamiać polecenie messenger:consume. Na SymfonyCloud jest to rola robotnika (ang. worker):

patch_file
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
--- a/.symfony.cloud.yaml
+++ b/.symfony.cloud.yaml
@@ -46,3 +46,12 @@ hooks:
         set -x -e

         (>&2 symfony-deploy)
+
+workers:
+    messages:
+        commands:
+            start: |
+                set -x -e
+
+                (>&2 symfony-deploy)
+                php bin/console messenger:consume async -vv --time-limit=3600 --memory-limit=128M

Podobnie jak Symfony CLI, SymfonyCloud zarządza restartami i logami.

Aby odczytać logi robotnika, użyj:

1
$ symfony logs --worker=messages all

  • « Previous Etap 17: Testowanie
  • Next » Etap 19: Podejmowanie decyzji przy użyciu komponentu Workflow

This work, including the code samples, is licensed under a Creative Commons BY-NC-SA 4.0 license.