From a997be30b1a07a8c6fae92c9e3af404bfcde9295 Mon Sep 17 00:00:00 2001 From: Viktor Masicek Date: Fri, 13 Sep 2024 10:41:47 +0200 Subject: [PATCH] WIP --- src/BackgroundQueue.php | 17 +++++++++++------ src/Broker/PhpAmqpLib/Consumer.php | 8 ++++---- src/Broker/PhpAmqpLib/Manager.php | 17 +++++++++++++++-- src/Broker/PhpAmqpLib/Producer.php | 13 ++++++++----- 4 files changed, 38 insertions(+), 17 deletions(-) diff --git a/src/BackgroundQueue.php b/src/BackgroundQueue.php index 7640580..c53fd49 100644 --- a/src/BackgroundQueue.php +++ b/src/BackgroundQueue.php @@ -74,6 +74,9 @@ public function __construct(array $config) if (min($config['priorities']) < 0 || max($config['priorities']) > 999) { throw new Exception('There are value out of range 0-999 in priorities list: ' . implode(',', $config['priorities'])); } + if (!isset($config['consumerId'])) { + $config['consumerId'] = '0'; + } if (!isset($config['bulkSize'])) { $config['bulkSize'] = 1; } @@ -284,15 +287,12 @@ public function process($entity, string $queue, int $priority): void } catch (Throwable $e) {} } - if ($e instanceof DieException && $e->getPrevious()) { - $e = $e->getPrevious(); // Dále se řídíme podle té, kvůli které to vzniklo - $this->shouldDie = true; - } - switch (true) { case $e instanceof SkipException: break; - case $e instanceof DieException: // Pokud to došlo sem, tak ta DieException nemá $e->getPrevious(), takže ji označíme jako STATE_PERMANENTLY_FAILED + case $e instanceof DieException: + // Záměrně propadávací case - pokud chceme ukončit consumera, chceme proces označit za permanentní chybu + $this->shouldDie = true; case $e instanceof PermanentErrorException: case $e instanceof TypeError: $state = BackgroundJob::STATE_PERMANENTLY_FAILED; @@ -368,6 +368,11 @@ public function getConfig(): array return $this->config; } + public function getConsumerId(): string + { + return $this->config['consumerId']; + } + /** * @internal diff --git a/src/Broker/PhpAmqpLib/Consumer.php b/src/Broker/PhpAmqpLib/Consumer.php index ead4d9a..7b0213b 100644 --- a/src/Broker/PhpAmqpLib/Consumer.php +++ b/src/Broker/PhpAmqpLib/Consumer.php @@ -26,16 +26,16 @@ public function consume(string $queue, array $priorities): void // TODO Do budoucna cheme podporovat libovolné priority a ne pouze jejich výčet. // Zde si musíme vytáhnout seznam existujících front. To lze přes HTTP API pomocí CURL. - // Nejprve se chceme kouknout, jestli není zaslána zpráva k ukončení, proto na první místo dáme TOP_PRIORITY frontu. - array_unshift($priorities, Manager::QUEUE_TOP_PRIORITY); + // Nejprve se chceme kouknout, jestli není zaslána zpráva k ukončení, proto na první místo dáme vyhrazenou frontu pro aktuálního consumera + $queueDedicated = $this->manager->getQueueDedicated($queue); + $this->manager->createQueueWithExchange($queueDedicated, $queueDedicated); // Sestavíme si seznam názvů front v RabbitMQ (tedy včetně priorit) a všechny inicializujeme $queuesWithPriorities = []; foreach ($priorities as $priority) { $queueWithPriority = $this->manager->getQueueWithPriority($queue, $priority); $queuesWithPriorities[] = $queueWithPriority; - $this->manager->createExchange($queueWithPriority); - $this->manager->createQueue($queueWithPriority, $queueWithPriority); + $this->manager->createQueueWithExchange($queueWithPriority, $queueWithPriority); } $this->manager->setupQos(); diff --git a/src/Broker/PhpAmqpLib/Manager.php b/src/Broker/PhpAmqpLib/Manager.php index 9a16836..974e190 100644 --- a/src/Broker/PhpAmqpLib/Manager.php +++ b/src/Broker/PhpAmqpLib/Manager.php @@ -2,6 +2,7 @@ namespace ADT\BackgroundQueue\Broker\PhpAmqpLib; +use ADT\BackgroundQueue\BackgroundQueue; use Exception; use PhpAmqpLib\Channel\AMQPChannel; use PhpAmqpLib\Connection\AMQPStreamConnection; @@ -9,10 +10,10 @@ class Manager { - const QUEUE_TOP_PRIORITY = 0; private array $connectionParams; private array $queueParams; + private BackgroundQueue $backgroundQueue; private ?AMQPStreamConnection $connection = null; private ?AMQPChannel $channel = null; @@ -22,10 +23,11 @@ class Manager private array $initExchanges; private bool $initQos = false; - public function __construct(array $connectionParams, array $queueParams) + public function __construct(array $connectionParams, array $queueParams, BackgroundQueue $backgroundQueue) { $this->connectionParams = $connectionParams; $this->queueParams = $queueParams; + $this->backgroundQueue = $backgroundQueue; } private function getConnection(): AMQPStreamConnection @@ -126,6 +128,12 @@ public function createQueue(string $queue, ?string $exchange = null, array $addi $this->initQueues[$queue] = true; } + public function createQueueWithExchange(string $queue, string $exchange) { + $this->createExchange($exchange); + $this->createQueue($queue, $exchange); + } + + public function setupQos() { if ($this->initQos) { @@ -146,6 +154,11 @@ public function getQueueWithPriority(string $queue, int $priority): string return $queue . '_' . $priority; } + public function getQueueDedicated(string $queue): string + { + return $queue . '_ded-' . $this->backgroundQueue->getConsumerId(); + } + public function parseQueueAndPriority(string $queueWithPriority): array { $parts = explode('_', $queueWithPriority); diff --git a/src/Broker/PhpAmqpLib/Producer.php b/src/Broker/PhpAmqpLib/Producer.php index 4495d11..01a5323 100644 --- a/src/Broker/PhpAmqpLib/Producer.php +++ b/src/Broker/PhpAmqpLib/Producer.php @@ -18,13 +18,16 @@ public function __construct( Manager $manager) $this->manager = $manager; } - public function publish(string $id, string $queue, int $priority, ?int $expiration = null): void - { + public function publish(string $id, string $queue, int $priority, ?int $expiration = null): void { $queue = $this->manager->getQueueWithPriority($queue, $priority); + $this->publishToBroker($id, $queue, $expiration); + } + + private function publishToBroker(string $id, string $queue, ?int $expiration = null): void + { $exchange = $queue; - $this->manager->createExchange($exchange); - $this->manager->createQueue($queue, $exchange); + $this->manager->createQueueWithExchange($queue, $exchange); if ($expiration) { $additionalArguments = [ 'x-dead-letter-exchange' => ['S', $exchange], @@ -48,7 +51,7 @@ public function publish(string $id, string $queue, int $priority, ?int $expirati public function publishDie(string $queue): void { - $this->publish(self::DIE, $queue, Manager::QUEUE_TOP_PRIORITY); + $this->publishToBroker(self::DIE, $this->manager->getQueueDedicated($queue)); } private function createMessage(string $body): AMQPMessage