diff --git a/composer.json b/composer.json index 1cc8af1144113..67a1cfbafe582 100644 --- a/composer.json +++ b/composer.json @@ -144,7 +144,7 @@ "monolog/monolog": "^3.0", "nikic/php-parser": "^4.18|^5.0", "nyholm/psr7": "^1.0", - "pda/pheanstalk": "^4.0", + "pda/pheanstalk": "^5.1|^7.0", "php-http/discovery": "^1.15", "php-http/httplug": "^1.0|^2.0", "phpdocumentor/reflection-docblock": "^5.2", diff --git a/src/Symfony/Component/Messenger/Bridge/Beanstalkd/Tests/Transport/ConnectionTest.php b/src/Symfony/Component/Messenger/Bridge/Beanstalkd/Tests/Transport/ConnectionTest.php index 1480e8e56c372..c36270d81498e 100644 --- a/src/Symfony/Component/Messenger/Bridge/Beanstalkd/Tests/Transport/ConnectionTest.php +++ b/src/Symfony/Component/Messenger/Bridge/Beanstalkd/Tests/Transport/ConnectionTest.php @@ -11,15 +11,21 @@ namespace Symfony\Component\Messenger\Bridge\Beanstalkd\Tests\Transport; -use Pheanstalk\Contract\PheanstalkInterface; +use Pheanstalk\Contract\PheanstalkManagerInterface; +use Pheanstalk\Contract\PheanstalkPublisherInterface; +use Pheanstalk\Contract\PheanstalkSubscriberInterface; use Pheanstalk\Exception; use Pheanstalk\Exception\ClientException; use Pheanstalk\Exception\DeadlineSoonException; use Pheanstalk\Exception\ServerException; -use Pheanstalk\Job; -use Pheanstalk\JobId; use Pheanstalk\Pheanstalk; -use Pheanstalk\Response\ArrayResponse; +use Pheanstalk\Values\Job; +use Pheanstalk\Values\JobId; +use Pheanstalk\Values\JobState; +use Pheanstalk\Values\JobStats; +use Pheanstalk\Values\TubeList; +use Pheanstalk\Values\TubeName; +use Pheanstalk\Values\TubeStats; use PHPUnit\Framework\TestCase; use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\Connection; use Symfony\Component\Messenger\Exception\InvalidArgumentException as MessengerInvalidArgumentException; @@ -124,7 +130,7 @@ public function testItThrowsAnExceptionIfAnExtraOptionIsDefinedInDSN() public function testGet() { - $id = 1234; + $id = '1234'; $beanstalkdEnvelope = [ 'body' => 'foo', 'headers' => 'bar', @@ -133,17 +139,20 @@ public function testGet() $tube = 'baz'; $timeout = 44; - $job = new Job($id, json_encode($beanstalkdEnvelope)); + $tubeList = new TubeList($tubeName = new TubeName($tube), $tubeNameDefault = new TubeName('default')); + $job = new Job(new JobId($id), json_encode($beanstalkdEnvelope)); $client = $this->createMock(PheanstalkInterface::class); - $client->expects($this->once())->method('watchOnly')->with($tube)->willReturn($client); + $client->expects($this->once())->method('watch')->with($tubeName)->willReturn(2); + $client->expects($this->once())->method('listTubesWatched')->willReturn($tubeList); + $client->expects($this->once())->method('ignore')->with($tubeNameDefault)->willReturn(1); $client->expects($this->once())->method('reserveWithTimeout')->with($timeout)->willReturn($job); $connection = new Connection(['tube_name' => $tube, 'timeout' => $timeout], $client); $envelope = $connection->get(); - $this->assertSame((string) $id, $envelope['id']); + $this->assertSame($id, $envelope['id']); $this->assertSame($beanstalkdEnvelope['body'], $envelope['body']); $this->assertSame($beanstalkdEnvelope['headers'], $envelope['headers']); } @@ -154,7 +163,9 @@ public function testGetWhenThereIsNoJobInTheTube() $timeout = 44; $client = $this->createMock(PheanstalkInterface::class); - $client->expects($this->once())->method('watchOnly')->with($tube)->willReturn($client); + $client->expects($this->once())->method('watch')->with(new TubeName($tube))->willReturn(1); + $client->expects($this->never())->method('listTubesWatched'); + $client->expects($this->never())->method('ignore'); $client->expects($this->once())->method('reserveWithTimeout')->with($timeout)->willReturn(null); $connection = new Connection(['tube_name' => $tube, 'timeout' => $timeout], $client); @@ -170,7 +181,9 @@ public function testGetWhenABeanstalkdExceptionOccurs() $exception = new DeadlineSoonException('foo error'); $client = $this->createMock(PheanstalkInterface::class); - $client->expects($this->once())->method('watchOnly')->with($tube)->willReturn($client); + $client->expects($this->once())->method('watch')->with(new TubeName($tube))->willReturn(1); + $client->expects($this->never())->method('listTubesWatched'); + $client->expects($this->never())->method('ignore'); $client->expects($this->once())->method('reserveWithTimeout')->with($timeout)->willThrowException($exception); $connection = new Connection(['tube_name' => $tube, 'timeout' => $timeout], $client); @@ -181,35 +194,35 @@ public function testGetWhenABeanstalkdExceptionOccurs() public function testAck() { - $id = 123456; + $id = '123456'; $tube = 'xyz'; $client = $this->createMock(PheanstalkInterface::class); - $client->expects($this->once())->method('useTube')->with($tube)->willReturn($client); + $client->expects($this->once())->method('useTube')->with(new TubeName($tube)); $client->expects($this->once())->method('delete')->with($this->callback(fn (JobId $jobId): bool => $jobId->getId() === $id)); $connection = new Connection(['tube_name' => $tube], $client); - $connection->ack((string) $id); + $connection->ack($id); } public function testAckWhenABeanstalkdExceptionOccurs() { - $id = 123456; + $id = '123456'; $tube = 'xyzw'; $exception = new ServerException('baz error'); $client = $this->createMock(PheanstalkInterface::class); - $client->expects($this->once())->method('useTube')->with($tube)->willReturn($client); + $client->expects($this->once())->method('useTube')->with(new TubeName($tube)); $client->expects($this->once())->method('delete')->with($this->callback(fn (JobId $jobId): bool => $jobId->getId() === $id))->willThrowException($exception); $connection = new Connection(['tube_name' => $tube], $client); $this->expectExceptionObject(new TransportException($exception->getMessage(), 0, $exception)); - $connection->ack((string) $id); + $connection->ack($id); } /** @@ -219,66 +232,66 @@ public function testAckWhenABeanstalkdExceptionOccurs() */ public function testReject(bool $buryOnReject, bool $forceDelete) { - $id = 123456; + $id = '123456'; $tube = 'baz'; $client = $this->createMock(PheanstalkInterface::class); - $client->expects($this->once())->method('useTube')->with($tube)->willReturn($client); + $client->expects($this->once())->method('useTube')->with(new TubeName($tube)); $client->expects($this->once())->method('delete')->with($this->callback(fn (JobId $jobId): bool => $jobId->getId() === $id)); $connection = new Connection(['tube_name' => $tube, 'bury_on_reject' => $buryOnReject], $client); - $connection->reject((string) $id, null, $forceDelete); + $connection->reject($id, null, $forceDelete); } public function testRejectWithBury() { - $id = 123456; + $id = '123456'; $tube = 'baz'; $client = $this->createMock(PheanstalkInterface::class); - $client->expects($this->once())->method('useTube')->with($tube)->willReturn($client); + $client->expects($this->once())->method('useTube')->with(new TubeName($tube)); $client->expects($this->once())->method('bury')->with($this->callback(fn (JobId $jobId): bool => $jobId->getId() === $id), 1024); $connection = new Connection(['tube_name' => $tube, 'bury_on_reject' => true], $client); - $connection->reject((string) $id); + $connection->reject($id); } public function testRejectWithBuryAndPriority() { - $id = 123456; + $id = '123456'; $priority = 2; $tube = 'baz'; $client = $this->createMock(PheanstalkInterface::class); - $client->expects($this->once())->method('useTube')->with($tube)->willReturn($client); + $client->expects($this->once())->method('useTube')->with(new TubeName($tube)); $client->expects($this->once())->method('bury')->with($this->callback(fn (JobId $jobId): bool => $jobId->getId() === $id), $priority); $connection = new Connection(['tube_name' => $tube, 'bury_on_reject' => true], $client); - $connection->reject((string) $id, $priority); + $connection->reject($id, $priority); } public function testRejectWhenABeanstalkdExceptionOccurs() { - $id = 123456; + $id = '123456'; $tube = 'baz123'; $exception = new ServerException('baz error'); $client = $this->createMock(PheanstalkInterface::class); - $client->expects($this->once())->method('useTube')->with($tube)->willReturn($client); + $client->expects($this->once())->method('useTube')->with(new TubeName($tube)); $client->expects($this->once())->method('delete')->with($this->callback(fn (JobId $jobId): bool => $jobId->getId() === $id))->willThrowException($exception); $connection = new Connection(['tube_name' => $tube], $client); $this->expectExceptionObject(new TransportException($exception->getMessage(), 0, $exception)); - $connection->reject((string) $id); + $connection->reject($id); } public function testMessageCount() @@ -287,10 +300,11 @@ public function testMessageCount() $count = 51; - $response = new ArrayResponse('OK', ['current-jobs-ready' => $count]); + $response = new TubeStats($tubeName = new TubeName($tube), 0, 51, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0); $client = $this->createMock(PheanstalkInterface::class); - $client->expects($this->once())->method('statsTube')->with($tube)->willReturn($response); + $client->expects($this->once())->method('useTube')->with($tubeName); + $client->expects($this->once())->method('statsTube')->with($tubeName)->willReturn($response); $connection = new Connection(['tube_name' => $tube], $client); @@ -304,7 +318,7 @@ public function testMessageCountWhenABeanstalkdExceptionOccurs() $exception = new ClientException('foobar error'); $client = $this->createMock(PheanstalkInterface::class); - $client->expects($this->once())->method('statsTube')->with($tube)->willThrowException($exception); + $client->expects($this->once())->method('statsTube')->with(new TubeName($tube))->willThrowException($exception); $connection = new Connection(['tube_name' => $tube], $client); @@ -314,24 +328,24 @@ public function testMessageCountWhenABeanstalkdExceptionOccurs() public function testMessagePriority() { - $id = 123456; + $id = '123456'; $priority = 51; $tube = 'baz'; - $response = new ArrayResponse('OK', ['pri' => $priority]); + $response = new JobStats(new JobId($id), new TubeName($tube), JobState::READY, $priority, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0); $client = $this->createMock(PheanstalkInterface::class); $client->expects($this->once())->method('statsJob')->with($this->callback(fn (JobId $jobId): bool => $jobId->getId() === $id))->willReturn($response); $connection = new Connection(['tube_name' => $tube], $client); - $this->assertSame($priority, $connection->getMessagePriority((string) $id)); + $this->assertSame($priority, $connection->getMessagePriority($id)); } public function testMessagePriorityWhenABeanstalkdExceptionOccurs() { - $id = 123456; + $id = '123456'; $tube = 'baz1234'; @@ -343,7 +357,7 @@ public function testMessagePriorityWhenABeanstalkdExceptionOccurs() $connection = new Connection(['tube_name' => $tube], $client); $this->expectExceptionObject(new TransportException($exception->getMessage(), 0, $exception)); - $connection->getMessagePriority((string) $id); + $connection->getMessagePriority($id); } public function testSend() @@ -355,10 +369,10 @@ public function testSend() $delay = 1000; $expectedDelay = $delay / 1000; - $id = 110; + $id = '110'; $client = $this->createMock(PheanstalkInterface::class); - $client->expects($this->once())->method('useTube')->with($tube)->willReturn($client); + $client->expects($this->once())->method('useTube')->with(new TubeName($tube)); $client->expects($this->once())->method('put')->with( $this->callback(function (string $data) use ($body, $headers): bool { $expectedMessage = json_encode([ @@ -371,13 +385,13 @@ public function testSend() 1024, $expectedDelay, 90 - )->willReturn(new Job($id, 'foobar')); + )->willReturn(new Job(new JobId($id), 'foobar')); $connection = new Connection(['tube_name' => $tube], $client); $returnedId = $connection->send($body, $headers, $delay); - $this->assertSame($id, (int) $returnedId); + $this->assertSame($id, $returnedId); } public function testSendWithPriority() @@ -390,10 +404,10 @@ public function testSendWithPriority() $priority = 2; $expectedDelay = $delay / 1000; - $id = 110; + $id = '110'; $client = $this->createMock(PheanstalkInterface::class); - $client->expects($this->once())->method('useTube')->with($tube)->willReturn($client); + $client->expects($this->once())->method('useTube')->with(new TubeName($tube)); $client->expects($this->once())->method('put')->with( $this->callback(function (string $data) use ($body, $headers): bool { $expectedMessage = json_encode([ @@ -406,13 +420,13 @@ public function testSendWithPriority() $priority, $expectedDelay, 90 - )->willReturn(new Job($id, 'foobar')); + )->willReturn(new Job(new JobId($id), 'foobar')); $connection = new Connection(['tube_name' => $tube], $client); $returnedId = $connection->send($body, $headers, $delay, $priority); - $this->assertSame($id, (int) $returnedId); + $this->assertSame($id, $returnedId); } public function testSendWhenABeanstalkdExceptionOccurs() @@ -427,7 +441,7 @@ public function testSendWhenABeanstalkdExceptionOccurs() $exception = new Exception('foo bar'); $client = $this->createMock(PheanstalkInterface::class); - $client->expects($this->once())->method('useTube')->with($tube)->willReturn($client); + $client->expects($this->once())->method('useTube')->with(new TubeName($tube)); $client->expects($this->once())->method('put')->with( $this->callback(function (string $data) use ($body, $headers): bool { $expectedMessage = json_encode([ @@ -451,35 +465,35 @@ public function testSendWhenABeanstalkdExceptionOccurs() public function testKeepalive() { - $id = 123456; + $id = '123456'; $tube = 'baz'; $client = $this->createMock(PheanstalkInterface::class); - $client->expects($this->once())->method('useTube')->with($tube)->willReturn($client); + $client->expects($this->once())->method('useTube')->with(new TubeName($tube)); $client->expects($this->once())->method('touch')->with($this->callback(fn (JobId $jobId): bool => $jobId->getId() === $id)); $connection = new Connection(['tube_name' => $tube], $client); - $connection->keepalive((string) $id); + $connection->keepalive($id); } public function testKeepaliveWhenABeanstalkdExceptionOccurs() { - $id = 123456; + $id = '123456'; $tube = 'baz123'; $exception = new ServerException('baz error'); $client = $this->createMock(PheanstalkInterface::class); - $client->expects($this->once())->method('useTube')->with($tube)->willReturn($client); + $client->expects($this->once())->method('useTube')->with(new TubeName($tube)); $client->expects($this->once())->method('touch')->with($this->callback(fn (JobId $jobId): bool => $jobId->getId() === $id))->willThrowException($exception); $connection = new Connection(['tube_name' => $tube], $client); $this->expectExceptionObject(new TransportException($exception->getMessage(), 0, $exception)); - $connection->keepalive((string) $id); + $connection->keepalive($id); } public function testSendWithRoundedDelay() @@ -491,7 +505,7 @@ public function testSendWithRoundedDelay() $expectedDelay = 0; $client = $this->createMock(PheanstalkInterface::class); - $client->expects($this->once())->method('useTube')->with($tube)->willReturn($client); + $client->expects($this->once())->method('useTube')->with(new TubeName($tube)); $client->expects($this->once())->method('put')->with( $this->anything(), $this->anything(), @@ -503,3 +517,7 @@ public function testSendWithRoundedDelay() $connection->send($body, $headers, $delay); } } + +interface PheanstalkInterface extends PheanstalkPublisherInterface, PheanstalkSubscriberInterface, PheanstalkManagerInterface +{ +} diff --git a/src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/Connection.php b/src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/Connection.php index 8b2b5f67ba821..232d8596336cf 100644 --- a/src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/Connection.php +++ b/src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/Connection.php @@ -11,11 +11,16 @@ namespace Symfony\Component\Messenger\Bridge\Beanstalkd\Transport; -use Pheanstalk\Contract\PheanstalkInterface; +use Pheanstalk\Contract\PheanstalkManagerInterface; +use Pheanstalk\Contract\PheanstalkPublisherInterface; +use Pheanstalk\Contract\PheanstalkSubscriberInterface; +use Pheanstalk\Contract\SocketFactoryInterface; use Pheanstalk\Exception; -use Pheanstalk\Job as PheanstalkJob; -use Pheanstalk\JobId; +use Pheanstalk\Exception\ConnectionException; use Pheanstalk\Pheanstalk; +use Pheanstalk\Values\Job as PheanstalkJob; +use Pheanstalk\Values\JobId; +use Pheanstalk\Values\TubeName; use Symfony\Component\Messenger\Exception\InvalidArgumentException; use Symfony\Component\Messenger\Exception\TransportException; @@ -29,13 +34,13 @@ class Connection { private const DEFAULT_OPTIONS = [ - 'tube_name' => PheanstalkInterface::DEFAULT_TUBE, + 'tube_name' => 'default', 'timeout' => 0, 'ttr' => 90, 'bury_on_reject' => false, ]; - private string $tube; + private TubeName $tube; private int $timeout; private int $ttr; private bool $buryOnReject; @@ -52,10 +57,10 @@ class Connection */ public function __construct( private array $configuration, - private PheanstalkInterface $client, + private PheanstalkSubscriberInterface&PheanstalkPublisherInterface&PheanstalkManagerInterface $client, ) { $this->configuration = array_replace_recursive(self::DEFAULT_OPTIONS, $configuration); - $this->tube = $this->configuration['tube_name']; + $this->tube = new TubeName($this->configuration['tube_name']); $this->timeout = $this->configuration['timeout']; $this->ttr = $this->configuration['ttr']; $this->buryOnReject = $this->configuration['bury_on_reject']; @@ -69,7 +74,7 @@ public static function fromDsn(#[\SensitiveParameter] string $dsn, array $option $connectionCredentials = [ 'host' => $components['host'], - 'port' => $components['port'] ?? PheanstalkInterface::DEFAULT_PORT, + 'port' => $components['port'] ?? SocketFactoryInterface::DEFAULT_PORT, ]; $query = []; @@ -113,7 +118,7 @@ public function getConfiguration(): array public function getTube(): string { - return $this->tube; + return (string) $this->tube; } /** @@ -124,27 +129,26 @@ public function getTube(): string */ public function send(string $body, array $headers, int $delay = 0, ?int $priority = null): string { - $message = json_encode([ - 'body' => $body, - 'headers' => $headers, - ]); - - if (false === $message) { - throw new TransportException(json_last_error_msg()); + try { + $message = json_encode([ + 'body' => $body, + 'headers' => $headers, + ], \JSON_THROW_ON_ERROR); + } catch (\JsonException $exception) { + throw new TransportException($exception->getMessage(), 0, $exception); } - try { - $job = $this->client->useTube($this->tube)->put( + return $this->withReconnect(function () use ($message, $delay, $priority) { + $this->client->useTube($this->tube); + $job = $this->client->put( $message, - $priority ?? PheanstalkInterface::DEFAULT_PRIORITY, + $priority ?? PheanstalkPublisherInterface::DEFAULT_PRIORITY, (int) ($delay / 1000), $this->ttr ); - } catch (Exception $exception) { - throw new TransportException($exception->getMessage(), 0, $exception); - } - return (string) $job->getId(); + return $job->getId(); + }); } public function get(): ?array @@ -157,10 +161,14 @@ public function get(): ?array $data = $job->getData(); - $beanstalkdEnvelope = json_decode($data, true); + try { + $beanstalkdEnvelope = json_decode($data, true, flags: \JSON_THROW_ON_ERROR); + } catch (\JsonException $exception) { + throw new TransportException($exception->getMessage(), 0, $exception); + } return [ - 'id' => (string) $job->getId(), + 'id' => $job->getId(), 'body' => $beanstalkdEnvelope['body'], 'headers' => $beanstalkdEnvelope['headers'], ]; @@ -168,64 +176,79 @@ public function get(): ?array private function getFromTube(): ?PheanstalkJob { - try { - return $this->client->watchOnly($this->tube)->reserveWithTimeout($this->timeout); - } catch (Exception $exception) { - throw new TransportException($exception->getMessage(), 0, $exception); - } + return $this->withReconnect(function () { + if ($this->client->watch($this->tube) > 1) { + foreach ($this->client->listTubesWatched() as $tube) { + if ((string) $tube !== (string) $this->tube) { + $this->client->ignore($tube); + } + } + } + + return $this->client->reserveWithTimeout($this->timeout); + }); } public function ack(string $id): void { - try { - $this->client->useTube($this->tube)->delete(new JobId((int) $id)); - } catch (Exception $exception) { - throw new TransportException($exception->getMessage(), 0, $exception); - } + $this->withReconnect(function () use ($id) { + $this->client->useTube($this->tube); + $this->client->delete(new JobId($id)); + }); } public function reject(string $id, ?int $priority = null, bool $forceDelete = false): void { - try { + $this->withReconnect(function () use ($id, $priority, $forceDelete) { + $this->client->useTube($this->tube); + if (!$forceDelete && $this->buryOnReject) { - $this->client->useTube($this->tube)->bury(new JobId((int) $id), $priority ?? PheanstalkInterface::DEFAULT_PRIORITY); + $this->client->bury(new JobId($id), $priority ?? PheanstalkPublisherInterface::DEFAULT_PRIORITY); } else { - $this->client->useTube($this->tube)->delete(new JobId((int) $id)); + $this->client->delete(new JobId($id)); } - } catch (Exception $exception) { - throw new TransportException($exception->getMessage(), 0, $exception); - } + }); } public function keepalive(string $id): void { - try { - $this->client->useTube($this->tube)->touch(new JobId((int) $id)); - } catch (Exception $exception) { - throw new TransportException($exception->getMessage(), 0, $exception); - } + $this->withReconnect(function () use ($id) { + $this->client->useTube($this->tube); + $this->client->touch(new JobId($id)); + }); } public function getMessageCount(): int { - try { + return $this->withReconnect(function () { $this->client->useTube($this->tube); $tubeStats = $this->client->statsTube($this->tube); - } catch (Exception $exception) { - throw new TransportException($exception->getMessage(), 0, $exception); - } - return (int) $tubeStats['current-jobs-ready']; + return $tubeStats->currentJobsReady; + }); } public function getMessagePriority(string $id): int + { + return $this->withReconnect(function () use ($id) { + $jobStats = $this->client->statsJob(new JobId($id)); + + return $jobStats->priority; + }); + } + + private function withReconnect(callable $command): mixed { try { - $jobStats = $this->client->statsJob(new JobId((int) $id)); + try { + return $command(); + } catch (ConnectionException) { + $this->client->disconnect(); + + return $command(); + } } catch (Exception $exception) { throw new TransportException($exception->getMessage(), 0, $exception); } - - return (int) $jobStats['pri']; } } diff --git a/src/Symfony/Component/Messenger/Bridge/Beanstalkd/composer.json b/src/Symfony/Component/Messenger/Bridge/Beanstalkd/composer.json index 2c25279b4177d..bed9817cedab3 100644 --- a/src/Symfony/Component/Messenger/Bridge/Beanstalkd/composer.json +++ b/src/Symfony/Component/Messenger/Bridge/Beanstalkd/composer.json @@ -13,7 +13,7 @@ ], "require": { "php": ">=8.2", - "pda/pheanstalk": "^4.0", + "pda/pheanstalk": "^5.1|^7.0", "symfony/messenger": "^7.3" }, "require-dev": {