Skip to content

Idziemy w asynchroniczność

Sprawdzenie, czy komentarz nie jest spamem, podczas obsługi wysyłania formularza może prowadzić do pewnych problemów. Jeśli interfejs API Akismet stanie się powolny, nasza strona również będzie powolna dla użytkowników. Co gorsza, jeżeli przekroczymy limit czasu lub API Akismet jest niedostępne, możemy stracić komentarze.

Idealnie byłoby, gdybyśmy przechowywali przesłane dane bez ich publikowania i natychmiast zwracali odpowiedź. Sprawdzenie, czy nie ma spamu, może zostać wykonane poza głównym wątkiem.

Oznaczanie komentarzy

Musimy wprowadzić atrybut state określający stan komentarzy, przyjmujący trzy wartości: submitted dla komentarzy wysłanych, published dla opublikowanych oraz spam dla odrzuconych jako spamowe.

Dodaj atrybut state do klasy Comment:

1
$ symfony console make:entity Comment

Powinniśmy także upewnić się, że domyślna wartość atrybutu state jest ustawiona na submitted:

1
2
3
4
5
6
7
8
9
10
11
12
13
--- a/src/Entity/Comment.php
+++ b/src/Entity/Comment.php
@@ -38,8 +38,8 @@ class Comment
     #[ORM\Column(type: 'string', length: 255, nullable: true)]
     private $photoFilename;

