ステップ 18: 非同期にする

5.2 version
Maintained Unmaintained
5.0

非同期にする

フォーム投稿時にスパムの判定をするのには多少問題があります。例えば、 Akismet API に遅延の問題があったときに、私たちの Web サイトも遅くなってしまいます。さらに、タイムアウトされてしまったり、Akismet API に問題があったときには、コメントを失ってしまうかもしれません。

公開することなく投稿されたデータを保存して、レスポンスを早く返すことが理想とするところです。そのためにスパムのチェックとは独立して実行します。

コメントにフラグを付ける

コメントに submitted, spam, published という state を追加する必要があります。

Comment クラスに state プロパティを追加しましょう:

1
$ symfony console make:entity Comment

データベースマイグレーションを追加する:

1
$ symfony console make:migration

既に登録されているコメント全てに、デフォルトの値として published を指定するようにマイグレーションを修正してください:

patch_file
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
--- a/migrations/Version00000000000000.php
+++ b/migrations/Version00000000000000.php
@@ -20,7 +20,9 @@ final class Version20200714155905 extends AbstractMigration
     public function up(Schema $schema) : void
     {
         // this up() migration is auto-generated, please modify it to your needs
-        $this->addSql('ALTER TABLE comment ADD state VARCHAR(255) NOT NULL');
+        $this->addSql('ALTER TABLE comment ADD state VARCHAR(255)');
+        $this->addSql("UPDATE comment SET state='published'");
+        $this->addSql('ALTER TABLE comment ALTER COLUMN state SET NOT NULL');
     }

     public function down(Schema $schema) : void

データベースをマイグレートする:

1
$ symfony console doctrine:migrations:migrate

また、 state のデフォルトの値を submitted としてセットされていることを確認してください:

patch_file
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
--- a/src/Entity/Comment.php
+++ b/src/Entity/Comment.php
@@ -55,9 +55,9 @@ class Comment
     private $photoFilename;

     /**
-     * @ORM\Column(type="string", length=255)
+     * @ORM\Column(type="string", length=255, options={"default": "submitted"})
      */
