Yonel Ceruto
Contributed by Yonel Ceruto in #58743

Mercure is the recommended solution for adding real-time features to Symfony applications. However, sometimes your needs are simpler and you can implement your own SSE (Server-sent Event) backend using Symfony components.

With current Symfony features, your controller might look like this:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public function __invoke(): StreamedResponse
{
    $response = new StreamedResponse(function () {
        foreach ($this->watchJobsInProgress() as $job) {
            echo "type: jobs\n";
            echo "data: ".$job->toJson()."\n\n";

            StreamedResponse::closeOutputBuffers(0, true);
            flush();

            if (connection_aborted()) {
                break;
            }

            sleep(1);
        }
    });
    $response->headers->set('Content-Type', 'text/event-stream');
    $response->headers->set('Cache-Control', 'no-cache');
    $response->headers->set('Connection', 'keep-alive');
    $response->headers->set('X-Accel-Buffering', 'no');

    return $response;
}

In Symfony 7.3, we've improved the DX (developer experience) around this. The same controller can now be written like this:

1
2
3
4
5
6
7
8
9
10
public function __invoke(): EventStreamResponse
{
    return new EventStreamResponse(function () {
        foreach ($this->watchJobsInProgress() as $job) {
            yield new ServerEvent($job->toJson(), type: 'jobs');

            sleep(1);
        }
    });
}

First, we've introduced a ServerEvent DTO class modeled after the SSE specification. It represents individual events streamed from the server and includes fields like data, type, id, retry, and comment.

It also implements a getIterator() method that automatically formats all these fields according to the SSE spec. It even supports iterable data for multi-line messages or complex structures in a single event.

We've also added an EventStreamResponse class to simplify sending events. It sets all required HTTP headers (Content-Type, Cache-Control, Connection) for you. The class accepts a generator that yields events:

1
2
3
4
5
6
7
use Symfony\Component\HttpFoundation\EventStreamResponse;
use Symfony\Component\HttpFoundation\ServerEvent;

return new EventStreamResponse(function (): \Generator {
    // events are automatically serialized and output is buffered as needed
    yield new ServerEvent(time(), type: 'ping');
});

The callback receives the response instance as its first argument, allowing you to manually send events using the sendEvent() method. For example, you can listen for Redis messages and dispatch events on demand:

1
2
3
4
5
6
7
return new EventStreamResponse(function (EventStreamResponse $response) {
    $redis = new \Redis();
    $redis->connect('127.0.0.1');
    $redis->subscribe(['message'], function (/* ... */, string $message) use ($response) {
        $response->sendEvent(new ServerEvent($message));
    });
});
Published in #Living on the edge