8000 [messenger] Publish with a routing key coming from exchange configura… · symfony/symfony@07d98cd · GitHub
[go: up one dir, main page]

Skip to content

Commit 07d98cd

Browse files
author
Guillaume Gammelin
committed
[messenger] Publish with a routing key coming from exchange configuration.
1 parent f8aca28 commit 07d98cd

File tree

5 files changed

+29
-20
lines changed

5 files changed

+29
-20
lines changed

UPGRADE-4.3.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ Messenger
9191

9292
* `Amqp` transport does not throw `\AMQPException` anymore, catch `TransportException` instead.
9393
* Deprecated the `LoggingMiddleware` class, pass a logger to `SendMessageMiddleware` instead.
94+
* Deprecated publishing with a routing key from queue configuration, use exchange configuration instead.
9495

9596
Routing
9697
-------

UPGRADE-5.0.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,7 @@ Messenger
236236
---------
237237

238238
* The `LoggingMiddleware` class has been removed, pass a logger to `SendMessageMiddleware` instead.
239+
* Messages aren't published anymore with a routing key from queue configuration. Use exchange configuration instead.
239240

240241
Monolog
241242
-------

src/Symfony/Component/Messenger/CHANGELOG.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,8 @@ CHANGELOG
5151
and queues by default. Previously, this was done when in "debug" mode
5252
only. Pass the `auto_setup` connection option to control this.
5353
* Added a `SetupTransportsCommand` command to setup the transports
54-
* Added `AmqpRoutingKeyStamp` allowing to provide a routing key on message
55-
publishing.
54+
* Added `AmqpRoutingKeyStamp` allowing to provide a routing key on message publishing.
55+
* Deprecated publishing with a routing key from queue configuration, use exchange configuration instead.
5656

5757
4.2.0
5858
-----

src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/ConnectionTest.php

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -207,10 +207,11 @@ public function testItSetupsTheConnectionByDefault()
207207

208208
$amqpExchange->method('getName')->willReturn('exchange_name');
209209
$amqpExchange->expects($this->once())->method('declareExchange');
210+
$amqpExchange->expects($this->once())->method('publish')->with('body', 'exchange_key', AMQP_NOPARAM, ['headers' => []]);
210211
$amqpQueue->expects($this->once())->method('declareQueue');
211-
$amqpQueue->expects($this->once())->method('bind')->with('exchange_name', 'my_key');
212+
$amqpQueue->expects($this->once())->method('bind')->with('exchange_name', 'queue_key');
212213

213-
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?queue[routing_key]=my_key', [], $factory);
214+
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?exchange[routing_key]=exchange_key&queue[routing_key]=queue_key', [], $factory);
214215
$connection->publish('body');
215216
}
216217

@@ -228,13 +229,13 @@ public function testItCanDisableTheSetup()
228229
$amqpQueue->expects($this->never())->method('declareQueue');
229230
$amqpQueue->expects($this->never())->method('bind');
230231

231-
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?queue[routing_key]=my_key', ['auto_setup' => 'false'], $factory);
232+
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?exchange[routing_key]=my_key&queue[routing_key]=my_key', ['auto_setup' => 'false'], $factory);
232233
$connection->publish('body');
233234

234-
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?queue[routing_key]=my_key', ['auto_setup' => false], $factory);
235+
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?exchange[routing_key]=my_key&queue[routing_key]=my_key', ['auto_setup' => false], $factory);
235236
$connection->publish('body');
236237

237-
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?queue[routing_key]=my_key&auto_setup=false', [], $factory);
238+
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?exchange[routing_key]=my_key&queue[routing_key]=my_key&auto_setup=false', [], $factory);
238239
$connection->publish('body');
239240
}
240241

@@ -368,7 +369,7 @@ public function testItCanPublishWithTheDefaultQueueRoutingKey()
368369

369370
$amqpExchange->expects($this->once())->method('publish')->with('body', 'my_key');
370371

371-
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?queue[routing_key]=my_key', [], $factory);
372+
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?exchange[routing_key]=my_key', [], $factory);
372373
$connection->publish('body');
373374
}
374375

@@ -383,7 +384,7 @@ public function testItCanPublishWithASuppliedRoutingKey()
383384

384385
$amqpExchange->expects($this->once())->method('publish')->with('body', 'supplied_key');
385386

