Skip to content

Asynchrone Verarbeitung

Die Überprüfung auf Spam während der Bearbeitung des übermittelten Formulars kann zu Problemen führen. Wenn die Akismet-API langsam ist, wird unsere Website auch für Benutzer*innen langsam. Aber noch schlimmer: Wir könnten Kommentare verlieren, falls wir in einen Timeout laufen oder die Akismet-API nicht verfügbar ist.

Im Idealfall sollten wir die übermittelten Daten speichern, ohne sie zu veröffentlichen, und sofort eine Response zurückliefern. Die Überprüfung auf Spam kann dann unabhängig davon durchgeführt werden.

Kommentare kennzeichnen

Wir müssen ein state-Feld für Kommentare einführen: submitted, spam und published.

Füge die state-Property zur Comment-Klasse hinzu:

1
$ symfony console make:entity Comment

Wir sollten auch sicherstellen, dass der state-Wert standardmäßig auf submitted gesetzt ist:

1
2
3
4
5
6
7
8
9
10
11
12
13
--- a/src/Entity/Comment.php
+++ b/src/Entity/Comment.php
@@ -38,8 +38,8 @@ class Comment
     #[ORM\Column(type: 'string', length: 255, nullable: true)]
     private $photoFilename;

-    #[ORM\Column(type: 'string', length: 255)]
-    private $state;
+    #[ORM\Column(type: 'string', length: 255, options: ["default" => "submitted"])]
+    private $state = 'submitted';

     public function __toString(): string
     {

Erstelle eine Datenbankmigration:

1
$ symfony console make:migration

Passe die Migration an, um alle vorhandenen Kommentare standardmäßig auf published zu setzen:

1
2
3
4
5
6
7
8
9
10
--- a/migrations/Version00000000000000.php
+++ b/migrations/Version00000000000000.php
@@ -21,6 +21,7 @@ final class Version00000000000000 extends AbstractMigration
     {
         // this up() migration is auto-generated, please modify it to your needs
         $this->addSql('ALTER TABLE comment ADD state VARCHAR(255) DEFAULT \'submitted\' NOT NULL');
+        $this->addSql("UPDATE comment SET state='published'");
     }

     public function down(Schema $schema): void

Führe die Datenbankmigration durch:

1
$ symfony console doctrine:migrations:migrate

Aktualisiere die Anzeigelogik, um zu vermeiden, dass unveröffentlichte Kommentare im Frontend erscheinen:

1
2
3
4
5
6
7
8
9
10
11
12
--- a/src/Repository/CommentRepository.php
+++ b/src/Repository/CommentRepository.php
@@ -27,7 +27,9 @@ class CommentRepository extends ServiceEntityRepository
     {
         $query = $this->createQueryBuilder('c')
             ->andWhere('c.conference = :conference')
+            ->andWhere('c.state = :state')
             ->setParameter('conference', $conference)
+            ->setParameter('state', 'published')
             ->orderBy('c.createdAt', 'DESC')
             ->setMaxResults(self::PAGINATOR_PER_PAGE)
             ->setFirstResult($offset)

Aktualisiere die EasyAdmin-Konfiguration, um den Zustand des Kommentars zu sehen:

1
2
3
4
5
6
7
8
9
10
--- a/src/Controller/Admin/CommentCrudController.php
+++ b/src/Controller/Admin/CommentCrudController.php
@@ -51,6 +51,7 @@ class CommentCrudController extends AbstractCrudController
             ->setLabel('Photo')
             ->onlyOnIndex()
         ;
+        yield TextField::new('state');

         $createdAt = DateTimeField::new('createdAt')->setFormTypeOptions([
             'html5' => true,

Denk daran, auch die Tests zu aktualisieren, indem Du state zu den Fixtures hinzufügst:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
--- a/src/DataFixtures/AppFixtures.php
+++ b/src/DataFixtures/AppFixtures.php
@@ -37,8 +37,16 @@ class AppFixtures extends Fixture
         $comment1->setAuthor('Fabien');
         $comment1->setEmail('fabien@example.com');
         $comment1->setText('This was a great conference.');
+        $comment1->setState('published');
         $manager->persist($comment1);

+        $comment2 = new Comment();
+        $comment2->setConference($amsterdam);
+        $comment2->setAuthor('Lucas');
+        $comment2->setEmail('lucas@example.com');
+        $comment2->setText('I think this one is going to be moderated.');
+        $manager->persist($comment2);
+
         $admin = new Admin();
         $admin->setRoles(['ROLE_ADMIN']);
         $admin->setUsername('admin');

Simuliere die Validierung für die Controller-Tests:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
--- a/tests/Controller/ConferenceControllerTest.php
+++ b/tests/Controller/ConferenceControllerTest.php
@@ -2,6 +2,8 @@

 namespace App\Tests\Controller;

+use App\Repository\CommentRepository;
+use Doctrine\ORM\EntityManagerInterface;
 use Symfony\Bundle\FrameworkBundle\Test\WebTestCase;

 class ConferenceControllerTest extends WebTestCase
@@ -22,10 +24,16 @@ class ConferenceControllerTest extends WebTestCase
         $client->submitForm('Submit', [
             'comment_form[author]' => 'Fabien',
             'comment_form[text]' => 'Some feedback from an automated functional test',
-            'comment_form[email]' => 'me@automat.ed',
+            'comment_form[email]' => $email = 'me@automat.ed',
             'comment_form[photo]' => dirname(__DIR__, 2).'/public/images/under-construction.gif',
         ]);
         $this->assertResponseRedirects();
+
+        // simulate comment validation
+        $comment = self::getContainer()->get(CommentRepository::class)->findOneByEmail($email);
+        $comment->setState('published');
+        self::getContainer()->get(EntityManagerInterface::class)->flush();
+
         $client->followRedirect();
         $this->assertSelectorExists('div:contains("There are 2 comments")');
     }

Du kannst von einem PHPUnit-Test aus jeden beliebigen Service aus den Container über self::getContainer()->get() holen; dies ermöglicht auch den Zugriff auf Services, die nicht public sind.

Messenger verstehen

Die asynchrone Verarbeitung mit Symfony ist Aufgabe der Messenger-Komponente:

1
$ symfony composer req doctrine-messenger

Wenn Logik asynchron ausgeführt werden soll, sende eine Message (Nachricht) an einen Messenger-Bus. Der Bus speichert die Message in einer Queue (Warteschlange) und kehrt sofort zurück, um den Betriebsablauf so schnell wie möglich wieder aufzunehmen.

Ein Consumer läuft kontinuierlich im Hintergrund, um neue Messages auf der Queue zu lesen und die zugehörige Logik auszuführen. Der Consumer kann auf dem gleichen Server wie die Webanwendung oder auf einem separaten Server laufen.

Das Ganze ist der Art und Weise, wie HTTP-Requests behandelt werden sehr ähnlich, nur dass wir keine Responses zurückliefern.

Einen Message Handler erstellen

Eine Message ist eine Datenobjektklasse, die keine Logik enthalten sollte. Sie wird serialisiert, um in einer Queue gespeichert zu werden, also speichere darin nur "einfache" serialisierbare Daten.

Lege die CommentMessage-Klasse an:

src/Message/CommentMessage.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
namespace App\Message;

class CommentMessage
{
    private $id;
    private $context;

    public function __construct(int $id, array $context = [])
    {
        $this->id = $id;
        $this->context = $context;
    }

    public function getId(): int
    {
        return $this->id;
    }

    public function getContext(): array
    {
        return $this->context;
    }
}

In der Messenger-Welt haben wir keine Controller, sondern Message-Handler.

Erstelle eine CommentMessageHandler-Klasse unter einem neuen App\MessageHandler-Namespace, die weiß, wie man mit CommentMessage-Messages umgeht:

src/MessageHandler/CommentMessageHandler.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
namespace App\MessageHandler;

use App\Message\CommentMessage;
use App\Repository\CommentRepository;
use App\SpamChecker;
use Doctrine\ORM\EntityManagerInterface;
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;

class CommentMessageHandler implements MessageHandlerInterface
{
    private $spamChecker;
    private $entityManager;
    private $commentRepository;

    public function __construct(EntityManagerInterface $entityManager, SpamChecker $spamChecker, CommentRepository $commentRepository)
    {
        $this->entityManager = $entityManager;
        $this->spamChecker = $spamChecker;
        $this->commentRepository = $commentRepository;
    }

    public function __invoke(CommentMessage $message)
    {
        $comment = $this->commentRepository->find($message->getId());
        if (!$comment) {
            return;
        }

        if (2 === $this->spamChecker->getSpamScore($comment, $message->getContext())) {
            $comment->setState('spam');
        } else {
            $comment->setState('published');
        }

        $this->entityManager->flush();
    }
}

Das MessageHandlerInterface dient lediglich zur Markierung einer Klasse. Es hilft Symfony nur, die Klasse automatisch zu registrieren und automatisch als Messenger-Handler zu konfigurieren. Nach Konvention lebt die Logik eines Handlers in einer Methode namens __invoke(). Der CommentMessage-Type-Hint auf das eine Argument dieser Methode sagt dem Messenger, welche Klasse diese verarbeiten soll.

Aktualisiere den Controller, damit er das neue System verwendet:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
--- a/src/Controller/ConferenceController.php
+++ b/src/Controller/ConferenceController.php
@@ -5,14 +5,15 @@ namespace App\Controller;
 use App\Entity\Comment;
 use App\Entity\Conference;
 use App\Form\CommentFormType;
+use App\Message\CommentMessage;
 use App\Repository\CommentRepository;
 use App\Repository\ConferenceRepository;
-use App\SpamChecker;
 use Doctrine\ORM\EntityManagerInterface;
 use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
 use Symfony\Component\HttpFoundation\File\Exception\FileException;
 use Symfony\Component\HttpFoundation\Request;
 use Symfony\Component\HttpFoundation\Response;
+use Symfony\Component\Messenger\MessageBusInterface;
 use Symfony\Component\Routing\Annotation\Route;
 use Twig\Environment;

@@ -20,11 +21,13 @@ class ConferenceController extends AbstractController
 {
     private $twig;
     private $entityManager;
+    private $bus;

-    public function __construct(Environment $twig, EntityManagerInterface $entityManager)
+    public function __construct(Environment $twig, EntityManagerInterface $entityManager, MessageBusInterface $bus)
     {
         $this->twig = $twig;
         $this->entityManager = $entityManager;
+        $this->bus = $bus;
     }

     #[Route('/', name: 'homepage')]
@@ -36,7 +39,7 @@ class ConferenceController extends AbstractController
     }

     #[Route('/conference/{slug}', name: 'conference')]
-    public function show(Request $request, Conference $conference, CommentRepository $commentRepository, SpamChecker $spamChecker, string $photoDir): Response
+    public function show(Request $request, Conference $conference, CommentRepository $commentRepository, string $photoDir): Response
     {
         $comment = new Comment();
         $form = $this->createForm(CommentFormType::class, $comment);
@@ -54,6 +57,7 @@ class ConferenceController extends AbstractController
             }

             $this->entityManager->persist($comment);
+            $this->entityManager->flush();

             $context = [
                 'user_ip' => $request->getClientIp(),
@@ -61,11 +65,8 @@ class ConferenceController extends AbstractController
                 'referrer' => $request->headers->get('referer'),
                 'permalink' => $request->getUri(),
             ];
-            if (2 === $spamChecker->getSpamScore($comment, $context)) {
-                throw new \RuntimeException('Blatant spam, go away!');
-            }

-            $this->entityManager->flush();
+            $this->bus->dispatch(new CommentMessage($comment->getId(), $context));

             return $this->redirectToRoute('conference', ['slug' => $conference->getSlug()]);
         }

Anstatt vom Spam Checker abhängig zu sein, senden wir nun eine Message zum Bus. Der Handler entscheidet dann, was er damit macht.

Wir haben etwas Unerwartetes erreicht. Wir haben unseren Controller vom Spam Checker entkoppelt und die Logik in eine neue Klasse, den Handler, verschoben. Es ist ein perfekter Anwendungsfall für den Bus. Teste den Code, er funktioniert. Alles wird noch synchron gemacht, aber der Code ist wahrscheinlich schon "besser".

Echt Asynchron

Standardmäßig werden Handler synchron aufgerufen. Um asynchron zu werden, musst Du in der config/packages/messenger.yaml-Konfigurationsdatei für jeden Handler explizit konfigurieren, welche Queue verwendet werden soll:

1
2
3
4
5
6
7
8
--- a/config/packages/messenger.yaml
+++ b/config/packages/messenger.yaml
@@ -21,4 +21,4 @@ framework:
             Symfony\Component\Notifier\Message\SmsMessage: async

             # Route your messages to the transports
-            # 'App\Message\YourMessage': async
+            App\Message\CommentMessage: async

Die Konfiguration weist den Bus an, Instanzen von App\Message\CommentMessage an die async-Queue zu senden, die durch einen DSN (MESSENGER_TRANSPORT_DSN) definiert ist, welche wiederum zu Doctrine verweist, wie in .env definiert. Auf gut Deutsch: wir nutzen PostgreSQL als Warteschlange (queues) für unsere Messages (Nachrichten).

Tip

Hinter den Kulissen nutzt Symfony das in PostgreSQL integrierte, performante, skalierbare und transaktionale pub/sub system (LISTEN/NOTIFY). Du kannst auch das RabbitMQ-Kapitel lesen, wenn Du das lieber als Message-Händler benutzt als PostgreSQL.

Messages verarbeiten

Wenn Du versuchst, einen neuen Kommentar abzugeben, wird der Spam-Checker nicht mehr aufgerufen. Füge einen error_log()-Aufruf in der getSpamScore()-Methode hinzu, um Dich zu vergewissern. Stattdessen wartet in der Warteschlange (queue) eine Message (Nachricht), die von bestimmten Prozessen verarbeitet werden kann.

Selbstverständlich wird Symfony mit einem Verarbeitungsbefehl (Consumer Command) geliefert. Führe diesen jetzt aus:

1
$ symfony console messenger:consume async -vv

Er sollte die für den eingereichten Kommentar versendete Message sofort verarbeiten:

1
2
3
4
5
6
7
8
9
10
11
[OK] Consuming messages from transports "async".

 // The worker will automatically exit once it has received a stop signal via the messenger:stop-workers command.

 // Quit the worker with CONTROL-C.

11:30:20 INFO      [messenger] Received message App\Message\CommentMessage ["message" => App\Message\CommentMessage^ { …},"class" => "App\Message\CommentMessage"]
11:30:20 INFO      [http_client] Request: "POST https://80cea32be1f6.rest.akismet.com/1.1/comment-check"
11:30:20 INFO      [http_client] Response: "200 https://80cea32be1f6.rest.akismet.com/1.1/comment-check"
11:30:20 INFO      [messenger] Message App\Message\CommentMessage handled by App\MessageHandler\CommentMessageHandler::__invoke ["message" => App\Message\CommentMessage^ { …},"class" => "App\Message\CommentMessage","handler" => "App\MessageHandler\CommentMessageHandler::__invoke"]
11:30:20 INFO      [messenger] App\Message\CommentMessage was handled successfully (acknowledging to transport). ["message" => App\Message\CommentMessage^ { …},"class" => "App\Message\CommentMessage"]

Die Aktivität des Message Consumers wird geloggt, aber Du erhältst sofortiges Feedback auf der Konsole, indem Du das -vv Flag übergibst. Du solltest sogar den Aufruf der Akismet-API sehen können.

Drücke Ctrl+C, um den Consumer zu stoppen.

Worker im Hintergrund ausführen

Anstatt den Consumer jedes Mal zu starten, wenn wir einen Kommentar posten und ihn sofort danach stoppen, wollen wir ihn kontinuierlich ausführen, ohne zu viele Terminalfenster oder -tabs geöffnet zu haben.

Die Symfony CLI kann solche Hintergrundbefehle oder Worker verwalten, indem Du das Daemon-Flag (-d) zusätzlich zum run-Befehl verwendest.

Führe den Message Consumer erneut aus, aber schiebe ihn in den Hintergrund:

1
$ symfony run -d --watch=config,src,templates,vendor symfony console messenger:consume async -vv

Die --watch-Option teilt Symfony mit, dass der Befehl neu gestartet werden muss, wenn Dateien in den Verzeichnissen config/, src/, templates/ oder vendor/ verändert werden.

Note

Verwende nicht -vv, da Du sonst in server:log doppelte Meldungen erhalten würdest (Log- und Konsolenmeldungen).

Wenn der Consumer aus irgendeinem Grund nicht mehr funktioniert (Speicherlimit, Fehler, ...), wird er automatisch neugestartet. Und wenn der Consumer zu schnell versagt, gibt die Symfony CLI auf.

Logs werden von symfony server:log mit allen anderen Logs, die von PHP, dem Webserver und der Anwendung stammen, gesammelt:

1
$ symfony server:log

Verwende den server:status-Befehl, um alle für das aktuelle Projekt verwalteten Worker aus dem Hintergrund aufzulisten:

1
2
3
4
$ symfony server:status

Web server listening on https://127.0.0.1:8000
  Command symfony console messenger:consume async running with PID 15774 (watching config/, src/, templates/)

Um einen Worker zu stoppen, stoppe den Webserver oder beende die PID, die durch den server:status-Befehl gegeben wird:

1
$ kill 15774

Fehlgeschlagene Messages erneut verarbeiten

Was passiert, wenn Akismet während des Verarbeitens einer Message ausgefallen ist? Es gibt keine Auswirkungen für Personen, die Kommentare abgeben, aber die Nachricht geht verloren und Spam wird nicht überprüft.

Der Messenger hat einen Wiederholungsmechanismus, wenn beim Verarbeiten einer Message ein Fehler auftritt:

config/packages/messenger.yaml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
framework:
    messenger:
        failure_transport: failed

        transports:
            # https://symfony.com/doc/current/messenger.html#transport-configuration
            async:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    use_notify: true
                    check_delayed_interval: 60000
                retry_strategy:
                    max_retries: 3
                    multiplier: 2
            failed: 'doctrine://default?queue_name=failed'
            # sync: 'sync://'

Wenn beim Verarbeiten einer Message ein Problem auftritt, wird der Consumer es dreimal erneut probieren, bevor er aufgibt. Aber anstatt die Message zu verwerfen, wird sie in einem dauerhafteren Speicher, der failed-Queue, gespeichert, die eine andere Datenbank-Tabelle verwendet.

Überprüfe fehlgeschlagene Messages und verarbeite sie mit den folgenden Befehlen erneut:

1
2
3
$ symfony console messenger:failed:show

$ symfony console messenger:failed:retry

Worker bei Platform.sh ausführen

Um Messages von PostgreSQL zu bearbeiten, müssen wir den messenger:consume-Befehl kontinuierlich ausführen. Bei Platform.sh ist dies die Rolle eines Workers:

.platform.app.yaml
1
2
3
4
5
workers:
    messenger:
        commands:
            # Consume "async" messages (as configured in the routing section of config/packages/messenger.yaml)
            start: symfony console --time-limit=3600 --memory-limit=64M messenger:consume async

Wie Symfony CLI verwaltet Platform.sh Neustarts und Logs.

Um Logs für einen Worker zu erhalten, verwende:

1
$ symfony cloud:logs --worker=messages all
This work, including the code samples, is licensed under a Creative Commons BY-NC-SA 4.0 license.
TOC
    Version