-    private $state;
+    private $state = 'submitted';

     public function __toString(): string
     {

EasyAdmin の設定を変更してコメントの状態(state)を見ることができるようにしましょう。

patch_file
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
--- a/src/Controller/Admin/CommentCrudController.php
+++ b/src/Controller/Admin/CommentCrudController.php
@@ -51,6 +51,7 @@ class CommentCrudController extends AbstractCrudController
             ->setLabel('Photo')
             ->onlyOnIndex()
         ;
+        yield TextField::new('state');

         $createdAt = DateTimeField::new('createdAt')->setFormTypeOptions([
             'html5' => true,

フィクスチャに state をセットして、テストコードを修正しましょう:

patch_file
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
--- a/src/DataFixtures/AppFixtures.php
+++ b/src/DataFixtures/AppFixtures.php
@@ -37,8 +37,16 @@ class AppFixtures extends Fixture
         $comment1->setAuthor('Fabien');
         $comment1->setEmail('[email protected]');
         $comment1->setText('This was a great conference.');
+        $comment1->setState('published');
         $manager->persist($comment1);

+        $comment2 = new Comment();
+        $comment2->setConference($amsterdam);
+        $comment2->setAuthor('Lucas');
+        $comment2->setEmail('[email protected]');
+        $comment2->setText('I think this one is going to be moderated.');
+        $manager->persist($comment2);
+
         $admin = new Admin();
         $admin->setRoles(['ROLE_ADMIN']);
         $admin->setUsername('admin');

コントローラーのテストでは、バリデーションをシミュレートします:

patch_file
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
--- a/tests/Controller/ConferenceControllerTest.php
+++ b/tests/Controller/ConferenceControllerTest.php
@@ -2,6 +2,8 @@

 namespace App\Tests\Controller;

+use App\Repository\CommentRepository;
+use Doctrine\ORM\EntityManagerInterface;
 use Symfony\Bundle\FrameworkBundle\Test\WebTestCase;

 class ConferenceControllerTest extends WebTestCase
@@ -22,10 +24,16 @@ class ConferenceControllerTest extends WebTestCase
         $client->submitForm('Submit', [
             'comment_form[author]' => 'Fabien',
             'comment_form[text]' => 'Some feedback from an automated functional test',
-            'comment_form[email]' => '[email protected]',
+            'comment_form[email]' => $email = '[email protected]',
             'comment_form[photo]' => dirname(__DIR__, 2).'/public/images/under-construction.gif',
         ]);
         $this->assertResponseRedirects();
+
+        // simulate comment validation
+        $comment = self::$container->get(CommentRepository::class)->findOneByEmail($email);
+        $comment->setState('published');
+        self::$container->get(EntityManagerInterface::class)->flush();
+
         $client->followRedirect();
         $this->assertSelectorExists('div:contains("There are 2 comments")');
     }

PHPUnit のテストからは self::$container->get() を使えば、全てのサービス(非公開なサービスも含めて)を取得することができます。

メッセンジャーを理解する

Symfony で非同期処理を管理するために、メッセンジャーコンポーネントを使用します:

1
$ symfony composer req messenger

非同期処理が必要な際に、 メッセージメッセージバス に送ってください。このバスは キュー に格納され、処理を止めることなくすぐリターンされます。

コンシューマー は、継続的ににバックグラウンドで動いており、キューにある新しいメッセージを読み、そのメッセージに関連したロジックを実行します。コンシューマーは、Web アプリケーションと同じサーバーでも別のサーバーにあっても動作します。

レスポンスがない以外は、HTTP リクエストを処理するときととても似ています。

メッセージハンドラーをコーディングする

メッセージはデータオブジェクトのクラスで、ロジックを持つべきではありません。シリアライズされ、キューに格納されます。 "シンプルな"シリアライズ可能なデータのみを格納しましょう。

CommentMessage クラスを作成する:

src/Message/CommentMessage.php
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
namespace App\Message;

class CommentMessage
{
    private $id;
    private $context;

    public function __construct(int $id, array $context = [])
    {
        $this->id = $id;
        $this->context = $context;
    }

    public function getId(): int
    {
        return $this->id;
    }

    public function getContext(): array
    {
        return $this->context;
    }
}

メッセンジャーを使う際には、コントローラーではなくメッセージハンドラーが処理を担います。

App\MessageHandler ネームスペース以下に CommentMessage メッセージを処理する CommentMessageHandler クラスを作成してください:

src/MessageHandler/CommentMessageHandler.php
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
namespace App\MessageHandler;

use App\Message\CommentMessage;
use App\Repository\CommentRepository;
use App\SpamChecker;
use Doctrine\ORM\EntityManagerInterface;
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;

class CommentMessageHandler implements MessageHandlerInterface
{
    private $spamChecker;
    private $entityManager;
    private $commentRepository;

    public function __construct(EntityManagerInterface $entityManager, SpamChecker $spamChecker, CommentRepository $commentRepository)
    {
        $this->entityManager = $entityManager;
        $this->spamChecker = $spamChecker;
        $this->commentRepository = $commentRepository;
    }

    public function __invoke(CommentMessage $message)
    {
        $comment = $this->commentRepository->find($message->getId());
        if (!$comment) {
            return;
        }

        if (2 === $this->spamChecker->getSpamScore($comment, $message->getContext())) {
            $comment->setState('spam');
        } else {
            $comment->setState('published');
        }

        $this->entityManager->flush();
    }
}

MessageHandlerInterface は、 マーカー インターフェースです。このインターフェースは、メッセンジャーハンドラのクラスを設定し Symfony の自動登録を行います。規約では、ハンドラーのロジックは、 __invoke() メソッドに書きます。このメソッドの引数の CommentMessage 型宣言から、どのクラスを処理するのか知ることができます。

コントローラーを修正します:

patch_file
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
--- a/src/Controller/ConferenceController.php
+++ b/src/Controller/ConferenceController.php
@@ -5,14 +5,15 @@ namespace App\Controller;
 use App\Entity\Comment;
 use App\Entity\Conference;
 use App\Form\CommentFormType;
+use App\Message\CommentMessage;
 use App\Repository\CommentRepository;
 use App\Repository\ConferenceRepository;
-use App\SpamChecker;
 use Doctrine\ORM\EntityManagerInterface;
 use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
 use Symfony\Component\HttpFoundation\File\Exception\FileException;
 use Symfony\Component\HttpFoundation\Request;
 use Symfony\Component\HttpFoundation\Response;
+use Symfony\Component\Messenger\MessageBusInterface;
 use Symfony\Component\Routing\Annotation\Route;
 use Twig\Environment;

@@ -20,11 +21,13 @@ class ConferenceController extends AbstractController
 {
     private $twig;
     private $entityManager;
+    private $bus;

-    public function __construct(Environment $twig, EntityManagerInterface $entityManager)
+    public function __construct(Environment $twig, EntityManagerInterface $entityManager, MessageBusInterface $bus)
     {
         $this->twig = $twig;
         $this->entityManager = $entityManager;
+        $this->bus = $bus;
     }

     #[Route('/', name: 'homepage')]
@@ -36,7 +39,7 @@ class ConferenceController extends AbstractController
     }

     #[Route('/conference/{slug}', name: 'conference')]
-    public function show(Request $request, Conference $conference, CommentRepository $commentRepository, SpamChecker $spamChecker, string $photoDir): Response
+    public function show(Request $request, Conference $conference, CommentRepository $commentRepository, string $photoDir): Response
     {
         $comment = new Comment();
         $form = $this->createForm(CommentFormType::class, $comment);
@@ -54,6 +57,7 @@ class ConferenceController extends AbstractController
             }

             $this->entityManager->persist($comment);
+            $this->entityManager->flush();

             $context = [
                 'user_ip' => $request->getClientIp(),
@@ -61,11 +65,8 @@ class ConferenceController extends AbstractController
                 'referrer' => $request->headers->get('referer'),
                 'permalink' => $request->getUri(),
             ];
-            if (2 === $spamChecker->getSpamScore($comment, $context)) {
-                throw new \RuntimeException('Blatant spam, go away!');
-            }

-            $this->entityManager->flush();
+            $this->bus->dispatch(new CommentMessage($comment->getId(), $context));

             return $this->redirectToRoute('conference', ['slug' => $conference->getSlug()]);
         }

スパムチェッカーに依存するのではなく、メッセージバスにディスパッチするようになりました。そして、ハンドラーにどう処理するかを決めさせます。

コントローラーとスパムチェックを隔離し、ロジックを新しいクラスのハンドラーに移動しました。バスの良いユースケースです。コードをテストして、動作するか確認してください。全て同期的に実行されますが、コードは、 "ベター"になっています。

表示されるコメントを制限する

表示ロジックを変更して、公開されていないコメントをフロントエンドへ表示しないようにしましょう:

patch_file
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
--- a/src/Repository/CommentRepository.php
+++ b/src/Repository/CommentRepository.php
@@ -27,7 +27,9 @@ class CommentRepository extends ServiceEntityRepository
     {
         $query = $this->createQueryBuilder('c')
             ->andWhere('c.conference = :conference')
+            ->andWhere('c.state = :state')
             ->setParameter('conference', $conference)
+            ->setParameter('state', 'published')
             ->orderBy('c.createdAt', 'DESC')
             ->setMaxResults(self::PAGINATOR_PER_PAGE)
             ->setFirstResult($offset)

実際に非同期にする

デフォルトでは、ハンドラは、同期的に処理します。非同期にするために、config/packages/messenger.yaml の設定ファイルに、ハンドラがどのキューを使用するかを明示的に設定してください:

patch_file
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
--- a/.env
+++ b/.env
@@ -29,7 +29,7 @@ DATABASE_URL="postgresql://127.0.0.1:5432/db?serverVersion=13&charset=utf8"

 ###> symfony/messenger ###
 # Choose one of the transports below
-# MESSENGER_TRANSPORT_DSN=doctrine://default
+MESSENGER_TRANSPORT_DSN=doctrine://default
 # MESSENGER_TRANSPORT_DSN=amqp://guest:[email protected]:5672/%2f/messages
 # MESSENGER_TRANSPORT_DSN=redis://localhost:6379/messages
 ###< symfony/messenger ###
--- a/config/packages/messenger.yaml
+++ b/config/packages/messenger.yaml
@@ -5,10 +5,15 @@ framework:

         transports:
             # https://symfony.com/doc/current/messenger.html#transport-configuration
-            # async: '%env(MESSENGER_TRANSPORT_DSN)%'
+            async:
+                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
+                options:
+                    auto_setup: false
+                    use_notify: true
+                    check_delayed_interval: 60000
             # failed: 'doctrine://default?queue_name=failed'
             # sync: 'sync://'

         routing:
             # Route your messages to the transports
-            # 'App\Message\YourMessage': async
+            App\Message\CommentMessage: async

この設定では、バスに App\Message\CommentMessage のインスタンスを 非同期 にキューへ送るようにしています。 非同期キューは環境変数 MESSENGER_TRANSPORT_DSN によって定義されており、 .env でDoctrineを指すように設定されています。つまり、メッセージのキューとして、PostgreSQLを利用しています。

PostgreSQLのテーブルとトリガーを設定します:

1
$ symfony console make:migration

そしてデータベースをマイグレーションします:

1
$ symfony console doctrine:migrations:migrate

ちなみに

舞台裏では、SymfonyはPostgreSQLに組み込みの、高速で、スケーラブルで、トランザクションできる pub/sub システム(LISTEN/NOTIFY)を利用しています。メッセージの保存先としてPostgreSQLの代わりにRabbitMQを使いたい場合は、RabbitMQの章を読んでみてください。

メッセージを取得実行する

新しくコメントを投稿しても、スパムチェッカーは呼ばれなくなりました。 getSpamScore() メソッドで error_log() 関数を追加して確認してみてください。代わりにキューにメッセージが入るようになったので、他のプロセスから取得され実行される準備ができました。

もうお分かりかもしれませんが、Symfony は、メッセージを取得し、実行するコマンドがビルトインされていますので、実行してみましょう:

1
$ symfony console messenger:consume async -vv

コマンドを実行するとすぐに、コメント送信でディスパッチされたメッセージが取得され、実行されるはずです:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
 [OK] Consuming messages from transports "async".

 // The worker will automatically exit once it has received a stop signal via the messenger:stop-workers command.

 // Quit the worker with CONTROL-C.

11:30:20 INFO      [messenger] Received message App\Message\CommentMessage ["message" => App\Message\CommentMessage^ { …},"class" => "App\Message\CommentMessage"]
11:30:20 INFO      [http_client] Request: "POST https://80cea32be1f6.rest.akismet.com/1.1/comment-check"
11:30:20 INFO      [http_client] Response: "200 https://80cea32be1f6.rest.akismet.com/1.1/comment-check"
11:30:20 INFO      [messenger] Message App\Message\CommentMessage handled by App\MessageHandler\CommentMessageHandler::__invoke ["message" => App\Message\CommentMessage^ { …},"class" => "App\Message\CommentMessage","handler" => "App\MessageHandler\CommentMessageHandler::__invoke"]
11:30:20 INFO      [messenger] App\Message\CommentMessage was handled successfully (acknowledging to transport). ["message" => App\Message\CommentMessage^ { …},"class" => "App\Message\CommentMessage"]

メッセージ取得実行の処理がログに書かれますが、 --vv フラグを渡すことでコンソールに即時的なフィードバックを得ることができます。さらに、Akismet Akismet API の呼び出しを探すこともできます。

メッセージの取得実行は、Ctrl+C でストップします。

ワーカーをバックグラウンドで実行する

コメントを投稿した際に、毎回メッセージ取得の起動と停止を行うのではなく、ターミナルのウィンドウやタブを開くことなく、継続的に実行するようにしましょう。

Symfony CLI は、 run コマンドに -d フラグを付けることでデーモンとすることができ、こういったバックグラウンドで実行するコマンドを管理することができます。

メッセージ取得実行をもう一度走らせてください。今度はバックグラウンドで送信しましょう:

1
$ symfony run -d --watch=config,src,templates,vendor symfony console messenger:consume async

--watch オプションを付けることで、config/, src/, templates/, vendor/ ディレクトリ内のファイルシステムに変更があった際に、Symfony にコマンドをリスタートさせることができます。

注釈

server:log でメッセージを重複させたくない際は、--vv オプションは使用しないでください(ログされたメッセージとコンソールのメッセージ)。

メモリ制限やバグなどでメッセージの取得実行が停止した際は、自動的に再起動します。また、メッセージの取得実行の失敗が暴走した際は、 Symfony CLI は処理を停止します。

symfony server:log コマンドで、PHP やWebサーバー、アプリケーションの全てのログのストリームを見ることができます:

1
$ symfony server:log

server:status コマンドを使えば、現在のプロジェクトで管理されているバックグランドのワーカーの全ての一覧を表示できます:

1
2
3
4
$ symfony server:status

Web server listening on https://127.0.0.1:8000
  Command symfony console messenger:consume async running with PID 15774 (watching config/, src/, templates/)

ワーカーを停止するには、Webサーバーを止めるか、server:status コマンドで得られる PID をキルしてください:

1
$ kill 15774

メッセージの失敗をリトライする

メッセージ取得実行の際に、Akismet が落ちていたらどうしますか?コメントの投稿者には何も影響はありませんが、メッセージを失うことになり、スパムはチェックされません。

メッセンジャーには、メッセージのハンドリングで例外になったらリトライする機構があるので、設定してみましょう:

patch_file
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
--- a/config/packages/messenger.yaml
+++ b/config/packages/messenger.yaml
@@ -1,7 +1,7 @@
 framework:
     messenger:
         # Uncomment this (and the failed transport below) to send failed messages to this transport for later handling.
-        # failure_transport: failed
+        failure_transport: failed

         transports:
             # https://symfony.com/doc/current/messenger.html#transport-configuration
@@ -10,7 +10,10 @@ framework:
                 options:
                     use_notify: true
                     check_delayed_interval: 60000
-            # failed: 'doctrine://default?queue_name=failed'
+                retry_strategy:
+                    max_retries: 3
+                    multiplier: 2
+            failed: 'doctrine://default?queue_name=failed'
             # sync: 'sync://'

         routing:

メッセージのハンドリングに問題が起きた際に、メッセージの取得実行は諦めるまでに3回リトライをします。ただし、メッセージを廃棄するのではなく、恒久的に failed キューに保存します。failedキューは通常のキューとは別のデータベーステーブルを利用します。

失敗したメッセージを調べ、再実行するには次のコマンドを使用します:

1
2
3
$ symfony console messenger:failed:show

$ symfony console messenger:failed:retry

SymfonyCloud でワーカーを実行する

PostgreSQL からメッセージ取得実行をするには、 messenger:consume コマンドを継続的に実行する必要があります。これは SymfonyCloud の ワーカー の役割です:

patch_file
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
--- a/.symfony.cloud.yaml
+++ b/.symfony.cloud.yaml
@@ -50,3 +50,8 @@ hooks:
         set -x -e

         (>&2 symfony-deploy)
+
+workers:
+    messages:
+        commands:
+            start: symfony console messenger:consume async -vv --time-limit=3600 --memory-limit=128M

Symfony CLI のように、SymfonyCloud マネージャーはログをリスタートします。

ワーカーのログを取得するには、以下のようにしてください:

1
$ symfony logs --worker=messages all

  • « Previous ステップ 17: テストをする
  • Next » ステップ 19: ワークフローを使って判定する

This work, including the code samples, is licensed under a Creative Commons BY-NC-SA 4.0 license.