diff --git a/README.md b/README.md index 388a79d..019a630 100644 --- a/README.md +++ b/README.md @@ -192,7 +192,7 @@ Ve všech ostatních případech se záznam uloží jako úspěšně dokončený `background-queue:clear-finished 14` Smaže všechny úspěšně zpracované záznamy starší 14 dní. -`background-queue:reload-consumers QUEUE NUMBER` Reloadne NUMBER consumerů pro danou QUEUE. +`background-queue:reload-consumers QUEUE [LABEL1,LABEL2,...]` Reloadne consumery označené jedním z labelů, viz `background-queue:consume`. `background-queue:update-schema` Aktualizuje databázové schéma, pokud je potřeba. @@ -282,6 +282,8 @@ Ale pokud by se vyskytlo více požadavků na zasílání emailů, po nějaké d Dále máme možnost prioritu nastavenou pro callback přetížit při vkládání záznamu v metodě `publish`. Například víme, že se jedná o rozesílání newsletterů. Tedy se jedná o zasílání emailů, ale s nízkou prioritou zpracování. +Příkaz `background-queue:consume` má také volitelný parametr `-l`, kterým konzumerovi nastavíme příslušný label. Při restartu konzumerů pomocí `background-queue:reload-consumers` pak máme možnost temito labely zvolit, které konzumery chceme restartovat. + ``` $priority = null; // aplikuje se priorita 10 z nastavení pro callback if ($isNewsletter) { diff --git a/composer.json b/composer.json index 0eca2f4..d86fb9d 100644 --- a/composer.json +++ b/composer.json @@ -20,6 +20,7 @@ "require": { "php": "^7.4|^8.0", "ext-json": "*", + "ext-pdo": "*", "doctrine/dbal": "^2.0|^3.0|^4.0", "symfony/console": "^4.0|^5.0|^6.0", "psr/log": "^1.0|^2.0|^3.0", diff --git a/src/BackgroundQueue.php b/src/BackgroundQueue.php index c9043b1..0d7eb99 100644 --- a/src/BackgroundQueue.php +++ b/src/BackgroundQueue.php @@ -14,16 +14,14 @@ use Doctrine\DBAL\DriverManager; use Doctrine\DBAL\Query\QueryBuilder; use Doctrine\DBAL\Schema\AbstractSchemaManager; -use Doctrine\DBAL\Schema\Comparator; use Doctrine\DBAL\Schema\Schema; use Doctrine\DBAL\Schema\SchemaException; use Doctrine\DBAL\Types\Types; use Exception; use InvalidArgumentException; +use PDOException; use Psr\Log\LoggerInterface; use Psr\Log\NullLogger; -use ReflectionException; -use ReflectionMethod; use RuntimeException; use Throwable; use TypeError; @@ -33,23 +31,20 @@ class BackgroundQueue const UNEXPECTED_ERROR_MESSAGE = 'Unexpected error occurred.'; private array $config; + private bool $connectionCreated = false; private Connection $connection; private LoggerInterface $logger; private ?Producer $producer = null; - private bool $transaction = false; private int $bulkSize = 1; private array $bulkBrokerCallbacks = []; private array $bulkDatabaseEntities = []; private bool $shouldDie = false; /** - * @throws \Doctrine\DBAL\Exception + * @throws Exception */ public function __construct(array $config) { - if (is_string($config['connection'])) { - $config['connection'] = $this->parseDsn($config['connection']); - } if (empty($config['waitingJobExpiration'])) { $config['waitingJobExpiration'] = 1000; } @@ -85,7 +80,7 @@ public function __construct(array $config) } $this->config = $config; - $this->connection = DriverManager::getConnection($config['connection']); + $this->connection = $config['connection']; $this->logger = $config['logger'] ?: new NullLogger(); if ($config['producer']) { $this->producer = $config['producer']; @@ -95,6 +90,7 @@ public function __construct(array $config) /** * Bezpečně ověříme, že nedošlo ke ztrátě spojení k DB. * Pokud ano, připojíme se znovu. + * @throws \Doctrine\DBAL\Exception */ private function databaseConnectionCheckAndReconnect(): void { @@ -111,7 +107,7 @@ private function databaseConnectionCheckAndReconnect(): void $this->connection->close(); $this->connection->getNativeConnection(); } - } catch (\Exception $e) { + } catch (Exception $e) { $this->logger->log('critical', new Exception('BackgroundQueue - database connection lost (exception): ' . $e->getMessage(), 0, $e)); $this->connection->close(); $this->connection->getNativeConnection(); @@ -120,10 +116,13 @@ private function databaseConnectionCheckAndReconnect(): void } } + /** + * @throws Exception + */ private function databasePing(): bool { set_error_handler(function ($severity, $message) { - throw new \PDOException($message, $severity); + throw new PDOException($message, $severity); }); try { @@ -132,18 +131,18 @@ private function databasePing(): bool return true; - } catch (\Doctrine\DBAL\DBALException $e) { + } catch (\Doctrine\DBAL\Exception $e) { restore_error_handler(); return false; - } catch (\Exception $e) { + } catch (Exception $e) { restore_error_handler(); throw $e; } } /** - * @throws Exception + * @throws Exception|\Doctrine\DBAL\Exception */ public function publish( string $callbackName, @@ -167,8 +166,6 @@ public function publish( throw new Exception('Parameter "identifier" has to be set if "isUnique" is true.'); } - $this->checkArguments($parameters, $this->getCallback($callbackName)); - $priority = $this->getPriority($priority, $callbackName); $entity = new BackgroundJob(); @@ -195,11 +192,11 @@ public function publishToBroker(BackgroundJob $entity): void return; } - $this->bulkBrokerCallbacks[] = function() use ($entity) { + $this->bulkBrokerCallbacks[] = function () use ($entity) { try { // Pokud mám u callbacku nastavenou frontu, v DB zůstává původní, ale do brokera použiju tu z konfigu. // Tedy řadím do různých front pro různé consumery, ale z pohledu záznamů v DB se jedná o stejnou frontu. - $this->producer->publish((string) $entity->getId(), $this->getQueueForEntityIncludeCallback($entity), $entity->getPriority(), $entity->getPostponedBy()); + $this->producer->publish((string)$entity->getId(), $this->getQueueForEntityIncludeCallback($entity), $entity->getPriority(), $entity->getPostponedBy()); } catch (Exception $e) { $entity->setState(BackgroundJob::STATE_BROKER_FAILED) ->setErrorMessage($e->getMessage()); @@ -208,12 +205,15 @@ public function publishToBroker(BackgroundJob $entity): void $this->logException(self::UNEXPECTED_ERROR_MESSAGE, $entity, $e); } }; - if (!$this->transaction && count($this->bulkBrokerCallbacks) >= $this->bulkSize) { + if (!$this->connection->isTransactionActive() && count($this->bulkBrokerCallbacks) >= $this->bulkSize) { $this->doPublishToBroker(); } } - private function doPublishToBroker() + /** + * @internal + */ + public function doPublishToBroker(): void { foreach ($this->bulkBrokerCallbacks as $_closure) { $_closure(); @@ -222,14 +222,16 @@ private function doPublishToBroker() } /** - * @internal - * - * @param int|BackgroundJob $entity - * @return void + * @throws SchemaException + * @throws \Doctrine\DBAL\Exception * @throws Exception + * @internal */ public function process($entity, string $queue, int $priority): void { + // U publishera chceme transakci stejnou s flush, proto používáme stejné connection jako je v aplikaci. Ale u consumera chceme vlastní connection, aby když se revertne aplikační transakce, tak aby consumer mohl zapsat chybový stav k BackgroundJob. + $this->createConnection(); + if (!$entity instanceof BackgroundJob) { if (!$entity = $this->getEntity($entity, $queue, $priority)) { return; @@ -267,13 +269,6 @@ public function process($entity, string $queue, int $priority): void return; } - try { - $this->checkArguments($entity->getParameters(), $callback); - } catch (Exception $e) { - $this->logException($e, $entity); - return; - } - // změna stavu na zpracovává se try { $entity->setState(BackgroundJob::STATE_PROCESSING); @@ -381,8 +376,17 @@ public function process($entity, string $queue, int $priority): void } } + private function createConnection(): void + { + if (!$this->connectionCreated) { + $this->connection = DriverManager::getConnection($this->connection->getParams()); + $this->connectionCreated = true; + } + } + /** * @throws Exception + * @throws \Doctrine\DBAL\Exception */ public function getUnfinishedJobIdentifiers(array $identifiers = [], bool $excludeProcessing = false): array { @@ -426,7 +430,9 @@ public function getConfig(): array */ public function createQueryBuilder(): QueryBuilder { - $this->updateSchema(); + if ($this->config['autoUpdateSchema']) { + $this->updateSchema(); + } return $this->connection->createQueryBuilder() ->select('*') @@ -436,10 +442,10 @@ public function createQueryBuilder(): QueryBuilder } /** - * @throws Exception + * @throws Exception|\Doctrine\DBAL\Exception * @internal */ - public function fetchAll(QueryBuilder $qb, int $maxResults = null, $toEntity = true): array + public function fetchAll(QueryBuilder $qb, ?int $maxResults = null, $toEntity = true): array { $sql = $qb->setMaxResults($maxResults)->getSQL(); $parameters = $qb->getParameters(); @@ -459,61 +465,24 @@ public function fetchAll(QueryBuilder $qb, int $maxResults = null, $toEntity = t return $entities; } - public function startBulk() + public function startBulk(): void { $this->bulkSize = $this->config['bulkSize']; - if ($this->transaction) { - $this->connection->beginTransaction(); - } - } - - public function endBulk() - { - $this->doPublishToDatabase(); - if ($this->transaction) { - $this->connection->commit(); - } - $this->doPublishToBroker(); - $this->bulkSize = 1; - } - - /** - * @throws \Doctrine\DBAL\Exception - * @throws Exception - */ - public function startTransaction() - { - if ($this->transaction) { - throw new Exception('Nested transactions not implemented.'); - } - $this->transaction = true; - $this->startBulk(); } /** * @throws \Doctrine\DBAL\Exception + * @throws SchemaException */ - public function commitTransaction() + public function endBulk(): void { - $this->endBulk(); - $this->transaction = false; - } - - public function rollbackTransaction() - { - if (!$this->transaction) { - return; - } - - $this->connection->rollBack(); - $this->transaction = false; - $this->bulkDatabaseEntities = []; - $this->bulkBrokerCallbacks = []; + $this->doPublishToDatabase(); + $this->doPublishToBroker(); $this->bulkSize = 1; } /** - * @throws Exception + * @throws Exception|\Doctrine\DBAL\Exception */ private function fetch(QueryBuilder $qb): ?BackgroundJob { @@ -521,7 +490,7 @@ private function fetch(QueryBuilder $qb): ?BackgroundJob } /** - * @throws Exception + * @throws Exception|\Doctrine\DBAL\Exception */ private function count(QueryBuilder $qb): int { @@ -530,6 +499,7 @@ private function count(QueryBuilder $qb): int /** * @throws Exception + * @throws \Doctrine\DBAL\Exception */ private function isRedundant(BackgroundJob $entity): bool { @@ -549,7 +519,8 @@ private function isRedundant(BackgroundJob $entity): bool } /** - * @throws Exception + * @throws SchemaException + * @throws \Doctrine\DBAL\Exception */ private function getPreviousUnfinishedJob(BackgroundJob $entity): ?BackgroundJob { @@ -582,31 +553,33 @@ private function getPreviousUnfinishedJob(BackgroundJob $entity): ?BackgroundJob public function save(BackgroundJob $entity): void { $this->databaseConnectionCheckAndReconnect(); - $this->updateSchema(); + if ($this->config['autoUpdateSchema']) { + $this->updateSchema(); + } - try { - if (!$entity->getId()) { - if ($this->producer) { - $entity->setProcessedByBroker(true); - } + if (!$entity->getId()) { + if ($this->producer) { + $entity->setProcessedByBroker(true); + } - $this->bulkDatabaseEntities[] = $entity; + $this->bulkDatabaseEntities[] = $entity; - if (count($this->bulkDatabaseEntities) >= $this->bulkSize) { - $this->doPublishToDatabase(); - } - } else { - $this->connection->update($this->config['tableName'], $entity->getDatabaseValues(), ['id' => $entity->getId()]); + if (count($this->bulkDatabaseEntities) >= $this->bulkSize) { + $this->doPublishToDatabase(); } - } catch (Exception $e) { - $this->rollbackTransaction(); - throw $e; + } else { + $this->connection->update($this->config['tableName'], $entity->getDatabaseValues(), ['id' => $entity->getId()]); } } - private function doPublishToDatabase() { + /** + * @throws SchemaException + * @throws \Doctrine\DBAL\Exception + */ + private function doPublishToDatabase(): void + { if (empty($this->bulkDatabaseEntities)) { - return null; + return; } // Protože vkládám více záznamů najednou, potřebuju si vše uzavřít do transakce, abych si bezpečně získal seznam vložených ID a nastavil je vloženým entitám. @@ -635,16 +608,17 @@ private function doPublishToDatabase() { /** * Podporuje INSERT s více VALUES v jednom příkazu. - * Vychází z @link \Doctrine\DBAL\Connection::insert + * Vychází z @param BackgroundJob[] $entities + * @throws \Doctrine\DBAL\Exception + * @link Connection::insert * - * @param BackgroundJob[] $entities */ - private function insertMultipleEntities(array $entities) + private function insertMultipleEntities(array $entities): void { $table = $this->config['tableName']; if (empty($entities)) { - return null; + return; } $columns = array_keys($entities[0]->getDatabaseValues()); @@ -665,7 +639,11 @@ private function insertMultipleEntities(array $entities) $setToSql = implode(', ', $set); $sql = "INSERT INTO $table ($columnsToSql) VALUES $setToSql"; - return $this->connection->executeStatement($sql, $values); + if (method_exists($this->connection, 'executeStatement')) { + $this->connection->executeStatement($sql, $values); + } else { + $this->connection->executeUpdate($sql, $values); + } } /** @@ -737,7 +715,7 @@ public function updateSchema(bool $force = false): void $schemaManager = $this->createSchemaManager(); if ($schemaManager->tablesExist([$this->config['tableName']])) { $tableDiff = $schemaManager->createComparator()->compareTables($this->createSchemaManager()->introspectTable($this->config['tableName']), $table); - $sqls = $tableDiff ? $this->connection->getDatabasePlatform()->getAlterTableSQL($tableDiff) : []; + $sqls = $this->connection->getDatabasePlatform()->getAlterTableSQL($tableDiff); } else { $sqls = $this->connection->getDatabasePlatform()->getCreateTableSQL($table); } @@ -864,7 +842,8 @@ private function getEntity(int $id, string $queue, int $priority): ?BackgroundJo } /** - * @throws Exception + * @throws SchemaException + * @throws \Doctrine\DBAL\Exception */ private function checkUnfinishedJobs(BackgroundJob $entity): bool { @@ -884,110 +863,6 @@ private function checkUnfinishedJobs(BackgroundJob $entity): bool return true; } - /** - * @throws ReflectionException - */ - private function checkArguments(?array $args, $callback) - { - return; // TODO - - // Create a ReflectionFunction object based on the provided callback - $reflection = new ReflectionMethod($callback[0], $callback[1]); - - // Retrieve the parameters of the method - $params = $reflection->getParameters(); - - $builtInTypesMapping = [ - 'integer' => 'int', - 'boolean' => 'bool', - 'double' => 'float', - ]; - - if (version_compare(PHP_VERSION, '8.0', '<')) { - $requiredParams = 0; - foreach ($params as $_param) { - if (!$_param->isOptional()) { - $requiredParams++; - } - } - - // Check the number of arguments - if (count($args) < $requiredParams) { - throw new InvalidArgumentException("Number of arguments does not match."); - } - - $argsValues = array_values($args); - // For PHP lower than 8.0 - foreach ($params as $index => $param) { - $type = $param->getType(); - - $argument = $argsValues[$index]; - - // For nullable parameters - if ($param->isOptional() && is_null($argument)) { - continue; - } - - // For other parameters - if ($type) { - $expectedType = $type->getName(); - if ($type->isBuiltin()) { - $actualType = gettype($argument); - $actualType = $builtInTypesMapping[$actualType] ?? $actualType; - if (gettype($argument) !== $expectedType) { - throw new InvalidArgumentException("Parameter type does not match at index $index: expected $expectedType, got " . gettype($argument)); - } - } else { - if (!is_a($argument, $expectedType)) { - throw new InvalidArgumentException("Parameter type does not match at index $index: expected $expectedType or subtype, got " . get_class($argument)); - } - } - } - } - } else { - $paramNames = []; - foreach ($params as $param) { - $paramNames[$param->getName()] = $param->getName(); - } - - foreach ($args as $_name => $_value) { - if (!array_key_exists($_name, $paramNames)) { - throw new InvalidArgumentException("Missing parameter for the argument $_name."); - } - } - - foreach ($params as $param) { - $name = $param->getName(); - $type = $param->getType(); - - if (!array_key_exists($name, $args)) { - if (!$param->isOptional()) { - throw new InvalidArgumentException("Missing argument for the parameter $name."); - } - - continue; - } - - $argument = $args[$name]; - - if ($type) { - $expectedType = $type->getName(); - if ($type->isBuiltin()) { - $actualType = gettype($argument); - $actualType = $builtInTypesMapping[$actualType] ?? $actualType; - if ($actualType !== $expectedType) { - throw new InvalidArgumentException("Parameter $name type does not match: expected $expectedType, got " . gettype($argument)); - } - } else { - if (!is_a($argument, $expectedType)) { - throw new InvalidArgumentException("Parameter $name type does not match: expected $expectedType or subtype, got " . get_class($argument)); - } - } - } - } - } - } - private function getPriority(?int $priority, string $callbackName): int { if (is_null($priority)) { @@ -1021,10 +896,10 @@ private function getMemories(): array ]; } - public function dieIfNecessary() { + public function dieIfNecessary(): void + { if ($this->shouldDie) { die(); } } - } diff --git a/src/Broker/Consumer.php b/src/Broker/Consumer.php index 3229a39..d6a3231 100644 --- a/src/Broker/Consumer.php +++ b/src/Broker/Consumer.php @@ -4,5 +4,5 @@ interface Consumer { - public function consume(string $queue, array $priorities): void; + public function consume(string $queue, array $priorities, ?string $consumerLabel = null): void; } \ No newline at end of file diff --git a/src/Broker/PhpAmqpLib/Consumer.php b/src/Broker/PhpAmqpLib/Consumer.php index ead4d9a..2fdeb31 100644 --- a/src/Broker/PhpAmqpLib/Consumer.php +++ b/src/Broker/PhpAmqpLib/Consumer.php @@ -3,6 +3,7 @@ namespace ADT\BackgroundQueue\Broker\PhpAmqpLib; use ADT\BackgroundQueue\BackgroundQueue; +use ADT\BackgroundQueue\Console\ReloadConsumersCommand; use Exception; use PhpAmqpLib\Exception\AMQPProtocolChannelException; use PhpAmqpLib\Message\AMQPMessage; @@ -21,13 +22,12 @@ public function __construct(Manager $manager, BackgroundQueue $backgroundQueue) /** * @throws Exception */ - public function consume(string $queue, array $priorities): void + public function consume(string $queue, array $priorities, ?string $consumerLabel = null): void { // TODO Do budoucna cheme podporovat libovolné priority a ne pouze jejich výčet. // Zde si musíme vytáhnout seznam existujících front. To lze přes HTTP API pomocí CURL. - // Nejprve se chceme kouknout, jestli není zaslána zpráva k ukončení, proto na první místo dáme TOP_PRIORITY frontu. - array_unshift($priorities, Manager::QUEUE_TOP_PRIORITY); + $priorities = $this->manager->includeTopPriority($priorities, $consumerLabel); // Sestavíme si seznam názvů front v RabbitMQ (tedy včetně priorit) a všechny inicializujeme $queuesWithPriorities = []; diff --git a/src/Broker/PhpAmqpLib/Manager.php b/src/Broker/PhpAmqpLib/Manager.php index 9a16836..957137e 100644 --- a/src/Broker/PhpAmqpLib/Manager.php +++ b/src/Broker/PhpAmqpLib/Manager.php @@ -10,6 +10,7 @@ class Manager { const QUEUE_TOP_PRIORITY = 0; + const QUEUE_NAME_PARTS_DELIMITER = '_'; private array $connectionParams; private array $queueParams; @@ -141,14 +142,38 @@ public function setupQos() $this->initQos = true; } - public function getQueueWithPriority(string $queue, int $priority): string + public function getQueueWithPriority(string $queue, string $priority): string { - return $queue . '_' . $priority; + if (strpos($queue, self::QUEUE_NAME_PARTS_DELIMITER) !== false) { + throw new \Exception('Priority cannot contains "' . self::QUEUE_NAME_PARTS_DELIMITER . '".'); + } + return $queue . self::QUEUE_NAME_PARTS_DELIMITER . $priority; + } + + public function includeTopPriority(array $priorities, ?string $label = null): array + { + array_unshift($priorities, $this->getTopPriorityName($label)); + return $priorities; + } + + public function getTopPriorityName(?string $label = null): string + { + $topPriority = self::QUEUE_TOP_PRIORITY; + if (!is_null($label)) { + if (strpos($label, self::QUEUE_NAME_PARTS_DELIMITER) !== false) { + throw new \Exception('Label cannot contains "' . self::QUEUE_NAME_PARTS_DELIMITER . '".'); + } + + $topPriority .= self::QUEUE_NAME_PARTS_DELIMITER . $label; + } + + return $topPriority; } + public function parseQueueAndPriority(string $queueWithPriority): array { - $parts = explode('_', $queueWithPriority); + $parts = explode(self::QUEUE_NAME_PARTS_DELIMITER, $queueWithPriority); if (count($parts) === 2) { return [$parts[0], $parts[1]]; @@ -157,7 +182,7 @@ public function parseQueueAndPriority(string $queueWithPriority): array while (true) { $part = array_shift($parts); if (is_numeric($part)) { - return [implode('_', $nameParts), $part]; + return [implode(self::QUEUE_NAME_PARTS_DELIMITER, $nameParts), $part]; } else { $nameParts[] = $part; } diff --git a/src/Broker/PhpAmqpLib/Producer.php b/src/Broker/PhpAmqpLib/Producer.php index 4495d11..15322d2 100644 --- a/src/Broker/PhpAmqpLib/Producer.php +++ b/src/Broker/PhpAmqpLib/Producer.php @@ -18,7 +18,7 @@ public function __construct( Manager $manager) $this->manager = $manager; } - public function publish(string $id, string $queue, int $priority, ?int $expiration = null): void + public function publish(string $id, string $queue, string $priority, ?int $expiration = null): void { $queue = $this->manager->getQueueWithPriority($queue, $priority); $exchange = $queue; @@ -46,9 +46,9 @@ public function publish(string $id, string $queue, int $priority, ?int $expirati } - public function publishDie(string $queue): void + public function publishDie(string $queue, ?string $consumerLabel = null): void { - $this->publish(self::DIE, $queue, Manager::QUEUE_TOP_PRIORITY); + $this->publish(self::DIE, $queue, $this->manager->getTopPriorityName($consumerLabel)); } private function createMessage(string $body): AMQPMessage diff --git a/src/Broker/Producer.php b/src/Broker/Producer.php index a1cc19d..68d16f0 100644 --- a/src/Broker/Producer.php +++ b/src/Broker/Producer.php @@ -4,6 +4,6 @@ interface Producer { - public function publish(string $id, string $queue, int $priority, ?int $expiration = null): void; - public function publishDie(string $queue): void; + public function publish(string $id, string $queue, string $priority, ?int $expiration = null): void; + public function publishDie(string $queue, ?string $consumerLabel = null): void; } \ No newline at end of file diff --git a/src/Console/ConsumeCommand.php b/src/Console/ConsumeCommand.php index db0bf05..8815de2 100644 --- a/src/Console/ConsumeCommand.php +++ b/src/Console/ConsumeCommand.php @@ -28,6 +28,7 @@ protected function configure() $this->addArgument('queue', InputArgument::REQUIRED); $this->addOption('jobs', 'j', InputOption::VALUE_REQUIRED, 'Number of jobs consumed by one consumer in one process', 1); $this->addOption('priorities', 'p', InputOption::VALUE_REQUIRED, 'Priorities for consume (e.g. 10, 20-40, 25-, -20)'); + $this->addOption('label', 'l', InputOption::VALUE_OPTIONAL, 'Consumer label for restart it by reload command'); $this->setDescription('Start consumer.'); } @@ -35,6 +36,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int { $jobs = $input->getOption('jobs'); $priorities = $this->getPrioritiesListBasedConfig($input->getOption('priorities')); + $label = $input->getOption('label'); if (!is_numeric($jobs)) { $output->writeln("Option --jobs has to be integer"); @@ -43,7 +45,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int for ($i = 0; $i < (int)$jobs; $i++) { $this->backgroundQueue->dieIfNecessary(); - $this->consumer->consume($input->getArgument('queue'), $priorities); + $this->consumer->consume($input->getArgument('queue'), $priorities, $label); } return 0; diff --git a/src/Console/ReloadConsumersCommand.php b/src/Console/ReloadConsumersCommand.php index 9ef700f..bde0f34 100644 --- a/src/Console/ReloadConsumersCommand.php +++ b/src/Console/ReloadConsumersCommand.php @@ -27,17 +27,24 @@ protected function configure() 'A queue whose consumers are to reload.' ); $this->addArgument( - "number", - InputArgument::REQUIRED, - 'Number of consumers to reload.' + "consumers-labels", + InputArgument::OPTIONAL, + 'Labels of consumers to restart separated by comma.' ); - $this->setDescription('Creates the specified number of noop messages to reload consumers consuming specified queue.'); + $this->setDescription('Restart specified consumers by lables on specified queue.'); } protected function executeCommand(InputInterface $input, OutputInterface $output): int { - for ($i = 0; $i < $input->getArgument("number"); $i++) { - $this->producer->publishDie($input->getArgument("queue")); + $consumersLabels = $input->getArgument("consumers-labels"); + if ($consumersLabels) { + $consumersLabels = explode(',', $consumersLabels); + } else { + $consumersLabels = [null]; + } + + foreach ($consumersLabels as $consumerLabel) { + $this->producer->publishDie($input->getArgument("queue"), $consumerLabel); } return 0; diff --git a/src/Console/UpdateSchemaCommand.php b/src/Console/UpdateSchemaCommand.php index 3ff76f0..8f923b7 100755 --- a/src/Console/UpdateSchemaCommand.php +++ b/src/Console/UpdateSchemaCommand.php @@ -34,6 +34,7 @@ protected function configure() /** * @throws SchemaException * @throws Exception + * @throws \Doctrine\DBAL\Exception */ protected function executeCommand(InputInterface $input, OutputInterface $output): int { diff --git a/src/Doctrine/Connection.php b/src/Doctrine/Connection.php new file mode 100644 index 0000000..81514e0 --- /dev/null +++ b/src/Doctrine/Connection.php @@ -0,0 +1,34 @@ +transactionNestingLevel++; + } + + public function rollBack(): void + { + parent::rollBack(); + $this->transactionNestingLevel--; + } + + public function commit(): void + { + parent::commit(); + $this->transactionNestingLevel--; + + if ($this->transactionNestingLevel === 0) { + $this->getBackgroundQueue()->doPublishToBroker(); + } + } +} \ No newline at end of file