
Table of Contents

  • Create your Transport Factory
  • Register your Factory
  • Use your Transport

How to Create Your own Messenger Transport

Warning: You are browsing the documentation for Symfony 4.3, which is no longer maintained.

Read the updated version of this page for Symfony 6.3 (the current stable version).

Once you have written your transport's sender and receiver, you can register a transport factory so that the transport can be configured via a DSN in your Symfony application.

Create your Transport Factory

You need to give FrameworkBundle the opportunity to create your transport from a DSN. You will need a transport factory:

use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;

class YourTransportFactory implements TransportFactoryInterface
{
    public function createTransport(string $dsn, array $options, SerializerInterface $serializer): TransportInterface
    {
        return new YourTransport(/* ... */);
    }

    public function supports(string $dsn, array $options): bool
    {
        return 0 === strpos($dsn, 'my-transport://');
    }
}
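
The factory above leaves open how the DSN is turned into constructor arguments. One possible approach, sketched below with plain PHP functions only, is to split the DSN with parse_url() and merge its query string with the $options array. The helper name parseMyTransportDsn() and the DSN layout (host, path as queue name, query string as options) are assumptions made for this illustration; Messenger does not prescribe them.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper: splits a DSN such as
 * "my-transport://localhost/my_queue?redeliver_timeout=300" into its parts.
 * The layout of the DSN is an assumption of this sketch.
 */
function parseMyTransportDsn(string $dsn, array $options = []): array
{
    $parts = parse_url($dsn);
    if (false === $parts || 'my-transport' !== ($parts['scheme'] ?? null)) {
        throw new \InvalidArgumentException(sprintf('The DSN "%s" is not supported.', $dsn));
    }

    // Options embedded in the query string of the DSN (values stay strings).
    $query = [];
    parse_str($parts['query'] ?? '', $query);

    return [
        'host' => $parts['host'] ?? 'localhost',
        'queue' => ltrim($parts['path'] ?? '', '/') ?: 'default',
        // Explicitly passed options win over the ones in the query string.
        'options' => $options + $query,
    ];
}

$config = parseMyTransportDsn('my-transport://localhost/my_queue?redeliver_timeout=300');
```

Inside createTransport(), the resulting array could then be used to build the connection that is passed to YourTransport.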

The transport object needs to implement the TransportInterface (which combines the SenderInterface and ReceiverInterface). Here is a simplified example of a database transport:

use Ramsey\Uuid\Uuid;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;

class YourTransport implements TransportInterface
{
    private $db;
    private $serializer;

    /**
     * @param FakeDatabase $db is used for demo purposes. It is not a real class.
     */
    public function __construct(FakeDatabase $db, ?SerializerInterface $serializer = null)
    {
        $this->db = $db;
        $this->serializer = $serializer ?? new PhpSerializer();
    }

    public function get(): iterable
    {
        // Get a message from "my_queue"
        $row = $this->db->createQuery(
                'SELECT *
                FROM my_queue
                WHERE (delivered_at IS NULL OR delivered_at < :redeliver_timeout)
                AND handled = FALSE'
            )
            ->setParameter('redeliver_timeout', new \DateTimeImmutable('-5 minutes'))
            ->getOneOrNullResult();

        if (null === $row) {
            return [];
        }

        $envelope = $this->serializer->decode([
            'body' => $row['envelope'],
        ]);

        return [$envelope->with(new TransportMessageIdStamp($row['id']))];
    }

    public function ack(Envelope $envelope): void
    {
        $stamp = $envelope->last(TransportMessageIdStamp::class);
        if (!$stamp instanceof TransportMessageIdStamp) {
            throw new \LogicException('No TransportMessageIdStamp found on the Envelope.');
        }

        // Mark the message as "handled"
        $this->db->createQuery('UPDATE my_queue SET handled = TRUE WHERE id = :id')
            ->setParameter('id', $stamp->getId())
            ->execute();
    }

    public function reject(Envelope $envelope): void
    {
        $stamp = $envelope->last(TransportMessageIdStamp::class);
        if (!$stamp instanceof TransportMessageIdStamp) {
            throw new \LogicException('No TransportMessageIdStamp found on the Envelope.');
        }

        // Delete the message from the "my_queue" table
        $this->db->createQuery('DELETE FROM my_queue WHERE id = :id')
            ->setParameter('id', $stamp->getId())
            ->execute();
    }

    public function send(Envelope $envelope): Envelope
    {
        $encodedMessage = $this->serializer->encode($envelope);
        $uuid = Uuid::uuid4()->toString();

        // Add a message to the "my_queue" table
        $this->db->createQuery(
                'INSERT INTO my_queue (id, envelope, delivered_at, handled)
                VALUES (:id, :envelope, NULL, FALSE)'
            )
            ->setParameters([
                'id' => $uuid,
                'envelope' => $encodedMessage['body'],
            ])
            ->execute();

        return $envelope->with(new TransportMessageIdStamp($uuid));
    }
}

The implementation above is not runnable code but illustrates how a TransportInterface could be implemented. For real implementations see InMemoryTransport and DoctrineReceiver.
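
Because the example relies on a fake database class, the message lifecycle it describes cannot be tried out directly. The following sketch mimics the same lifecycle (send, get, ack, reject) with a plain in-memory array and no Messenger dependency. The class InMemoryQueue, its method signatures and the integer timestamps are hypothetical and exist only for this illustration. Unlike the simplified get() above, the sketch also records the delivery time, which the redelivery check relies on.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical in-memory stand-in for the "my_queue" table.
 * Messages are plain strings and ids are simple counters.
 */
final class InMemoryQueue
{
    /** @var array<int, array{body: string, delivered_at: ?int, handled: bool}> */
    private array $rows = [];
    private int $nextId = 1;
    private int $redeliverTimeout;

    public function __construct(int $redeliverTimeout = 300)
    {
        $this->redeliverTimeout = $redeliverTimeout;
    }

    // Mirrors send(): store the message as undelivered and unhandled.
    public function send(string $body): int
    {
        $id = $this->nextId++;
        $this->rows[$id] = ['body' => $body, 'delivered_at' => null, 'handled' => false];

        return $id;
    }

    // Mirrors get(): return one pending row (or null) and record the delivery time.
    public function get(int $now): ?array
    {
        foreach ($this->rows as $id => $row) {
            $pending = null === $row['delivered_at']
                || $row['delivered_at'] < $now - $this->redeliverTimeout;
            if ($pending && !$row['handled']) {
                $this->rows[$id]['delivered_at'] = $now;

                return ['id' => $id, 'body' => $row['body']];
            }
        }

        return null;
    }

    // Mirrors ack(): mark the row as handled so it is not delivered again.
    public function ack(int $id): void
    {
        if (isset($this->rows[$id])) {
            $this->rows[$id]['handled'] = true;
        }
    }

    // Mirrors reject(): drop the row entirely.
    public function reject(int $id): void
    {
        unset($this->rows[$id]);
    }
}

$queue = new InMemoryQueue();
$id = $queue->send('hello');
$first = $queue->get(1000);  // delivered for the first time
$second = $queue->get(1001); // null: still within the redelivery timeout
$third = $queue->get(1301);  // delivered again: no ack arrived in time
$queue->ack($id);
$fourth = $queue->get(5000); // null: the message was handled
```

A real transport would additionally need the read and the update of delivered_at to happen atomically, so that two workers cannot pick up the same message.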

Register your Factory

Tag the factory with messenger.transport_factory so that FrameworkBundle can find it:

# config/services.yaml
services:
    Your\Transport\YourTransportFactory:
        tags: [messenger.transport_factory]
<?xml version="1.0" encoding="UTF-8" ?>
<!-- config/services.xml -->
<container xmlns="http://symfony.com/schema/dic/services"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://symfony.com/schema/dic/services
        https://symfony.com/schema/dic/services/services-1.0.xsd">

    <services>
        <service id="Your\Transport\YourTransportFactory">
           <tag name="messenger.transport_factory"/>
        </service>
    </services>
</container>
// config/services.php
use Your\Transport\YourTransportFactory;

$container->register(YourTransportFactory::class)
    ->addTag('messenger.transport_factory');

Use your Transport

Within the framework.messenger.transports.* configuration, create your named transport using your own DSN:

# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            yours: 'my-transport://...'
<?xml version="1.0" encoding="UTF-8" ?>
<!-- config/packages/messenger.xml -->
<container xmlns="http://symfony.com/schema/dic/services"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:framework="http://symfony.com/schema/dic/symfony"
    xsi:schemaLocation="http://symfony.com/schema/dic/services
        https://symfony.com/schema/dic/services/services-1.0.xsd
        http://symfony.com/schema/dic/symfony
        https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">

    <framework:config>
        <framework:messenger>
            <framework:transport name="yours" dsn="my-transport://..."/>
        </framework:messenger>
    </framework:config>
</container>
// config/packages/messenger.php
$container->loadFromExtension('framework', [
    'messenger' => [
        'transports' => [
            'yours' => 'my-transport://...',
        ],
    ],
]);

In addition to letting you route your messages to the yours sender, this gives you access to the following services:

  1. messenger.sender.yours: the sender;
  2. messenger.receiver.yours: the receiver.
This work, including the code samples, is licensed under a Creative Commons BY-SA 3.0 license.