How to Create Your own Messenger Transport
Once you have written your transport's sender and receiver, you can register your transport factory to be able to use it via a DSN in the Symfony application.
Create your Transport Factory
You need to give FrameworkBundle the opportunity to create your transport from a DSN. You will need a transport factory:
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;

class YourTransportFactory implements TransportFactoryInterface
{
    public function createTransport(string $dsn, array $options, SerializerInterface $serializer): TransportInterface
    {
        return new YourTransport(/* ... */);
    }

    public function supports(string $dsn, array $options): bool
    {
        return 0 === strpos($dsn, 'my-transport://');
    }
}
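The factory receives the raw DSN string and an options array, so it usually has to turn them into concrete settings before building the transport. The following is a minimal sketch of one way to do that with PHP's built-in parse_url() and parse_str(). The function name parseYourTransportDsn and the setting names host, queue and redeliver_timeout are assumptions made for illustration; they are not part of Symfony.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper: splits a "my-transport://" DSN into settings.
 * The keys "host", "queue" and "redeliver_timeout" are illustrative only.
 */
function parseYourTransportDsn(string $dsn, array $options = []): array
{
    $parts = parse_url($dsn);
    if (false === $parts || 'my-transport' !== ($parts['scheme'] ?? null)) {
        // The DSN is not echoed back because it may contain credentials.
        throw new \InvalidArgumentException('The given DSN is not a valid "my-transport://" DSN.');
    }

    // Query-string parameters of the DSN are treated as options.
    $query = [];
    parse_str($parts['query'] ?? '', $query);

    $queue = ltrim($parts['path'] ?? '', '/');

    // Precedence: defaults, then the DSN, then the explicit $options array.
    $config = array_merge(
        [
            'host' => $parts['host'] ?? 'localhost',
            'queue' => '' !== $queue ? $queue : 'my_queue',
            'redeliver_timeout' => 300,
        ],
        $query,
        $options
    );

    // Values taken from the query string are strings, so normalize the type.
    $config['redeliver_timeout'] = (int) $config['redeliver_timeout'];

    return $config;
}
```

Inside createTransport(), the result of such a helper could be passed to the YourTransport constructor in place of the /* ... */ placeholder.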
The transport object needs to implement the TransportInterface (which combines the SenderInterface and ReceiverInterface). Here is a simplified example of a database transport:
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;
use Symfony\Component\Uid\Uuid;

class YourTransport implements TransportInterface
{
    private $db;
    private $serializer;

    /**
     * @param FakeDatabase $db is used for demo purposes. It is not a real class.
     */
    public function __construct(FakeDatabase $db, ?SerializerInterface $serializer = null)
    {
        $this->db = $db;
        $this->serializer = $serializer ?? new PhpSerializer();
    }

    public function get(): iterable
    {
        // Get a message from "my_queue"
        $row = $this->db->createQuery(
                'SELECT *
                FROM my_queue
                WHERE (delivered_at IS NULL OR delivered_at < :redeliver_timeout)
                AND handled = FALSE'
            )
            ->setParameter('redeliver_timeout', new \DateTimeImmutable('-5 minutes'))
            ->getOneOrNullResult();

        if (null === $row) {
            return [];
        }

        $envelope = $this->serializer->decode([
            'body' => $row['envelope'],
        ]);

        return [$envelope->with(new TransportMessageIdStamp($row['id']))];
    }

    public function ack(Envelope $envelope): void
    {
        $stamp = $envelope->last(TransportMessageIdStamp::class);
        if (!$stamp instanceof TransportMessageIdStamp) {
            throw new \LogicException('No TransportMessageIdStamp found on the Envelope.');
        }

        // Mark the message as "handled"
        $this->db->createQuery('UPDATE my_queue SET handled = TRUE WHERE id = :id')
            ->setParameter('id', $stamp->getId())
            ->execute();
    }

    public function reject(Envelope $envelope): void
    {
        $stamp = $envelope->last(TransportMessageIdStamp::class);
        if (!$stamp instanceof TransportMessageIdStamp) {
            throw new \LogicException('No TransportMessageIdStamp found on the Envelope.');
        }

        // Delete the message from the "my_queue" table
        $this->db->createQuery('DELETE FROM my_queue WHERE id = :id')
            ->setParameter('id', $stamp->getId())
            ->execute();
    }

    public function send(Envelope $envelope): Envelope
    {
        $encodedMessage = $this->serializer->encode($envelope);
        $uuid = (string) Uuid::v4();

        // Add a message to the "my_queue" table
        $this->db->createQuery(
                'INSERT INTO my_queue (id, envelope, delivered_at, handled)
                VALUES (:id, :envelope, NULL, FALSE)'
            )
            ->setParameters([
                'id' => $uuid,
                'envelope' => $encodedMessage['body'],
            ])
            ->execute();

        return $envelope->with(new TransportMessageIdStamp($uuid));
    }
}
The implementation above is not runnable code but illustrates how a TransportInterface could be implemented. For real implementations see InMemoryTransport and DoctrineReceiver.
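The WHERE clause in get() encodes the redelivery rule: a row is picked up when it has not been handled and was either never delivered or delivered longer ago than the timeout. The sketch below restates that rule as plain PHP so it can be checked in isolation. The function name isDeliverable and the $timeoutSeconds parameter are hypothetical; the array keys mirror the my_queue columns used above. Note that the simplified get() never writes delivered_at, so a real transport would likely need to set it when it hands out a row; otherwise the timeout never comes into play.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical restatement of the WHERE clause used by get():
 * (delivered_at IS NULL OR delivered_at < cutoff) AND handled = FALSE.
 */
function isDeliverable(array $row, \DateTimeImmutable $now, int $timeoutSeconds = 300): bool
{
    // Handled rows are never delivered again.
    if ($row['handled']) {
        return false;
    }

    // Rows that were never delivered are always eligible.
    if (null === $row['delivered_at']) {
        return true;
    }

    // Otherwise the last delivery must be older than the timeout.
    $cutoff = $now->modify(sprintf('-%d seconds', $timeoutSeconds));

    return $row['delivered_at'] < $cutoff;
}
```

With the default of 300 seconds this matches the '-5 minutes' value used in the query.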
Register your Factory
Before using your factory, you must register it. If you're using the default services.yaml configuration, this is already done for you, thanks to autoconfiguration. Otherwise, add the following:
# config/services.yaml
services:
    Your\Transport\YourTransportFactory:
        tags: [messenger.transport_factory]
<!-- config/services.xml -->
<?xml version="1.0" encoding="UTF-8" ?>
<container xmlns="http://symfony.com/schema/dic/services"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://symfony.com/schema/dic/services
        https://symfony.com/schema/dic/services/services-1.0.xsd">

    <services>
        <service id="Your\Transport\YourTransportFactory">
            <tag name="messenger.transport_factory"/>
        </service>
    </services>
</container>
// config/services.php
use Your\Transport\YourTransportFactory;

$container->register(YourTransportFactory::class)
    ->setTags(['messenger.transport_factory']);
Use your Transport
Within the framework.messenger.transports.* configuration, create your named transport using your own DSN:
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            yours: 'my-transport://...'
<!-- config/packages/messenger.xml -->
<?xml version="1.0" encoding="UTF-8" ?>
<container xmlns="http://symfony.com/schema/dic/services"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:framework="http://symfony.com/schema/dic/symfony"
    xsi:schemaLocation="http://symfony.com/schema/dic/services
        https://symfony.com/schema/dic/services/services-1.0.xsd
        http://symfony.com/schema/dic/symfony
        https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">

    <framework:config>
        <framework:messenger>
            <framework:transport name="yours" dsn="my-transport://..."/>
        </framework:messenger>
    </framework:config>
</container>
// config/packages/messenger.php
use Symfony\Config\FrameworkConfig;

return static function (FrameworkConfig $framework) {
    $framework->messenger()
        ->transport('yours')
        ->dsn('my-transport://...')
    ;
};
In addition to being able to route your messages to the yours sender, this will give you access to the following services:
messenger.sender.yours: the sender;
messenger.receiver.yours: the receiver.
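Routing a message class to the yours transport could look like the following PHP configuration fragment. This is a sketch only: App\Message\YourMessage is a hypothetical message class, and the fragment assumes the same FrameworkConfig-based configuration style shown above.

```php
// config/packages/messenger.php
use Symfony\Config\FrameworkConfig;

return static function (FrameworkConfig $framework): void {
    $framework->messenger()
        // App\Message\YourMessage is a placeholder for one of your own message classes
        ->routing('App\Message\YourMessage')->senders(['yours'])
    ;
};
```

Messages of that class dispatched on the bus would then be handed to the send() method of your transport.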