8000 Port the blocking consume solution to 5.4.45 by githoober · Pull Request #58674 · symfony/symfony · GitHub
[go: up one dir, main page]

Skip to content

Port the blocking consume solution to 5.4.45 #58674

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Add multiple bindings support
  • Loading branch information
Oleg Namaka committed Oct 30, 2024
commit 906b9de504a0a7ad7ab092aa8cab446b0b42f6f5
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,21 @@ class Connection
];

private const AVAILABLE_QUEUE_OPTIONS = [
'flags',
'arguments',
];

private const NEW_QUEUE_OPTIONS = [
'bindings',
];

private const DEPRECATED_BINDING_KEYS = [
'binding_keys',
'binding_arguments',
'flags',
];

private const AVAILABLE_BINDINGS_OPTIONS = [
'key',
'arguments',
];

Expand Down Expand Up @@ -145,8 +157,11 @@ public function __construct(array $connectionOptions, array $exchangeOptions, ar
* * connect_timeout: Connection timeout. Note: 0 or greater seconds. May be fractional.
* * confirm_timeout: Timeout in seconds for confirmation, if none specified transport will not wait for message confirmation. Note: 0 or greater seconds. May be fractional.
* * queues[name]: An array of queues, keyed by the name
* * binding_keys: The binding keys (if any) to bind to this queue
* * binding_arguments: Arguments to be used while binding the queue.
* * binding_keys: The binding keys (if any) to bind to this queue (Usage is deprecated. See 'bindings')
* * binding_arguments: Arguments to be used while binding the queue. (Usage is deprecated. See 'bindings')
* * bindings[name]: An array of bindings for this queue, keyed by the name
* * key: The binding key (if any) to bind to this queue
* * arguments: An array of arguments to be used while binding the queue.
* * flags: Queue flags (Default: AMQP_DURABLE)
* * arguments: Extra arguments
* * exchange:
Expand Down Expand Up @@ -261,9 +276,24 @@ private static function validateOptions(array $options): void
continue;
}

if (0 < \count($invalidQueueOptions = array_diff(array_keys($queue), self::AVAILABLE_QUEUE_OPTIONS))) {
if (0 < \count($deprecatedQueueOptions = array_intersect(array_keys($queue), self::DEPRECATED_BINDING_KEYS))) {
trigger_deprecation('symfony/messenger', '6.3', 'Deprecated queue option(s) "%s" passed to the AMQP Messenger transport. The "%s" option(s) should be used rather than "%s".', implode('", "', $deprecatedQueueOptions), implode('", ', self::NEW_QUEUE_OPTIONS), implode('", ', self::DEPRECATED_BINDING_KEYS));
if (0 < \count($newQueueOptions = array_intersect(array_keys($queue), self::NEW_QUEUE_OPTIONS))) {
throw new LogicException(sprintf('New "%s" and deprecated "%s" option(s) passed to the AMQP Messenger transport', implode('", "', $newQueueOptions), implode('", "', $deprecatedQueueOptions)));
}
}

if (0 < \count($invalidQueueOptions = array_diff(array_keys($queue), self::AVAILABLE_QUEUE_OPTIONS, self::NEW_QUEUE_OPTIONS, self::DEPRECATED_BINDING_KEYS))) {
trigger_deprecation('symfony/messenger', '5.1', 'Invalid queue option(s) "%s" passed to the AMQP Messenger transport. Passing invalid queue options is deprecated.', implode('", "', $invalidQueueOptions));
}

if (\is_array($queue['bindings'] ?? false)) {
foreach ($queue['bindings'] as $individualBinding) {
if (0 < \count(array_diff(array_keys($individualBinding), self::AVAILABLE_BINDINGS_OPTIONS))) {
throw new LogicException(sprintf("Valid options for each 'bindings' are: %s", implode(', ', self::AVAILABLE_BINDINGS_OPTIONS)));
}
}
}
}
}

Expand Down Expand Up @@ -478,9 +508,9 @@ public function pull(string $queueName, ?callable $callback): void
$this->queue($queueName)->consume($callback);
}

public function ack(\AMQPEnvelope $message, string $queueName): bool
public function ack(\AMQPEnvelope $message, string $queueName, int $flags = \AMQP_NOPARAM): bool
{
return $this->queue($queueName)->ack($message->getDeliveryTag()) ?? true;
return $this->queue($queueName)->ack($message->getDeliveryTag(), $flags);
}

public function nack(\AMQPEnvelope $message, string $queueName, int $flags = \AMQP_NOPARAM): bool
Expand All @@ -500,6 +530,12 @@ private function setupExchangeAndQueues(): void

foreach ($this->queuesOptions as $queueName => $queueConfig) {
$this->queue($queueName)->declareQueue();
foreach ($queueConfig['bindings'] ?? [] as $binding) {
$this->queue($queueName)->bind($this->exchangeOptions['name'], $binding['key'] ?? null, $binding['arguments'] ?? []);
}
if (isset($queueConfig['bindings']) && empty($queueConfig['binding_keys'])) {
continue;
}
foreach ($queueConfig['binding_keys'] ?? [null] as $bindingKey) {
$this->queue($queueName)->bind($this->exchangeOptions['name'], $bindingKey, $queueConfig['binding_arguments'] ?? []);
}
Expand Down
Loading
0