-    #[ORM\Column(type: 'string', length: 255)]
-    private $state;
+    #[ORM\Column(type: 'string', length: 255, options: ["default" => "submitted"])]
+    private $state = 'submitted';

     public function __toString(): string
     {

Utwórz migrację bazy danych:

1
$ symfony console make:migration

Zmodyfikuj migrację tak, by zaktualizowała status wszystkich istniejących komentarzy na published:

1
2
3
4
5
6
7
8
9
10
--- a/migrations/Version00000000000000.php
+++ b/migrations/Version00000000000000.php
@@ -21,6 +21,7 @@ final class Version00000000000000 extends AbstractMigration
     {
         // this up() migration is auto-generated, please modify it to your needs
         $this->addSql('ALTER TABLE comment ADD state VARCHAR(255) DEFAULT \'submitted\' NOT NULL');
+        $this->addSql("UPDATE comment SET state='published'");
     }

     public function down(Schema $schema): void

Uruchom migrację bazy danych:

1
$ symfony console doctrine:migrations:migrate

Zaktualizuj reguły wyświetlania, aby uniknąć pojawienia się nieopublikowanych komentarzy na stronie:

1
2
3
4
5
6
7
8
9
10
11
12
--- a/src/Repository/CommentRepository.php
+++ b/src/Repository/CommentRepository.php
@@ -27,7 +27,9 @@ class CommentRepository extends ServiceEntityRepository
     {
         $query = $this->createQueryBuilder('c')
             ->andWhere('c.conference = :conference')
+            ->andWhere('c.state = :state')
             ->setParameter('conference', $conference)
+            ->setParameter('state', 'published')
             ->orderBy('c.createdAt', 'DESC')
             ->setMaxResults(self::PAGINATOR_PER_PAGE)
             ->setFirstResult($offset)

Zaktualizuj konfigurację EasyAdmin tak, by móc zobaczyć stan komentarza:

1
2
3
4
5
6
7
8
9
10
--- a/src/Controller/Admin/CommentCrudController.php
+++ b/src/Controller/Admin/CommentCrudController.php
@@ -51,6 +51,7 @@ class CommentCrudController extends AbstractCrudController
             ->setLabel('Photo')
             ->onlyOnIndex()
         ;
+        yield TextField::new('state');

         $createdAt = DateTimeField::new('createdAt')->setFormTypeOptions([
             'html5' => true,

Nie zapomnij również zaktualizować testów poprzez ustawienie atrybutu state w danych testowych (ang. fixtures):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
--- a/src/DataFixtures/AppFixtures.php
+++ b/src/DataFixtures/AppFixtures.php
@@ -37,8 +37,16 @@ class AppFixtures extends Fixture
         $comment1->setAuthor('Fabien');
         $comment1->setEmail('fabien@example.com');
         $comment1->setText('This was a great conference.');
+        $comment1->setState('published');
         $manager->persist($comment1);

+        $comment2 = new Comment();
+        $comment2->setConference($amsterdam);
+        $comment2->setAuthor('Lucas');
+        $comment2->setEmail('lucas@example.com');
+        $comment2->setText('I think this one is going to be moderated.');
+        $manager->persist($comment2);
+
         $admin = new Admin();
         $admin->setRoles(['ROLE_ADMIN']);
         $admin->setUsername('admin');

W przypadku testów kontrolera należy przeprowadzić symulację walidacji:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
--- a/tests/Controller/ConferenceControllerTest.php
+++ b/tests/Controller/ConferenceControllerTest.php
@@ -2,6 +2,8 @@

 namespace App\Tests\Controller;

+use App\Repository\CommentRepository;
+use Doctrine\ORM\EntityManagerInterface;
 use Symfony\Bundle\FrameworkBundle\Test\WebTestCase;

 class ConferenceControllerTest extends WebTestCase
@@ -22,10 +24,16 @@ class ConferenceControllerTest extends WebTestCase
         $client->submitForm('Submit', [
             'comment_form[author]' => 'Fabien',
             'comment_form[text]' => 'Some feedback from an automated functional test',
-            'comment_form[email]' => 'me@automat.ed',
+            'comment_form[email]' => $email = 'me@automat.ed',
             'comment_form[photo]' => dirname(__DIR__, 2).'/public/images/under-construction.gif',
         ]);
         $this->assertResponseRedirects();
+
+        // simulate comment validation
+        $comment = self::getContainer()->get(CommentRepository::class)->findOneByEmail($email);
+        $comment->setState('published');
+        self::getContainer()->get(EntityManagerInterface::class)->flush();
+
         $client->followRedirect();
         $this->assertSelectorExists('div:contains("There are 2 comments")');
     }

Z poziomu testu PHPUnit można uzyskać dowolną usługę z kontenera za pośrednictwem self::getContainer()->get(); daje to również dostęp do usług niepublicznych.

Zrozumienie komponentu Messenger

Zarządzanie kodem asynchronicznym w Symfony jest zadaniem komponentu Messenger:

1
$ symfony composer req doctrine-messenger

Gdy jakieś działania powinny być wykonywane asynchronicznie, wyślij wiadomość (ang. message) do magistrali komunikacyjnej (ang. messenger bus). Magistrala dodaje wiadomość do kolejki (ang. queue) i natychmiast zwraca wynik, aby umożliwić wykonywanie od razu kolejnych operacji.

Konsument (ang. consumer) pracuje stale w tle, czytając nowe wiadomości w kolejce i wykonuje związane z nimi schematy działań. Konsument może działać na tym samym serwerze co aplikacja internetowa lub osobnym.

Jest to bardzo podobne do sposobu obsługi żądań HTTP, z wyjątkiem tego, że nie mamy odpowiedzi.

Kodowanie obsługi wiadomości (ang. message handler)

Wiadomość jest klasą obiektu danych, z którą nie są związane żadne reguły działania. Będzie ona zserializowana, aby być przechowywana w kolejce, więc przechowuj tylko "proste" dane, które można serializować.

Stwórz klasę CommentMessage:

src/Message/CommentMessage.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
namespace App\Message;

class CommentMessage
{
    private $id;
    private $context;

    public function __construct(int $id, array $context = [])
    {
        $this->id = $id;
        $this->context = $context;
    }

    public function getId(): int
    {
        return $this->id;
    }

    public function getContext(): array
    {
        return $this->context;
    }
}

W świecie Messengera nie mamy kontrolerów, lecz obiekty obsługi wiadomości (ang. message handlers).

Utwórz klasę CommentMessageHandler w nowej przestrzeni nazw App\MessageHandler, która wie, jak obsługiwać wiadomości CommentMessage:

src/MessageHandler/CommentMessageHandler.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
namespace App\MessageHandler;

use App\Message\CommentMessage;
use App\Repository\CommentRepository;
use App\SpamChecker;
use Doctrine\ORM\EntityManagerInterface;
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;

class CommentMessageHandler implements MessageHandlerInterface
{
    private $spamChecker;
    private $entityManager;
    private $commentRepository;

    public function __construct(EntityManagerInterface $entityManager, SpamChecker $spamChecker, CommentRepository $commentRepository)
    {
        $this->entityManager = $entityManager;
        $this->spamChecker = $spamChecker;
        $this->commentRepository = $commentRepository;
    }

    public function __invoke(CommentMessage $message)
    {
        $comment = $this->commentRepository->find($message->getId());
        if (!$comment) {
            return;
        }

        if (2 === $this->spamChecker->getSpamScore($comment, $message->getContext())) {
            $comment->setState('spam');
        } else {
            $comment->setState('published');
        }

        $this->entityManager->flush();
    }
}

MessageHandlerInterface jest interfejsem znacznikowym (ang. marker interface). Pomaga on Symfony tylko w automatycznej rejestracji i konfiguracji klasy do obsługi wiadomości. Zgodnie z konwencją, reguły obsługi znajdują się w metodzie __invoke(). Podpowiedź typu CommentMessage do jednego z argumentów tej metody mówi Messengerowi, którą klasę będzie obsługiwać.

Zaktualizuj kontroler, aby korzystać z nowego systemu:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
--- a/src/Controller/ConferenceController.php
+++ b/src/Controller/ConferenceController.php
@@ -5,14 +5,15 @@ namespace App\Controller;
 use App\Entity\Comment;
 use App\Entity\Conference;
 use App\Form\CommentFormType;
+use App\Message\CommentMessage;
 use App\Repository\CommentRepository;
 use App\Repository\ConferenceRepository;
-use App\SpamChecker;
 use Doctrine\ORM\EntityManagerInterface;
 use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
 use Symfony\Component\HttpFoundation\File\Exception\FileException;
 use Symfony\Component\HttpFoundation\Request;
 use Symfony\Component\HttpFoundation\Response;
+use Symfony\Component\Messenger\MessageBusInterface;
 use Symfony\Component\Routing\Annotation\Route;
 use Twig\Environment;

@@ -20,11 +21,13 @@ class ConferenceController extends AbstractController
 {
     private $twig;
     private $entityManager;
+    private $bus;

-    public function __construct(Environment $twig, EntityManagerInterface $entityManager)
+    public function __construct(Environment $twig, EntityManagerInterface $entityManager, MessageBusInterface $bus)
     {
         $this->twig = $twig;
         $this->entityManager = $entityManager;
+        $this->bus = $bus;
     }

     #[Route('/', name: 'homepage')]
@@ -36,7 +39,7 @@ class ConferenceController extends AbstractController
     }

     #[Route('/conference/{slug}', name: 'conference')]
-    public function show(Request $request, Conference $conference, CommentRepository $commentRepository, SpamChecker $spamChecker, string $photoDir): Response
+    public function show(Request $request, Conference $conference, CommentRepository $commentRepository, string $photoDir): Response
     {
         $comment = new Comment();
         $form = $this->createForm(CommentFormType::class, $comment);
@@ -54,6 +57,7 @@ class ConferenceController extends AbstractController
             }

             $this->entityManager->persist($comment);
+            $this->entityManager->flush();

             $context = [
                 'user_ip' => $request->getClientIp(),
@@ -61,11 +65,8 @@ class ConferenceController extends AbstractController
                 'referrer' => $request->headers->get('referer'),
                 'permalink' => $request->getUri(),
             ];
-            if (2 === $spamChecker->getSpamScore($comment, $context)) {
-                throw new \RuntimeException('Blatant spam, go away!');
-            }

-            $this->entityManager->flush();
+            $this->bus->dispatch(new CommentMessage($comment->getId(), $context));

             return $this->redirectToRoute('conference', ['slug' => $conference->getSlug()]);
         }

Zamiast polegać na kontrolerze spamu, wysyłamy teraz wiadomość do magistrali. Następnie obiekt obsługi decyduje, co z nią zrobić.

Osiągnęliśmy coś nieoczekiwanego. Odłączyliśmy nasz kontroler od kontrolera spamu i przenieśliśmy schemat działania do nowej klasy – obsługi (ang. handler). Jest to idealne zastosowanie dla magistrali. Przetestuj kod: działa. Wszystko jest nadal wykonywane synchronicznie, ale kod jest już prawdopodobnie "lepszy".

Idziemy w prawdziwą asynchroniczność

Domyślnie, obsługa wywoływana jest synchronicznie. Aby przejść do trybu asynchronicznego, należy skonfigurować, której kolejki użyć dla każdego obiektu obsługującego wiadomości (ang. handler) w pliku konfiguracyjnym config/packages/messenger.yaml:

1
2
3
4
5
6
7
8
--- a/config/packages/messenger.yaml
+++ b/config/packages/messenger.yaml
@@ -21,4 +21,4 @@ framework:
             Symfony\Component\Notifier\Message\SmsMessage: async

             # Route your messages to the transports
-            # 'App\Message\YourMessage': async
+            App\Message\CommentMessage: async

Konfiguracja każe magistrali wysyłać instancje App\Message\CommentMessage do kolejki async, która jest zdefiniowana przez DSN (MESSENGER_TRANSPORT_DSN), przechowywany w pliku zmiennych środowiskowych .env. Krótko mówiąc, używamy PostgreSQL jako kolejki dla naszych wiadomości.

Skonfiguruj tabele i wyzwalacze w PostgreSQL:

1
$ symfony console make:migration

I zmigruj bazę danych:

1
$ symfony console doctrine:migrations:migrate

Tip

Za kulisami Symfony używa wbudowanego PostgreSQL, wydajnego, skalowalnego i transakcyjnego systemu pub/sub (LISTEN/NOTIFY). Możesz także przeczytać rozdział o RabbitMQ, jeśli chcesz go używać zamiast PostgreSQL jako brokera wiadomości.

Przetwarzanie wiadomości

Jeśli spróbujesz przesłać nowy komentarz, kontroler spamu nie będzie już wywoływany. Dodaj wywołanie error_log() w metodzie getSpamScore(), aby to potwierdzić. Zamiast tego, wiadomość czeka w kolejce, gotowa do skonsumowania przez jakiś proces.

Jak możesz się domyślić, Symfony posiada polecenie przetwarzania wiadomości. Uruchom je teraz:

1
$ symfony console messenger:consume async -vv

Powinno ono natychmiast przetworzyć wiadomość wysłaną w związku z przesłanymi komentarzami:

1
2
3
4
5
6
7
8
9
10
11
[OK] Consuming messages from transports "async".

 // The worker will automatically exit once it has received a stop signal via the messenger:stop-workers command.

 // Quit the worker with CONTROL-C.

11:30:20 INFO      [messenger] Received message App\Message\CommentMessage ["message" => App\Message\CommentMessage^ { …},"class" => "App\Message\CommentMessage"]
11:30:20 INFO      [http_client] Request: "POST https://80cea32be1f6.rest.akismet.com/1.1/comment-check"
11:30:20 INFO      [http_client] Response: "200 https://80cea32be1f6.rest.akismet.com/1.1/comment-check"
11:30:20 INFO      [messenger] Message App\Message\CommentMessage handled by App\MessageHandler\CommentMessageHandler::__invoke ["message" => App\Message\CommentMessage^ { …},"class" => "App\Message\CommentMessage","handler" => "App\MessageHandler\CommentMessageHandler::__invoke"]
11:30:20 INFO      [messenger] App\Message\CommentMessage was handled successfully (acknowledging to transport). ["message" => App\Message\CommentMessage^ { …},"class" => "App\Message\CommentMessage"]

Aktywność konsumentów jest zapisywana w logach, ale możesz otrzymać natychmiastową informację zwrotną na konsoli, przekazując flagę-vv. Możesz dostrzec nawet połączenie do API Akismet.

Aby zatrzymać konsumenta, naciśnij Ctrl+C.

Robotnicy (ang. workers) działający w tle

Zamiast uruchamiać konsumenta za każdym razem, gdy zamieszczamy komentarz, i zatrzymywać go natychmiast po tym, chcemy uruchomić go w sposób ciągły, bez otwierania zbyt wielu okien lub zakładek terminala.

Symfony CLI może zarządzać takimi poleceniami w tle (robotnikami) używając flagi demona (-d) na poleceniu run.

Uruchom ponownie konsumenta wiadomości, ale umieść go w tle:

1
$ symfony run -d --watch=config,src,templates,vendor symfony console messenger:consume async -vv

Opcja --watch mówi Symfony, że polecenie musi zostać zrestartowane za każdym razem, gdy dochodzi do zmiany plików w katalogach config/, src/, templates/, lub vendor/.

Note

Nie używaj -vv, ponieważ otrzymasz zduplikowane wiadomości w server:log (wiadomości zarówno w logach i w konsoli).

Jeśli konsument przestanie pracować z jakiegoś powodu (limit pamięci, błąd, itp.), to zostanie automatycznie uruchomiony ponownie. Ale jeśli konsument przestanie pracować natychmiast po uruchomieniu – Symfony CLI podda się.

Logi są przesyłane strumieniowo przez symfony server:log wraz z wszystkimi innymi logami pochodzącymi z PHP, serwera WWW i aplikacji:

1
$ symfony server:log

Użyj polecenia server:status, aby wyświetlić listę wszystkich robotników pracujących w tle w bieżącym projekcie:

1
2
3
4
$ symfony server:status

Web server listening on https://127.0.0.1:8000
  Command symfony console messenger:consume async running with PID 15774 (watching config/, src/, templates/)

Aby zatrzymać robotnika, zatrzymaj serwer WWW lub zabij PID podany przez polecenie server:status:

1
$ kill 15774

Ponawianie dostarczenia niedostarczonych wiadomości

A co, jeśli API Akismet nie działa podczas przetwarzania wiadomości? Twórca komentarza nie ma o tym pojęcia, ale wiadomość zostaje utracona i komentarz, którego dotyczyła, nie zostaje sprawdzony pod kątem bycia spamem.

Messenger posiada mechanizm ponawiania przetwarzania wiadomości w przypadku wystąpienia wyjątku podczas jej obsługi:

config/packages/messenger.yaml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
framework:
    messenger:
        failure_transport: failed

        transports:
            # https://symfony.com/doc/current/messenger.html#transport-configuration
            async:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    use_notify: true
                    check_delayed_interval: 60000
                retry_strategy:
                    max_retries: 3
                    multiplier: 2
            failed: 'doctrine://default?queue_name=failed'
            # sync: 'sync://'

Jeśli pojawi się problem podczas obsługi wiadomości, konsument spróbuje ponownie trzy razy przed rezygnacją. A potem zamiast odrzucić wiadomość, umieści ją w trwałej kolejce nieudanych przetworzeń (ang. failed), która wykorzystuje inną tabelę w bazie danych.

Przejrzyj nieudane wiadomości, a następnie ponów przetwarzanie za pomocą następujących poleceń:

1
2
3
$ symfony console messenger:failed:show

$ symfony console messenger:failed:retry

Uruchamianie robotników na Platform.sh

Aby przetwarzać wiadomości od PostgreSQL, musimy bezustannie uruchamiać polecenie messenger:consume. Na Platform.sh jest to rola robotnika (ang. worker):

.platform.app.yaml
1
2
3
4
5
workers:
    messenger:
        commands:
            # Consume "async" messages (as configured in the routing section of config/packages/messenger.yaml)
            start: symfony console --time-limit=3600 --memory-limit=64M messenger:consume async

Podobnie jak Symfony CLI, Platform.sh zarządza restartami i logami.

Aby odczytać logi robotnika, użyj:

1
$ symfony cloud:logs --worker=messages all
This work, including the code samples, is licensed under a Creative Commons BY-NC-SA 4.0 license.
TOC
    Version