8000 [Messenger] Fix integration with newer version of Pheanstalk · symfony/symfony@b766607 · GitHub
[go: up one dir, main page]

Skip to content

Commit b766607

Browse files
committed
[Messenger] Fix integration with newer version of Pheanstalk
1 parent 195f1f1 commit b766607

File tree

2 files changed

+134
-26
lines changed

2 files changed

+134
-26
lines changed

src/Symfony/Component/Messenger/Bridge/Beanstalkd/Tests/Transport/ConnectionTest.php

+91-4
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
use Pheanstalk\Contract\PheanstalkSubscriberInterface;
1717
use Pheanstalk\Exception;
1818
use Pheanstalk\Exception\ClientException;
19+
use Pheanstalk\Exception\ConnectionException;
1920
use Pheanstalk\Exception\DeadlineSoonException;
2021
use Pheanstalk\Exception\ServerException;
2122
use Pheanstalk\Pheanstalk;
@@ -131,6 +132,7 @@ public function testItThrowsAnExceptionIfAnExtraOptionIsDefinedInDSN()
131132
public function testGet()
132133
{
133134
$id = '1234';
135+
$id2 = '1235';
134136
$beanstalkdEnvelope = [
135137
'body' => 'foo',
136138
'headers' => 'bar',
@@ -140,13 +142,52 @@ public function testGet()
140142
$timeout = 44;
141143

142144
$tubeList = new TubeList($tubeName = new TubeName($tube), $tubeNameDefault = new TubeName('default'));
143-
$job = new Job(new JobId($id), json_encode($beanstalkdEnvelope));
144145

145146
$client = $this->createMock(PheanstalkInterface::class);
146147
$client->expects($this->once())->method('watch')->with($tubeName)->willReturn(2);
147148
$client->expects($this->once())->method('listTubesWatched')->willReturn($tubeList);
148149
$client->expects($this->once())->method('ignore')->with($tubeNameDefault)->willReturn(1);
149-
$client->expects($this->once())->method('reserveWithTimeout')->with($timeout)->willReturn($job);
150+
$client->expects($this->exactly(2))->method('reserveWithTimeout')->with($timeout)->willReturnOnConsecutiveCalls(
151+
new Job(new JobId($id), json_encode($beanstalkdEnvelope)),
152+
new Job(new JobId($id2), json_encode($beanstalkdEnvelope)),
153+
);
154+
155+
$connection = new Connection(['tube_name' => $tube, 'timeout' => $timeout], $client);
156+
157+
$envelope = $connection->get();
158+
159+
$this->assertSame($id, $envelope['id']);
160+
$this->assertSame($beanstalkdEnvelope['body'], $envelope['body']);
161+
$this->assertSame($beanstalkdEnvelope['headers'], $envelope['headers']);
162+
163+
$envelope = $connection->get();
164+
165+
$this->assertSame($id2, $envelope['id']);
166+
$this->assertSame($beanstalkdEnvelope['body'], $envelope['body']);
167+
$this->assertSame($beanstalkdEnvelope['headers'], $envelope['headers']);
168+
}
169+
170+
public function testGetOnReconnect()
171+
{
172+
$id = '1234';
173+
$beanstalkdEnvelope = [
174+
'body' => 'foo',
175+
'headers' => 'bar',
176+
];
177+
178+
$tube = 'baz';
179+
$timeout = 44;
180+
181+
$tubeList = new TubeList($tubeName = new TubeName($tube), $tubeNameDefault = new TubeName('default'));
182+
183+
$client = $this->createMock(PheanstalkInterface::class);
184+
$client->expects($this->exactly(2))->method('watch')->with($tubeName)->willReturn(2);
185+
$client->expects($this->exactly(2))->method('listTubesWatched')->willReturn($tubeList);
186+
$client->expects($this->exactly(2))->method('ignore')->with($tubeNameDefault)->willReturn(1);
187+
$client->expects($this->exactly(2))->method('reserveWithTimeout')->with($timeout)->willReturnOnConsecutiveCalls(
188+
$this->throwException(new ConnectionException('123', 'foobar')),
189+
new Job(new JobId($id), json_encode($beanstalkdEnvelope)),
190+
);
150191

151192
$connection = new Connection(['tube_name' => $tube, 'timeout' => $timeout], $client);
152193

@@ -370,10 +411,11 @@ public function testSend()
370411
$expectedDelay = $delay / 1000;
371412

372413
$id = '110';
414+
$id2 = '111';
373415

374416
$client = $this->createMock(PheanstalkInterface::class);
375417
$client->expects($this->once())->method('useTube')->with(new TubeName($tube));
376-
$client->expects($this->once())->method('put')->with(
418+
$client->expects($this->exactly(2))->method('put')->with(
377419
$this->callback(function (string $data) use ($body, $headers): bool {
378420
$expectedMessage = json_encode([
379421
'body' => $body,
@@ -385,7 +427,51 @@ public function testSend()
385427
1024,
386428
$expectedDelay,
387429
90
388-
)->willReturn(new Job(new JobId($id), 'foobar'));
430+
)->willReturnOnConsecutiveCalls(
431+
new Job(new JobId($id), 'foobar'),
432+
new Job(new JobId($id2), 'foobar'),
433+
);
434+
435+
$connection = new Connection(['tube_name' => $tube], $client);
436+
437+
$returnedId = $connection->send($body, $headers, $delay);
438+
439+
$this->assertSame($id, $returnedId);
440+
441+
$returnedId = $connection->send($body, $headers, $delay);
442+
443+
$this->assertSame($id2, $returnedId);
444+
}
445+
446+
public function testSendOnReconnect()
447+
{
448+
$tube = 'xyz';
449+
450+
$body = 'foo';
451+
$headers = ['test' => 'bar'];
452+
$delay = 1000;
453+
$expectedDelay = $delay / 1000;
454+
455+
$id = '110';
456+
457+
$client = $this->createMock(PheanstalkInterface::class);
458+
$client->expects($this->exactly(2))->method('useTube')->with(new TubeName($tube));
459+
$client->expects($this->exactly(2))->method('put')->with(
460+
$this->callback(function (string $data) use ($body, $headers): bool {
461+
$expectedMessage = json_encode([
462+
'body' => $body,
463+
'headers' => $headers,
464+
]);
465+
466+
return $expectedMessage === $data;
467+
}),
468+
1024,
469+
$expectedDelay,
470+
90
471+
)->willReturnOnConsecutiveCalls(
472+
$this->throwException(new ConnectionException('123', 'foobar')),
473+
new Job(new JobId($id), 'foobar'),
474+
);
389475

390476
$connection = new Connection(['tube_name' => $tube], $client);
391477

@@ -520,4 +606,5 @@ public function testSendWithRoundedDelay()
520606

521607
interface PheanstalkInterface extends PheanstalkPublisherInterface, PheanstalkSubscriberInterface, PheanstalkManagerInterface
522608
{
609+
public function disconnect(): void;
523610
}

src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/Connection.php

+43-22
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
use Pheanstalk\Exception;
1919
use Pheanstalk\Exception\ConnectionException;
2020
use Pheanstalk\Pheanstalk;
21-
use Pheanstalk\Values\Job as PheanstalkJob;
2221
use Pheanstalk\Values\JobId;
2322
use Pheanstalk\Values\TubeName;
2423
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
@@ -45,6 +44,9 @@ class Connection
4544
private int $ttr;
4645
private bool $buryOnReject;
4746

47+
private bool $usingTube = false;
48+
private bool $watchingTube = false;
49+
4850
/**
4951
* Constructor.
5052
*
@@ -139,7 +141,7 @@ public function send(string $body, array $headers, int $delay = 0, ?int $priorit
139141
}
140142

141143
return $this->withReconnect(function () use ($message, $delay, $priority) {
142-
$this->client->useTube($this->tube);
144+
$this->useTube();
143145
$job = $this->client->put(
144146
$message,
145147
$priority ?? PheanstalkPublisherInterface::DEFAULT_PRIORITY,
@@ -153,7 +155,11 @@ public function send(string $body, array $headers, int $delay = 0, ?int $priorit
153155

154156
public function get(): ?array
155157
{
156-
$job = $this->getFromTube();
158+
$job = $this->withReconnect(function () {
159+
$this->watchTube();
160+
161+
return $this->client->reserveWithTimeout($this->timeout);
162+
});
157163

158164
if (null === $job) {
159165
return null;
@@ -174,33 +180,18 @@ public function get(): ?array
174180
];
175181
}
176182

177-
private function getFromTube(): ?PheanstalkJob
178-
{
179-
return $this->withReconnect(function () {
180-
if ($this->client->watch($this->tube) > 1) {
181-
foreach ($this->client->listTubesWatched() as $tube) {
182-
if ((string) $tube !== (string) $this->tube) {
183-
$this->client->ignore($tube);
184-
}
185-
}
186-
}
187-
188-
return $this->client->reserveWithTimeout($this->timeout);
189-
});
190-
}
191-
192183
public function ack(string $id): void
193184
{
194185
$this->withReconnect(function () use ($id) {
195-
$this->client->useTube($this->tube);
186+
$this->useTube();
196187
$this->client->delete(new JobId($id));
197188
});
198189
}
199190

200191
public function reject(string $id, ?int $priority = null, bool $forceDelete = false): void
201192
{
202193
$this->withReconnect(function () use ($id, $priority, $forceDelete) {
203-
$this->client->useTube($this->tube);
194+
$this->useTube();
204195

205196
if (!$forceDelete && $this->buryOnReject) {
206197
$this->client->bury(new JobId($id), $priority ?? PheanstalkPublisherInterface::DEFAULT_PRIORITY);
@@ -213,15 +204,15 @@ public function reject(string $id, ?int $priority = null, bool $forceDelete = fa
213204
public function keepalive(string $id): void
214205
{
215206
$this->withReconnect(function () use ($id) {
216-
$this->client->useTube($this->tube);
207+
$this->useTube();
217208
$this->client->touch(new JobId($id));
218209
});
219210
}
220211

221212
public function getMessageCount(): int
222213
{
223214
return $this->withReconnect(function () {
224-
$this->client->useTube($this->tube);
215+
$this->useTube();
225216
$tubeStats = $this->client->statsTube($this->tube);
226217

227218
return $tubeStats->currentJobsReady;
@@ -237,6 +228,33 @@ public function getMessagePriority(string $id): int
237228
});
238229
}
239230

231+
private function useTube(): void
232+
{
233+
if ($this->usingTube) {
234+
return;
235+
}
236+
237+
$this->client->useTube($this->tube);
238+
$this->usingTube = true;
239+
}
240+
241+
private function watchTube(): void
242+
{
243+
if ($this->watchingTube) {
244+
return;
245+
}
246+
247+
if ($this->client->watch($this->tube) > 1) {
248+
foreach ($this->client->listTubesWatched() as $tube) {
249+
if ((string) $tube !== (string) $this->tube) {
250+
$this->client->ignore($tube);
251+
}
252+
}
253+
}
254+
255+
$this->watchingTube = true;
256+
}
257+
240258
private function withReconnect(callable $command): mixed
241259
{
242260
try {
@@ -245,6 +263,9 @@ private function withReconnect(callable $command): mixed
245263
} catch (ConnectionException) {
246264
$this->client->disconnect();
247265

266+
$this->usingTube = false;
267+
$this->watchingTube = false;
268+
248269
return $command();
249270
}
250271
} catch (Exception $exception) {

0 commit comments

Comments
 (0)
0