How to Create Your own Messenger Transport
Warning: You are browsing the documentation for Symfony 4.x, which is no longer maintained.
Read the updated version of this page for Symfony 7.1 (the current stable version).
Once you have written your transport's sender and receiver, you can register your transport factory to be able to use it via a DSN in the Symfony application.
Create your Transport Factory
You need to give FrameworkBundle the opportunity to create your transport from a DSN. You will need a transport factory:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;
class YourTransportFactory implements TransportFactoryInterface
{
public function createTransport(string $dsn, array $options, SerializerInterface $serializer): TransportInterface
{
return new YourTransport(/* ... */);
}
public function supports(string $dsn, array $options): bool
{
return 0 === strpos($dsn, 'my-transport://');
}
}
The transport object needs to implement the TransportInterface (which combines the SenderInterface and ReceiverInterface). Here is a simplified example of a database transport:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89
use Ramsey\Uuid\Uuid;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;
class YourTransport implements TransportInterface
{
private $db;
private $serializer;
/**
* @param FakeDatabase $db is used for demo purposes. It is not a real class.
*/
public function __construct(FakeDatabase $db, SerializerInterface $serializer = null)
{
$this->db = $db;
$this->serializer = $serializer ?? new PhpSerializer();
}
public function get(): iterable
{
// Get a message from "my_queue"
$row = $this->db->createQuery(
'SELECT *
FROM my_queue
WHERE (delivered_at IS NULL OR delivered_at < :redeliver_timeout)
AND handled = FALSE'
)
->setParameter('redeliver_timeout', new DateTimeImmutable('-5 minutes'))
->getOneOrNullResult();
if (null === $row) {
return [];
}
$envelope = $this->serializer->decode([
'body' => $row['envelope'],
]);
return [$envelope->with(new TransportMessageIdStamp($row['id']))];
}
public function ack(Envelope $envelope): void
{
$stamp = $envelope->last(TransportMessageIdStamp::class);
if (!$stamp instanceof TransportMessageIdStamp) {
throw new \LogicException('No TransportMessageIdStamp found on the Envelope.');
}
// Mark the message as "handled"
$this->db->createQuery('UPDATE my_queue SET handled = TRUE WHERE id = :id')
->setParameter('id', $stamp->getId())
->execute();
}
public function reject(Envelope $envelope): void
{
$stamp = $envelope->last(TransportMessageIdStamp::class);
if (!$stamp instanceof TransportMessageIdStamp) {
throw new \LogicException('No TransportMessageIdStamp found on the Envelope.');
}
// Delete the message from the "my_queue" table
$this->db->createQuery('DELETE FROM my_queue WHERE id = :id')
->setParameter('id', $stamp->getId())
->execute();
}
public function send(Envelope $envelope): Envelope
{
$encodedMessage = $this->serializer->encode($envelope);
$uuid = Uuid::uuid4()->toString();
// Add a message to the "my_queue" table
$this->db->createQuery(
'INSERT INTO my_queue (id, envelope, delivered_at, handled)
VALUES (:id, :envelope, NULL, FALSE)'
)
->setParameters([
'id' => $uuid,
'envelope' => $encodedMessage['body'],
])
->execute();
return $envelope->with(new TransportMessageIdStamp($uuid));
}
}
The implementation above is not runnable code but illustrates how a TransportInterface could be implemented. For real implementations see InMemoryTransport and DoctrineReceiver.
Register your Factory
1 2 3 4
# config/services.yaml
services:
Your\Transport\YourTransportFactory:
tags: [messenger.transport_factory]
1 2 3 4 5 6 7 8 9 10 11 12 13
<!-- config/services.xml -->
<?xml version="1.0" encoding="UTF-8" ?>
<container xmlns="http://symfony.com/schema/dic/services"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://symfony.com/schema/dic/services
https://symfony.com/schema/dic/services/services-1.0.xsd">
<services>
<service id="Your\Transport\YourTransportFactory">
<tag name="messenger.transport_factory"/>
</service>
</services>
</container>
1 2 3 4 5
// config/services.php
use Your\Transport\YourTransportFactory;
$container->register(YourTransportFactory::class)
->setTags(['messenger.transport_factory']);
Use your Transport
Within the framework.messenger.transports.*
configuration, create your
named transport using your own DSN:
1 2 3 4 5
# config/packages/messenger.yaml
framework:
messenger:
transports:
yours: 'my-transport://...'
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
<!-- config/packages/messenger.xml -->
<?xml version="1.0" encoding="UTF-8" ?>
<container xmlns="http://symfony.com/schema/dic/services"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:framework="http://symfony.com/schema/dic/symfony"
xsi:schemaLocation="http://symfony.com/schema/dic/services
https://symfony.com/schema/dic/services/services-1.0.xsd
http://symfony.com/schema/dic/symfony
https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
<framework:config>
<framework:messenger>
<framework:transport name="yours" dsn="my-transport://..."/>
</framework:messenger>
</framework:config>
</container>
1 2 3 4 5 6 7 8
// config/packages/messenger.php
$container->loadFromExtension('framework', [
'messenger' => [
'transports' => [
'yours' => 'my-transport://...',
],
],
]);
In addition of being able to route your messages to the yours
sender, this
will give you access to the following services:
messenger.sender.yours
: the sender;messenger.receiver.yours
: the receiver.