From 0bfd3d3afae120a65539fd75fbe0129096839b23 Mon Sep 17 00:00:00 2001 From: Yonel Ceruto Date: Fri, 1 Nov 2024 15:10:26 -0400 Subject: [PATCH] Streamlining server event streaming --- .../Component/HttpFoundation/CHANGELOG.md | 1 + .../HttpFoundation/EventStreamResponse.php | 110 +++++++++++++ .../Component/HttpFoundation/ServerEvent.php | 147 ++++++++++++++++++ .../Tests/EventStreamResponseTest.php | 127 +++++++++++++++ 4 files changed, 385 insertions(+) create mode 100644 src/Symfony/Component/HttpFoundation/EventStreamResponse.php create mode 100644 src/Symfony/Component/HttpFoundation/ServerEvent.php create mode 100644 src/Symfony/Component/HttpFoundation/Tests/EventStreamResponseTest.php diff --git a/src/Symfony/Component/HttpFoundation/CHANGELOG.md b/src/Symfony/Component/HttpFoundation/CHANGELOG.md index 6861b3b365983..0841fa9ab5252 100644 --- a/src/Symfony/Component/HttpFoundation/CHANGELOG.md +++ b/src/Symfony/Component/HttpFoundation/CHANGELOG.md @@ -5,6 +5,7 @@ CHANGELOG --- * Add support for iterable of string in `StreamedResponse` + * Add `EventStreamResponse` and `ServerEvent` classes to streamline server event streaming 7.2 --- diff --git a/src/Symfony/Component/HttpFoundation/EventStreamResponse.php b/src/Symfony/Component/HttpFoundation/EventStreamResponse.php new file mode 100644 index 0000000000000..fe1a2872e0371 --- /dev/null +++ b/src/Symfony/Component/HttpFoundation/EventStreamResponse.php @@ -0,0 +1,110 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\HttpFoundation; + +/** + * Represents a streaming HTTP response for sending server events + * as part of the Server-Sent Events (SSE) streaming technique. + * + * To broadcast events to multiple users at once, for long-running + * connections and for high-traffic websites, prefer using the Mercure + * Symfony Component, which relies on Software designed for these use + * cases: https://symfony.com/doc/current/mercure.html + * + * @see ServerEvent + * + * @author Yonel Ceruto + * + * Example usage: + * + * return new EventStreamResponse(function () { + * yield new ServerEvent(time()); + * + * sleep(1); + * + * yield new ServerEvent(time()); + * }); + */ +class EventStreamResponse extends StreamedResponse +{ + /** + * @param int|null $retry The number of milliseconds the client should wait + * before reconnecting in case of network failure + */ + public function __construct(?callable $callback = null, int $status = 200, array $headers = [], private ?int $retry = null) + { + $headers += [ + 'Connection' => 'keep-alive', + 'Content-Type' => 'text/event-stream', + 'Cache-Control' => 'private, no-cache, no-store, must-revalidate, max-age=0', + 'X-Accel-Buffering' => 'no', + 'Pragma' => 'no-cache', + 'Expire' => '0', + ]; + + parent::__construct($callback, $status, $headers); + } + + public function setCallback(callable $callback): static + { + if ($this->callback) { + return parent::setCallback($callback); + } + + $this->callback = function () use ($callback) { + if (is_iterable($events = $callback($this))) { + foreach ($events as $event) { + $this->sendEvent($event); + + if (connection_aborted()) { + break; + } + } + } + }; + + return $this; + } + + /** + * Sends a server event to the client. + * + * @return $this + */ + public function sendEvent(ServerEvent $event): static + { + if ($this->retry > 0 && !$event->getRetry()) { + $event->setRetry($this->retry); + } + + foreach ($event as $part) { + echo $part; + + if (!\in_array(\PHP_SAPI, ['cli', 'phpdbg', 'embed'], true)) { + static::closeOutputBuffers(0, true); + flush(); + } + } + + return $this; + } + + public function getRetry(): ?int + { + return $this->retry; + } + + public function setRetry(int $retry): void + { + $this->retry = $retry; + } +} diff --git a/src/Symfony/Component/HttpFoundation/ServerEvent.php b/src/Symfony/Component/HttpFoundation/ServerEvent.php new file mode 100644 index 0000000000000..ea2b5c885e8bd --- /dev/null +++ b/src/Symfony/Component/HttpFoundation/ServerEvent.php @@ -0,0 +1,147 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\HttpFoundation; + +/** + * An event generated on the server intended for streaming to the client + * as part of the SSE streaming technique. + * + * @implements \IteratorAggregate + * + * @author Yonel Ceruto + */ +class ServerEvent implements \IteratorAggregate +{ + /** + * @param string|iterable $data The event data field for the message + * @param string|null $type The event type + * @param int|null $retry The number of milliseconds the client should wait + * before reconnecting in case of network failure + * @param string|null $id The event ID to set the EventSource object's last event ID value + * @param string|null $comment The event comment + */ + public function __construct( + private string|iterable $data, + private ?string $type = null, + private ?int $retry = null, + private ?string $id = null, + private ?string $comment = null, + ) { + } + + public function getData(): iterable|string + { + return $this->data; + } + + /** + * @return $this + */ + public function setData(iterable|string $data): static + { + $this->data = $data; + + return $this; + } + + public function getType(): ?string + { + return $this->type; + } + + /** + * @return $this + */ + public function setType(string $type): static + { + $this->type = $type; + + return $this; + } + + public function getRetry(): ?int + { + return $this->retry; + } + + /** + * @return $this + */ + public function setRetry(?int $retry): static + { + $this->retry = $retry; + + return $this; + } + + public function getId(): ?string + { + return $this->id; + } + + /** + * @return $this + */ + public function setId(string $id): static + { + $this->id = $id; + + return $this; + } + + public function getComment(): ?string + { + return $this->comment; + } + + public function setComment(string $comment): static + { + $this->comment = $comment; + + return $this; + } + + /** + * @return \Traversable + */ + public function getIterator(): \Traversable + { + static $lastRetry = null; + + $head = ''; + if ($this->comment) { + $head .= \sprintf(': %s', $this->comment)."\n"; + } + if ($this->id) { + $head .= \sprintf('id: %s', $this->id)."\n"; + } + if ($this->retry > 0 && $this->retry !== $lastRetry) { + $head .= \sprintf('retry: %s', $lastRetry = $this->retry)."\n"; + } + if ($this->type) { + $head .= \sprintf('event: %s', $this->type)."\n"; + } + yield $head; + + if ($this->data) { + if (is_iterable($this->data)) { + foreach ($this->data as $data) { + yield \sprintf('data: %s', $data)."\n"; + } + } else { + yield \sprintf('data: %s', $this->data)."\n"; + } + } + + yield "\n"; + } +} diff --git a/src/Symfony/Component/HttpFoundation/Tests/EventStreamResponseTest.php b/src/Symfony/Component/HttpFoundation/Tests/EventStreamResponseTest.php new file mode 100644 index 0000000000000..4c430fbe85e68 --- /dev/null +++ b/src/Symfony/Component/HttpFoundation/Tests/EventStreamResponseTest.php @@ -0,0 +1,127 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\HttpFoundation\Tests; + +use PHPUnit\Framework\TestCase; +use Symfony\Component\HttpFoundation\EventStreamResponse; +use Symfony\Component\HttpFoundation\ServerEvent; + +class EventStreamResponseTest extends TestCase +{ + public function testInitializationWithDefaultValues() + { + $response = new EventStreamResponse(); + + $this->assertSame('text/event-stream', $response->headers->get('content-type')); + $this->assertSame('max-age=0, must-revalidate, no-cache, no-store, private', $response->headers->get('cache-control')); + $this->assertSame('keep-alive', $response->headers->get('connection')); + + $this->assertSame(200, $response->getStatusCode()); + $this->assertNull($response->getRetry()); + } + + public function testStreamSingleEvent() + { + $response = new EventStreamResponse(function () { + yield new ServerEvent( + data: 'foo', + type: 'bar', + retry: 100, + id: '1', + comment: 'bla bla', + ); + }); + + $expected = <<assertSameResponseContent($expected, $response); + } + + public function testStreamEventsAndData() + { + $data = static function (): iterable { + yield 'first line'; + yield 'second line'; + yield 'third line'; + }; + + $response = new EventStreamResponse(function () use ($data) { + yield new ServerEvent('single line'); + yield new ServerEvent(['first line', 'second line']); + yield new ServerEvent($data()); + }); + + $expected = <<assertSameResponseContent($expected, $response); + } + + public function testStreamEventsWithRetryFallback() + { + $response = new EventStreamResponse(function () { + yield new ServerEvent('foo'); + yield new ServerEvent('bar'); + yield new ServerEvent('baz', retry: 1000); + }, retry: 1500); + + $expected = <<assertSameResponseContent($expected, $response); + } + + public function testStreamEventWithSendMethod() + { + $response = new EventStreamResponse(function (EventStreamResponse $response) { + $response->sendEvent(new ServerEvent('foo')); + }); + + $this->assertSameResponseContent("data: foo\n\n", $response); + } + + private function assertSameResponseContent(string $expected, EventStreamResponse $response): void + { + ob_start(); + $response->send(); + $actual = ob_get_clean(); + + $this->assertSame($expected, $actual); + } +}