386-
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?queue[routing_key]=my_key', [], $factory);
387+
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?exchange[routing_key]=my_key', [], $factory);
387388
$connection->publish('body', [], 0, 'supplied_key');
388389
}
389390

src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -179,18 +179,18 @@ public function publish(string $body, array $headers = [], int $delay = 0, strin
179179

180180
$flags = $this->queueConfiguration['flags'] ?? AMQP_NOPARAM;
181181
$attributes = $this->getAttributes($headers);
182-
$routingKey = $this->getRoutingKey($routingKey);
182+
$routingKey = $routingKey ?? $this->getExchangeRoutinKey();
183183

184184
$this->exchange()->publish($body, $routingKey, $flags, $attributes);
185185
}
186186

187187
/**
188188
* @throws \AMQPException
189189
*/
190-
private function publishWithDelay(string $body, array $headers, int $delay, ?string $routingKey)
190+
private function publishWithDelay(string $body, array $headers, int $delay, ?string $exchangeRoutingKey)
191191
{
192192
if ($this->shouldSetup()) {
193-
$this->setupDelay($delay, $routingKey);
193+
$this->setupDelay($delay, $exchangeRoutingKey);
194194
}
195195

196196
$routingKey = $this->getRoutingKeyForDelay($delay);
@@ -200,7 +200,7 @@ private function publishWithDelay(string $body, array $headers, int $delay, ?str
200200
$this->getDelayExchange()->publish($body, $routingKey, $flags, $attributes);
201201
}
202202

203-
private function setupDelay(int $delay, ?string $routingKey)
203+
private function setupDelay(int $delay, ?string $exchangeRoutingKey)
204204
{
205205
if (!$this->channel()->isConnected()) {
206206
$this->clear();
@@ -209,7 +209,7 @@ private function setupDelay(int $delay, ?string $routingKey)
209209
$exchange = $this->getDelayExchange();
210210
$exchange->declareExchange();
211211

212-
$queue = $this->createDelayQueue($delay, $routingKey);
212+
$queue = $this->createDelayQueue($delay, $exchangeRoutingKey);
213213
$queue->declareQueue();
214214
$queue->bind($exchange->getName(), $this->getRoutingKeyForDelay($delay));
215215
}
@@ -234,7 +234,7 @@ private function getDelayExchange(): \AMQPExchange
234234
* which is the original exchange, resulting on it being put back into
< 10000 code>235235
* the original queue.
236236
*/
237-
private function createDelayQueue(int $delay, ?string $routingKey)
237+
private function createDelayQueue(int $delay, ?string $exchangeRoutingKey)
238238
{
239239
$delayConfiguration = $this->connectionConfiguration['delay'];
240240

@@ -245,10 +245,10 @@ private function createDelayQueue(int $delay, ?string $routingKey)
245245
'x-dead-letter-exchange' => $this->exchange()->getName(),
246246
]);
247247

248-
$routingKey = $this->getRoutingKey($routingKey);
249-
if ($routingKey) {
248+
$exchangeRoutingKey = $exchangeRoutingKey ?? $this->getExchangeRoutinKey();
249+
if (null !== $exchangeRoutingKey) {
250250
// after being released from to DLX, this routing key will be used
251-
$queue->setArgument('x-dead-letter-routing-key', $routingKey);
251+
$queue->setArgument('x-dead-letter-routing-key', $exchangeRoutingKey);
252252
}
253253

254254
return $queue;
@@ -391,8 +391,14 @@ private function getAttributes(array $headers): array
391391
return array_merge_recursive($this->queueConfiguration['attributes'] ?? [], ['headers' => $headers]);
392392
}
393393

394-
private function getRoutingKey(?string $expected = ''): ?string
394+
private function getExchangeRoutinKey(): ?string
395395
{
396-
return $expected ?: ($this->queueConfiguration['routing_key'] ?? null);
396+
$routingKey = $this->exchangeConfiguration['routing_key'] ?? null;
397+
if (null === $routingKey && isset($this->queueConfiguration['routing_key'])) {
398+
$routingKey = $this->queueConfiguration['routing_key'];
399+
@trigger_error('Setting routing key from queue configuration is deprecated. Use exchange configuration instead.', E_USER_DEPRECATED);
400+
}
401+
402+
return $routingKey;
397403
}
398404
}

0 commit comments

Comments
 (0)
0