Skip to content

Переход к асинхронности

Во время проверки формы на наличие спама могут возникнуть проблемы. В случае, если ответ от Akismet API будет долгим, наша страница с формой также станет работать медленнее. Худший сценарий — мы можем совсем потерять комментарии из-за достижения тайм-аута или недоступности Akismet API.

В идеале нам нужно немедленно возвращать ответ на отправленную форму, а приложение должно сохранять данные без попытки их сразу же опубликовать. Проверить на спам можно и позже.

Установка статусов комментариев

Реализуем статусы (state) для комментариев: submitted (отправлен), spam (спам) и published (опубликован).

Добавим свойство state в класс Comment:

1
$ symfony console make:entity Comment

Убедимся, что для свойства state по умолчанию задан статус submitted.

1
2
3
4
5
6
7
8
9
10
11
12
13
--- a/src/Entity/Comment.php
+++ b/src/Entity/Comment.php
@@ -38,8 +38,8 @@ class Comment
     #[ORM\Column(type: 'string', length: 255, nullable: true)]
     private $photoFilename;

-    #[ORM\Column(type: 'string', length: 255)]
-    private $state;
+    #[ORM\Column(type: 'string', length: 255, options: ["default" => "submitted"])]
+    private $state = 'submitted';

     public function __toString(): string
     {

Создайте миграцию базы данных:

1
$ symfony console make:migration

Измените миграцию, чтобы все существующие комментарии были по умолчанию со статусом published:

1
2
3
4
5
6
7
8
9
10
--- a/migrations/Version00000000000000.php
+++ b/migrations/Version00000000000000.php
@@ -21,6 +21,7 @@ final class Version00000000000000 extends AbstractMigration
     {
         // this up() migration is auto-generated, please modify it to your needs
         $this->addSql('ALTER TABLE comment ADD state VARCHAR(255) DEFAULT \'submitted\' NOT NULL');
+        $this->addSql("UPDATE comment SET state='published'");
     }

     public function down(Schema $schema): void

Выполните миграцию базы данных:

1
$ symfony console doctrine:migrations:migrate

Обновите логику отображения, чтобы на странице показывались только опубликованные комментарии:

1
2
3
4
5
6
7
8
9
10
11
12
--- a/src/Repository/CommentRepository.php
+++ b/src/Repository/CommentRepository.php
@@ -27,7 +27,9 @@ class CommentRepository extends ServiceEntityRepository
     {
         $query = $this->createQueryBuilder('c')
             ->andWhere('c.conference = :conference')
+            ->andWhere('c.state = :state')
             ->setParameter('conference', $conference)
+            ->setParameter('state', 'published')
             ->orderBy('c.createdAt', 'DESC')
             ->setMaxResults(self::PAGINATOR_PER_PAGE)
             ->setFirstResult($offset)

Обновите конфигурацию EasyAdmin, чтобы отображать статус комментария в административной панели:

1
2
3
4
5
6
7
8
9
10
--- a/src/Controller/Admin/CommentCrudController.php
+++ b/src/Controller/Admin/CommentCrudController.php
@@ -51,6 +51,7 @@ class CommentCrudController extends AbstractCrudController
             ->setLabel('Photo')
             ->onlyOnIndex()
         ;
+        yield TextField::new('state');

         $createdAt = DateTimeField::new('createdAt')->setFormTypeOptions([
             'html5' => true,

Не забудьте также обновить тесты, установив новое свойство state в фикстурах:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
--- a/src/DataFixtures/AppFixtures.php
+++ b/src/DataFixtures/AppFixtures.php
@@ -37,8 +37,16 @@ class AppFixtures extends Fixture
         $comment1->setAuthor('Fabien');
         $comment1->setEmail('fabien@example.com');
         $comment1->setText('This was a great conference.');
+        $comment1->setState('published');
         $manager->persist($comment1);

+        $comment2 = new Comment();
+        $comment2->setConference($amsterdam);
+        $comment2->setAuthor('Lucas');
+        $comment2->setEmail('lucas@example.com');
+        $comment2->setText('I think this one is going to be moderated.');
+        $manager->persist($comment2);
+
         $admin = new Admin();
         $admin->setRoles(['ROLE_ADMIN']);
         $admin->setUsername('admin');

Для тестирования контроллера смоделируйте проверку комментария:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
--- a/tests/Controller/ConferenceControllerTest.php
+++ b/tests/Controller/ConferenceControllerTest.php
@@ -2,6 +2,8 @@

 namespace App\Tests\Controller;

+use App\Repository\CommentRepository;
+use Doctrine\ORM\EntityManagerInterface;
 use Symfony\Bundle\FrameworkBundle\Test\WebTestCase;

 class ConferenceControllerTest extends WebTestCase
@@ -22,10 +24,16 @@ class ConferenceControllerTest extends WebTestCase
         $client->submitForm('Submit', [
             'comment_form[author]' => 'Fabien',
             'comment_form[text]' => 'Some feedback from an automated functional test',
-            'comment_form[email]' => 'me@automat.ed',
+            'comment_form[email]' => $email = 'me@automat.ed',
             'comment_form[photo]' => dirname(__DIR__, 2).'/public/images/under-construction.gif',
         ]);
         $this->assertResponseRedirects();
+
+        // simulate comment validation
+        $comment = self::getContainer()->get(CommentRepository::class)->findOneByEmail($email);
+        $comment->setState('published');
+        self::getContainer()->get(EntityManagerInterface::class)->flush();
+
         $client->followRedirect();
         $this->assertSelectorExists('div:contains("There are 2 comments")');
     }

В тесте PHPUnit вы можете получить любой сервис из контейнера с помощью метода self::getContainer()->get(), который также даёт доступ к непубличным сервисам.

Внедрение компонента Messenger

Компонент Messenger управляет асинхронным кодом в Symfony:

1
$ symfony composer req doctrine-messenger

Для выполнения асинхронных задач, отправьте сообщение на шину обмена сообщениями. Шина сохраняет сообщение в очередь и немедленно возвращает ответ, чтобы исключить любые задержки в ваших приложениях.

Получатель работает постоянно в фоновом режиме, читая новые сообщения из очереди и выполняя соответствующие задачи. Получатель может работать как на том же сервере, где находится само приложение, так и на отдельном сервере.

Это напоминает обработку HTTP-запросов, за исключением того, что здесь нет ответов.

Создание обработчика сообщений

Сообщение — это класс с данными, в котором не должно быть никакой логики. Он будет сериализован для хранения в очереди, поэтому держите в нем только "простые" сериализуемые данные.

Создайте класс CommentMessage:

src/Message/CommentMessage.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
namespace App\Message;

class CommentMessage
{
    private $id;
    private $context;

    public function __construct(int $id, array $context = [])
    {
        $this->id = $id;
        $this->context = $context;
    }

    public function getId(): int
    {
        return $this->id;
    }

    public function getContext(): array
    {
        return $this->context;
    }
}

В мире Messenger нет контроллеров, есть только обработчики сообщений.

Создайте класс CommentMessageHandler в новом пространстве имён App\MessageHandler, который знает, как обрабатывать сообщения CommentMessage:

src/MessageHandler/CommentMessageHandler.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
namespace App\MessageHandler;

use App\Message\CommentMessage;
use App\Repository\CommentRepository;
use App\SpamChecker;
use Doctrine\ORM\EntityManagerInterface;
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;

class CommentMessageHandler implements MessageHandlerInterface
{
    private $spamChecker;
    private $entityManager;
    private $commentRepository;

    public function __construct(EntityManagerInterface $entityManager, SpamChecker $spamChecker, CommentRepository $commentRepository)
    {
        $this->entityManager = $entityManager;
        $this->spamChecker = $spamChecker;
        $this->commentRepository = $commentRepository;
    }

    public function __invoke(CommentMessage $message)
    {
        $comment = $this->commentRepository->find($message->getId());
        if (!$comment) {
            return;
        }

        if (2 === $this->spamChecker->getSpamScore($comment, $message->getContext())) {
            $comment->setState('spam');
        } else {
            $comment->setState('published');
        }

        $this->entityManager->flush();
    }
}

MessageHandlerInterface — это маркирующий интерфейс. Его наличие помогает Symfony понять, что класс нужно автоматически зарегистрировать и настроить в качестве обработчика Messenger. По соглашению логика обработчика находится в методе __invoke(). Подсказка об используемом типе CommentMessage для единственного аргумента этого метода сообщает Messenger, какой класс он будет обрабатывать.

Обновите контроллер, чтобы использовать новую систему:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
--- a/src/Controller/ConferenceController.php
+++ b/src/Controller/ConferenceController.php
@@ -5,14 +5,15 @@ namespace App\Controller;
 use App\Entity\Comment;
 use App\Entity\Conference;
 use App\Form\CommentFormType;
+use App\Message\CommentMessage;
 use App\Repository\CommentRepository;
 use App\Repository\ConferenceRepository;
-use App\SpamChecker;
 use Doctrine\ORM\EntityManagerInterface;
 use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
 use Symfony\Component\HttpFoundation\File\Exception\FileException;
 use Symfony\Component\HttpFoundation\Request;
 use Symfony\Component\HttpFoundation\Response;
+use Symfony\Component\Messenger\MessageBusInterface;
 use Symfony\Component\Routing\Annotation\Route;
 use Twig\Environment;

@@ -20,11 +21,13 @@ class ConferenceController extends AbstractController
 {
     private $twig;
     private $entityManager;
+    private $bus;

-    public function __construct(Environment $twig, EntityManagerInterface $entityManager)
+    public function __construct(Environment $twig, EntityManagerInterface $entityManager, MessageBusInterface $bus)
     {
         $this->twig = $twig;
         $this->entityManager = $entityManager;
+        $this->bus = $bus;
     }

     #[Route('/', name: 'homepage')]
@@ -36,7 +39,7 @@ class ConferenceController extends AbstractController
     }

     #[Route('/conference/{slug}', name: 'conference')]
-    public function show(Request $request, Conference $conference, CommentRepository $commentRepository, SpamChecker $spamChecker, string $photoDir): Response
+    public function show(Request $request, Conference $conference, CommentRepository $commentRepository, string $photoDir): Response
     {
         $comment = new Comment();
         $form = $this->createForm(CommentFormType::class, $comment);
@@ -54,6 +57,7 @@ class ConferenceController extends AbstractController
             }

             $this->entityManager->persist($comment);
+            $this->entityManager->flush();

             $context = [
                 'user_ip' => $request->getClientIp(),
@@ -61,11 +65,8 @@ class ConferenceController extends AbstractController
                 'referrer' => $request->headers->get('referer'),
                 'permalink' => $request->getUri(),
             ];
-            if (2 === $spamChecker->getSpamScore($comment, $context)) {
-                throw new \RuntimeException('Blatant spam, go away!');
-            }

-            $this->entityManager->flush();
+            $this->bus->dispatch(new CommentMessage($comment->getId(), $context));

             return $this->redirectToRoute('conference', ['slug' => $conference->getSlug()]);
         }

Теперь вместо того, чтобы зависеть от скорости работы антиспам-сервиса, мы отправляем сообщение на шину. Затем обработчик решает, что делать с этим сообщением.

Произошло что-то неожиданное: мы отделили наш контроллер от антиспам-сервиса и перенесли логику в новый класс — обработчик. Это идеальный вариант для использования шины. Проверьте код — всё работает. Операции всё ещё выполняются синхронно, но код, вероятно, уже стал "лучше".

Переходим к асинхронности по-настоящему

По умолчанию обработчики вызываются синхронно. Для асинхронного вызова необходимо указать, какую очередь использовать для каждого обработчика в конфигурационном файле config/packages/messenger.yaml:

1
2
3
4
5
6
7
8
--- a/config/packages/messenger.yaml
+++ b/config/packages/messenger.yaml
@@ -21,4 +21,4 @@ framework:
             Symfony\Component\Notifier\Message\SmsMessage: async

             # Route your messages to the transports
-            # 'App\Message\YourMessage': async
+            App\Message\CommentMessage: async

Шина настроена на отсылку сообщений типа App\Message\CommentMessage в очередь async, которая управляется Doctrine (согласно DSN-значению в переменной окружения MESSENGER_TRANSPORT_DSN из файла .env). Проще говоря, мы используем PostgreSQL в качестве очереди для наших сообщений.

Настроим таблицы и триггеры PostgreSQL:

1
$ symfony console make:migration

И запустим миграцию базы данных:

1
$ symfony console doctrine:migrations:migrate

Tip

За кулисами Symfony использует встроенную в PostgreSQL производительную, масштабируемую и транзакционную систему подписки pub/sub (LISTEN/NOTIFY). Прочитайте главу о RabbitMQ, если хотите заменить брокер сообщений с PostgreSQL на RabbitMQ.

Получение сообщений

При отправке нового комментария, проверка на спам больше не сработает. Чтобы убедиться в этом, добавьте вызов функцииerror_log() в метод getSpamScore(). Видно, что сообщение находится в очереди и ждёт обработки.

Как вы понимаете, у Symfony есть команда для получения сообщений. Давайте выполним её сейчас:

1
$ symfony console messenger:consume async -vv

Команда немедленно получит сообщение, отправленное для комментария:

1
2
3
4
5
6
7
8
9
10
11
[OK] Consuming messages from transports "async".

 // The worker will automatically exit once it has received a stop signal via the messenger:stop-workers command.

 // Quit the worker with CONTROL-C.

11:30:20 INFO      [messenger] Received message App\Message\CommentMessage ["message" => App\Message\CommentMessage^ { …},"class" => "App\Message\CommentMessage"]
11:30:20 INFO      [http_client] Request: "POST https://80cea32be1f6.rest.akismet.com/1.1/comment-check"
11:30:20 INFO      [http_client] Response: "200 https://80cea32be1f6.rest.akismet.com/1.1/comment-check"
11:30:20 INFO      [messenger] Message App\Message\CommentMessage handled by App\MessageHandler\CommentMessageHandler::__invoke ["message" => App\Message\CommentMessage^ { …},"class" => "App\Message\CommentMessage","handler" => "App\MessageHandler\CommentMessageHandler::__invoke"]
11:30:20 INFO      [messenger] App\Message\CommentMessage was handled successfully (acknowledging to transport). ["message" => App\Message\CommentMessage^ { …},"class" => "App\Message\CommentMessage"]

Все действия получателя сообщений пишутся в логи, но эти же действия можно увидеть в консоли в реальном времени, если к команде добавить флаг -vv. Вы даже сможете увидеть вызов к Akismet API.

Для остановки получателя, нажмите Ctrl+C.

Запуск воркеров в фоновом режиме

Вместо того, чтобы запускать получателя каждый раз после отправки комментария и тут же его останавливать, хотелось бы, чтобы он работал непрерывно, и при этом не нужно было бы открывать слишком много окон или вкладок терминала.

Symfony CLI может управлять такими фоновыми командами или воркерами, если добавить флаг ( -d ) к команде run.

Запустите получателя сообщений ещё раз, но теперь уже в фоновом режиме:

1
$ symfony run -d --watch=config,src,templates,vendor symfony console messenger:consume async -vv

При использовании параметра --watch Symfony перезапускает команду каждый раз при изменении файловой системы в директориях config/, src/, templates/ или vendor/.

Note

Не используйте флаг -vv, иначе получите дублирующие сообщения в логах команды server:log (логированные и консольные сообщения).

Если получатель перестаёт работать по какой-либо причине (недостаточно памяти, баг и т.д.), он будет перезапущен автоматически. Но Symfony CLI не перезапустит получателя в том случае, если получатель выдает ошибку сразу после запуска.

Логи можно просматривать через команду symfony server:log вместе с остальными логами, приходящими от PHP, веб-сервера и приложения:

1
$ symfony server:log

Команда server:status выведет все фоновые воркеры для текущего проекта:

1
2
3
4
$ symfony server:status

Web server listening on https://127.0.0.1:8000
  Command symfony console messenger:consume async running with PID 15774 (watching config/, src/, templates/)

Чтобы остановить воркер, нужно либо остановить веб-сервер, либо принудительно завершить процесс по его PID после запуска команды server:status:

1
$ kill 15774

Повторная обработка сообщений в случае ошибки

Что, если Akismet будет недоступен во время получения сообщения? Это не отразится на пользователях, оправляющих комментарии, но грозит потерей сообщения и, как следствие, отсутствием проверки на спам.

У компонента Messenger есть механизм повторных попыток в случае возникновения исключений во время обработки сообщений:

config/packages/messenger.yaml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
framework:
    messenger:
        failure_transport: failed

        transports:
            # https://symfony.com/doc/current/messenger.html#transport-configuration
            async:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    use_notify: true
                    check_delayed_interval: 60000
                retry_strategy:
                    max_retries: 3
                    multiplier: 2
            failed: 'doctrine://default?queue_name=failed'
            # sync: 'sync://'

Если при обработке сообщения возникнет ошибка, получатель повторит попытку ещё 3 раза, а затем остановится. Но вместо удаления получатель перенесёт сообщение в постоянное хранилище — очередь failed, которая использует другую таблицу базу данных.

Просматривать сообщения, давшие сбой и повторить попытку их обработки можно с помощью следующих команд:

1
2
3
$ symfony console messenger:failed:show

$ symfony console messenger:failed:retry

Выполнение воркеров на Platform.sh

Чтобы получать сообщения от PostgreSQL, нам нужно постоянно запускать команду messenger:consume. В Platform.sh для этой задачи отведена специальная роль — воркер:

.platform.app.yaml
1
2
3
4
5
workers:
    messenger:
        commands:
            # Consume "async" messages (as configured in the routing section of config/packages/messenger.yaml)
            start: symfony console --time-limit=3600 --memory-limit=64M messenger:consume async

Подобно Symfony CLI, Platform.sh позволяет управлять перезагрузками и логами.

Чтобы посмотреть логи воркера, воспользуйтесь командой ниже:

1
$ symfony cloud:logs --worker=messages all
This work, including the code samples, is licensed under a Creative Commons BY-NC-SA 4.0 license.
TOC
    Version