Certain transports used in the Messenger component have a timeout that determines how long a message is considered "in progress." If a message is not processed within that time frame, it is returned to the queue of pending messages, potentially causing it to be processed multiple times. In Symfony 7.2, we're addressing this issue in two ways.
First, we've introduced a keepalive feature for Messenger. The new KeepaliveReceiverInterface, which can be optionally implemented by transports, defines the following method:
/**
 * Informs the transport that the message is still being processed, avoiding a timeout on the transport's side.
 *
 * @throws TransportException If there is an issue communicating with the transport
 */
public function keepalive(Envelope $envelope): void;
Some of the built-in transports have already been updated to implement this interface, and we'll continue updating others in future Symfony versions.
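To make the timeout problem and the effect of a keepalive concrete, here is a minimal, self-contained sketch. The InMemoryQueue class and its methods are hypothetical and are not part of Symfony; they only model a transport with a timeout, using an explicit "now" value instead of a real clock so the behavior is deterministic.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical in-memory queue with a timeout for in-progress messages.
 * This is NOT a Symfony class; it only models the behavior described above.
 */
final class InMemoryQueue
{
    /** @var array<string, string> pending messages, id => body */
    private array $pending = [];

    /** @var array<string, array{body: string, deadline: int}> in-progress messages */
    private array $inFlight = [];

    public function __construct(private int $visibilityTimeout)
    {
    }

    public function push(string $id, string $body): void
    {
        $this->pending[$id] = $body;
    }

    /** Hands out the next pending message and starts its timeout. */
    public function get(int $now): ?string
    {
        $this->requeueExpired($now);

        $id = array_key_first($this->pending);
        if (null === $id) {
            return null;
        }
        $id = (string) $id;

        $this->inFlight[$id] = [
            'body' => $this->pending[$id],
            'deadline' => $now + $this->visibilityTimeout,
        ];
        unset($this->pending[$id]);

        return $id;
    }

    /** Pushes the deadline of an in-progress message further into the future. */
    public function keepalive(string $id, int $now): void
    {
        if (!isset($this->inFlight[$id])) {
            throw new RuntimeException(sprintf('Message "%s" is not in progress.', $id));
        }

        $this->inFlight[$id]['deadline'] = $now + $this->visibilityTimeout;
    }

    public function ack(string $id): void
    {
        unset($this->inFlight[$id]);
    }

    /** Returns timed-out messages to the pending list. */
    private function requeueExpired(int $now): void
    {
        foreach ($this->inFlight as $id => $message) {
            if ($message['deadline'] <= $now) {
                $this->pending[$id] = $message['body'];
                unset($this->inFlight[$id]);
            }
        }
    }
}

// Without keepalive: the handler is still busy at t=12, but the timeout is 10.
$queue = new InMemoryQueue(visibilityTimeout: 10);
$queue->push('msg-1', 'resize image');
$first = $queue->get(now: 0);
$redelivered = $queue->get(now: 12); // the same message is handed out again

// With a keepalive every 5 "seconds", the deadline keeps moving forward.
$queue = new InMemoryQueue(visibilityTimeout: 10);
$queue->push('msg-1', 'resize image');
$queue->get(now: 0);
$queue->keepalive('msg-1', now: 5);
$queue->keepalive('msg-1', now: 10);
$notRedelivered = $queue->get(now: 12); // nothing to hand out
$queue->ack('msg-1');
```

In the first scenario the message is delivered twice because its deadline passed while it was still being handled. In the second, each keepalive call extends the deadline, so the queue has nothing to redeliver. Real transports implement this idea with their own protocol-specific calls.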
Second, we've added a ConsoleAlarmEvent to notify listeners of the SIGALRM signal and enabled console commands to schedule alarms. The main Application class, extended by all Symfony console applications, now includes the following method:
// Sets the interval to schedule a SIGALRM signal in seconds.
public function setAlarmInterval(?int $seconds): void
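For readers unfamiliar with SIGALRM, the following sketch shows the general mechanism in plain PHP using the pcntl extension (CLI on Unix-like systems only). It is an illustration of periodic alarms, not Symfony's actual implementation, which may differ. The $keepalives counter and the 2.5-second busy loop are assumptions made up for the example.

```php
<?php

declare(strict_types=1);

// Deliver signals as soon as they arrive, without declare(ticks=1).
pcntl_async_signals(true);

$keepalives = 0;

pcntl_signal(\SIGALRM, function (int $signal) use (&$keepalives): void {
    // A real worker would tell the transport here that the message is still in progress.
    ++$keepalives;

    // An alarm fires only once, so it is re-armed to get a periodic signal.
    pcntl_alarm(1);
});

// Schedule the first SIGALRM one second from now.
pcntl_alarm(1);

// Simulate a message handler that stays busy for about 2.5 seconds.
$deadline = microtime(true) + 2.5;
while (microtime(true) < $deadline) {
    usleep(50_000);
}

// Cancel any alarm that is still pending.
pcntl_alarm(0);

echo "keepalive signals handled: $keepalives\n";
```

The point of the design is that the long-running work does not need to know about the keepalive: the operating system interrupts it at a fixed interval, and the signal handler does the bookkeeping.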
Thanks to this new feature, we've updated the messenger:consume and messenger:failed:retry commands to include a new --keepalive option.
This enhancement allows Messenger commands to run the following logic:
private const DEFAULT_KEEPALIVE_INTERVAL = 5;

// ...

// schedule a SIGALRM signal every N seconds
if ($input->hasParameterOption('--keepalive')) {
    $this->getApplication()->setAlarmInterval(
        (int) ($input->getOption('keepalive') ?? self::DEFAULT_KEEPALIVE_INTERVAL)
    );
}

// ...

// handle the SIGALRM signal by calling the keepalive feature of the transport
public function handleSignal(int $signal, int|false $previousExitCode = 0): int|false
{
    // ...

    if (\SIGALRM === $signal) {
        $this->worker->keepalive();
        // ...
    }
}
Finally, I can send keepalive packets directly from the app and remove the AMQP proxy from my AWS setup :)