From feed5038d8ded0294714aefb455becb294bb7acf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1=C5=A1=20Kud=C4=9Blka?= Date: Sat, 17 May 2025 11:59:59 +0200 Subject: [PATCH 01/10] Fixes schema generation lock directory The schema generation lock directory was incorrectly set to the locks directory instead of the temp directory. This commit updates the path to ensure the lock is created in the correct location. --- src/BackgroundQueue.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/BackgroundQueue.php b/src/BackgroundQueue.php index c9043b1..e4fb2ae 100644 --- a/src/BackgroundQueue.php +++ b/src/BackgroundQueue.php @@ -701,7 +701,7 @@ private static function bindParamArray(string $prefix, array $values, array &$bi */ public function updateSchema(bool $force = false): void { - if (!$force && !FileSystem::createDirAtomically($this->config['locksDir'] . '/background_queue_schema_generated')) { + if (!$force && !FileSystem::createDirAtomically($this->config['tempDir'] . '/background_queue_schema_generated')) { return; } From c6e9961d280780b959e8212bb276935d1f573771 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1=C5=A1=20Kud=C4=9Blka?= Date: Sat, 17 May 2025 17:33:53 +0200 Subject: [PATCH 02/10] Respects auto-update schema config Respects the 'autoUpdateSchema' configuration option, preventing schema updates if disabled. Also, corrects the directory used for checking if the schema has been generated. It was looking into a `tempDir` instead of `locksDir`. --- src/BackgroundQueue.php | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/BackgroundQueue.php b/src/BackgroundQueue.php index e4fb2ae..3f86ebd 100644 --- a/src/BackgroundQueue.php +++ b/src/BackgroundQueue.php @@ -701,7 +701,11 @@ private static function bindParamArray(string $prefix, array $values, array &$bi */ public function updateSchema(bool $force = false): void { - if (!$force && !FileSystem::createDirAtomically($this->config['tempDir'] . '/background_queue_schema_generated')) { + if (!$this->config['autoUpdateSchema']) { + return; + } + + if (!$force && !FileSystem::createDirAtomically($this->config['locksDir'] . '/background_queue_schema_generated')) { return; } From a99d5fd4c4ad15e9b4a6369f5d484c6c31df3b31 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1=C5=A1=20Kud=C4=9Blka?= Date: Sat, 17 May 2025 17:37:38 +0200 Subject: [PATCH 03/10] Allows forced schema updates from console Modifies the schema update process to allow ignoring the `autoUpdateSchema` configuration when triggered from the console. This provides a mechanism to force schema updates when needed, regardless of the automatic update setting. --- src/BackgroundQueue.php | 4 ++-- src/Console/UpdateSchemaCommand.php | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/src/BackgroundQueue.php b/src/BackgroundQueue.php index 3f86ebd..a270a94 100644 --- a/src/BackgroundQueue.php +++ b/src/BackgroundQueue.php @@ -699,9 +699,9 @@ private static function bindParamArray(string $prefix, array $values, array &$bi * @throws Exception * @internal */ - public function updateSchema(bool $force = false): void + public function updateSchema(bool $force = false, bool $ignoreAutoUpdateSchema = false): void { - if (!$this->config['autoUpdateSchema']) { + if (!$ignoreAutoUpdateSchema && !$this->config['autoUpdateSchema']) { return; } diff --git a/src/Console/UpdateSchemaCommand.php b/src/Console/UpdateSchemaCommand.php index 3ff76f0..3617a18 100755 --- a/src/Console/UpdateSchemaCommand.php +++ b/src/Console/UpdateSchemaCommand.php @@ -34,10 +34,11 @@ protected function configure() /** * @throws SchemaException * @throws Exception + * @throws \Doctrine\DBAL\Exception */ protected function executeCommand(InputInterface $input, OutputInterface $output): int { - $this->backgroundQueue->updateSchema($input->getOption('force')); + $this->backgroundQueue->updateSchema($input->getOption('force'), ignoreAutoUpdateSchema: true); return 0; } From 7dbeca2f9ef29d59701d08924282e3be12880a26 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1=C5=A1=20Kud=C4=9Blka?= Date: Sat, 17 May 2025 17:40:07 +0200 Subject: [PATCH 04/10] Conditionally updates schema Ensures schema updates are only performed when the `autoUpdateSchema` configuration option is enabled. Removes the `$ignoreAutoUpdateSchema` parameter from the `updateSchema` method, simplifying its usage and aligning it with the intended behavior. The schema update command now respects the configuration setting. --- src/BackgroundQueue.php | 14 +++++++------- src/Console/UpdateSchemaCommand.php | 2 +- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/BackgroundQueue.php b/src/BackgroundQueue.php index a270a94..343cfd1 100644 --- a/src/BackgroundQueue.php +++ b/src/BackgroundQueue.php @@ -426,7 +426,9 @@ public function getConfig(): array */ public function createQueryBuilder(): QueryBuilder { - $this->updateSchema(); + if ($this->config['autoUpdateSchema']) { + $this->updateSchema(); + } return $this->connection->createQueryBuilder() ->select('*') @@ -582,7 +584,9 @@ private function getPreviousUnfinishedJob(BackgroundJob $entity): ?BackgroundJob public function save(BackgroundJob $entity): void { $this->databaseConnectionCheckAndReconnect(); - $this->updateSchema(); + if ($this->config['autoUpdateSchema']) { + $this->updateSchema(); + } try { if (!$entity->getId()) { @@ -699,12 +703,8 @@ private static function bindParamArray(string $prefix, array $values, array &$bi * @throws Exception * @internal */ - public function updateSchema(bool $force = false, bool $ignoreAutoUpdateSchema = false): void + public function updateSchema(bool $force = false): void { - if (!$ignoreAutoUpdateSchema && !$this->config['autoUpdateSchema']) { - return; - } - if (!$force && !FileSystem::createDirAtomically($this->config['locksDir'] . '/background_queue_schema_generated')) { return; } diff --git a/src/Console/UpdateSchemaCommand.php b/src/Console/UpdateSchemaCommand.php index 3617a18..8f923b7 100755 --- a/src/Console/UpdateSchemaCommand.php +++ b/src/Console/UpdateSchemaCommand.php @@ -38,7 +38,7 @@ protected function configure() */ protected function executeCommand(InputInterface $input, OutputInterface $output): int { - $this->backgroundQueue->updateSchema($input->getOption('force'), ignoreAutoUpdateSchema: true); + $this->backgroundQueue->updateSchema($input->getOption('force')); return 0; } From 3db6f43297c68bb99e50a0f52b72361911dffa09 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1=C5=A1=20Kud=C4=9Blka?= Date: Sat, 17 May 2025 21:29:45 +0200 Subject: [PATCH 05/10] Updates connection handling in BackgroundQueue Updates the BackgroundQueue to directly use the connection object provided in the configuration. This simplifies the connection process and relies on the consumer to pass an already established connection, removing the need for connection creation within the class. --- src/BackgroundQueue.php | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/BackgroundQueue.php b/src/BackgroundQueue.php index 343cfd1..60981d8 100644 --- a/src/BackgroundQueue.php +++ b/src/BackgroundQueue.php @@ -47,9 +47,6 @@ class BackgroundQueue */ public function __construct(array $config) { - if (is_string($config['connection'])) { - $config['connection'] = $this->parseDsn($config['connection']); - } if (empty($config['waitingJobExpiration'])) { $config['waitingJobExpiration'] = 1000; } @@ -85,7 +82,7 @@ public function __construct(array $config) } $this->config = $config; - $this->connection = DriverManager::getConnection($config['connection']); + $this->connection = $config['connection']; $this->logger = $config['logger'] ?: new NullLogger(); if ($config['producer']) { $this->producer = $config['producer']; From ab85954c5c24ee0cdb0b867ccfeec099e6c923f3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1=C5=A1=20Kud=C4=9Blka?= Date: Sat, 17 May 2025 22:06:48 +0200 Subject: [PATCH 06/10] Adapts to different Doctrine versions Checks if the `executeStatement` method exists before calling it, and falls back to `executeUpdate` for older Doctrine versions where `executeStatement` is not available. This ensures compatibility with a broader range of Doctrine versions. --- src/BackgroundQueue.php | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/BackgroundQueue.php b/src/BackgroundQueue.php index 60981d8..9055e08 100644 --- a/src/BackgroundQueue.php +++ b/src/BackgroundQueue.php @@ -666,7 +666,11 @@ private function insertMultipleEntities(array $entities) $setToSql = implode(', ', $set); $sql = "INSERT INTO $table ($columnsToSql) VALUES $setToSql"; - return $this->connection->executeStatement($sql, $values); + if (method_exists($this->connection, 'executeStatement')) { + return $this->connection->executeStatement($sql, $values); + } else { + return $this->connection->executeUpdate($sql, $values); + } } /** From be4e83b6306ed687e96fe30a88f7378ab9940736 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1=C5=A1=20Kud=C4=9Blka?= Date: Thu, 22 May 2025 09:58:23 +0200 Subject: [PATCH 07/10] Improves background queue reliability and efficiency Refactors the background queue to enhance reliability and efficiency by: - Adding connection checks and reconnection logic to prevent job loss due to database connection issues. - Implementing bulk processing for database operations to reduce the number of queries. - Removing transaction management from the queue itself to rely on the application's transaction boundaries. - Adding a dedicated connection for the consumer to allow updating job status even during application transaction rollbacks. - Adding ext-pdo dependency. --- composer.json | 1 + src/BackgroundQueue.php | 278 +++++++++++----------------------------- 2 files changed, 75 insertions(+), 204 deletions(-) diff --git a/composer.json b/composer.json index 0eca2f4..d86fb9d 100644 --- a/composer.json +++ b/composer.json @@ -20,6 +20,7 @@ "require": { "php": "^7.4|^8.0", "ext-json": "*", + "ext-pdo": "*", "doctrine/dbal": "^2.0|^3.0|^4.0", "symfony/console": "^4.0|^5.0|^6.0", "psr/log": "^1.0|^2.0|^3.0", diff --git a/src/BackgroundQueue.php b/src/BackgroundQueue.php index 9055e08..0d7eb99 100644 --- a/src/BackgroundQueue.php +++ b/src/BackgroundQueue.php @@ -14,16 +14,14 @@ use Doctrine\DBAL\DriverManager; use Doctrine\DBAL\Query\QueryBuilder; use Doctrine\DBAL\Schema\AbstractSchemaManager; -use Doctrine\DBAL\Schema\Comparator; use Doctrine\DBAL\Schema\Schema; use Doctrine\DBAL\Schema\SchemaException; use Doctrine\DBAL\Types\Types; use Exception; use InvalidArgumentException; +use PDOException; use Psr\Log\LoggerInterface; use Psr\Log\NullLogger; -use ReflectionException; -use ReflectionMethod; use RuntimeException; use Throwable; use TypeError; @@ -33,17 +31,17 @@ class BackgroundQueue const UNEXPECTED_ERROR_MESSAGE = 'Unexpected error occurred.'; private array $config; + private bool $connectionCreated = false; private Connection $connection; private LoggerInterface $logger; private ?Producer $producer = null; - private bool $transaction = false; private int $bulkSize = 1; private array $bulkBrokerCallbacks = []; private array $bulkDatabaseEntities = []; private bool $shouldDie = false; /** - * @throws \Doctrine\DBAL\Exception + * @throws Exception */ public function __construct(array $config) { @@ -92,6 +90,7 @@ public function __construct(array $config) /** * Bezpečně ověříme, že nedošlo ke ztrátě spojení k DB. * Pokud ano, připojíme se znovu. + * @throws \Doctrine\DBAL\Exception */ private function databaseConnectionCheckAndReconnect(): void { @@ -108,7 +107,7 @@ private function databaseConnectionCheckAndReconnect(): void $this->connection->close(); $this->connection->getNativeConnection(); } - } catch (\Exception $e) { + } catch (Exception $e) { $this->logger->log('critical', new Exception('BackgroundQueue - database connection lost (exception): ' . $e->getMessage(), 0, $e)); $this->connection->close(); $this->connection->getNativeConnection(); @@ -117,10 +116,13 @@ private function databaseConnectionCheckAndReconnect(): void } } + /** + * @throws Exception + */ private function databasePing(): bool { set_error_handler(function ($severity, $message) { - throw new \PDOException($message, $severity); + throw new PDOException($message, $severity); }); try { @@ -129,18 +131,18 @@ private function databasePing(): bool return true; - } catch (\Doctrine\DBAL\DBALException $e) { + } catch (\Doctrine\DBAL\Exception $e) { restore_error_handler(); return false; - } catch (\Exception $e) { + } catch (Exception $e) { restore_error_handler(); throw $e; } } /** - * @throws Exception + * @throws Exception|\Doctrine\DBAL\Exception */ public function publish( string $callbackName, @@ -164,8 +166,6 @@ public function publish( throw new Exception('Parameter "identifier" has to be set if "isUnique" is true.'); } - $this->checkArguments($parameters, $this->getCallback($callbackName)); - $priority = $this->getPriority($priority, $callbackName); $entity = new BackgroundJob(); @@ -192,11 +192,11 @@ public function publishToBroker(BackgroundJob $entity): void return; } - $this->bulkBrokerCallbacks[] = function() use ($entity) { + $this->bulkBrokerCallbacks[] = function () use ($entity) { try { // Pokud mám u callbacku nastavenou frontu, v DB zůstává původní, ale do brokera použiju tu z konfigu. // Tedy řadím do různých front pro různé consumery, ale z pohledu záznamů v DB se jedná o stejnou frontu. - $this->producer->publish((string) $entity->getId(), $this->getQueueForEntityIncludeCallback($entity), $entity->getPriority(), $entity->getPostponedBy()); + $this->producer->publish((string)$entity->getId(), $this->getQueueForEntityIncludeCallback($entity), $entity->getPriority(), $entity->getPostponedBy()); } catch (Exception $e) { $entity->setState(BackgroundJob::STATE_BROKER_FAILED) ->setErrorMessage($e->getMessage()); @@ -205,12 +205,15 @@ public function publishToBroker(BackgroundJob $entity): void $this->logException(self::UNEXPECTED_ERROR_MESSAGE, $entity, $e); } }; - if (!$this->transaction && count($this->bulkBrokerCallbacks) >= $this->bulkSize) { + if (!$this->connection->isTransactionActive() && count($this->bulkBrokerCallbacks) >= $this->bulkSize) { $this->doPublishToBroker(); } } - private function doPublishToBroker() + /** + * @internal + */ + public function doPublishToBroker(): void { foreach ($this->bulkBrokerCallbacks as $_closure) { $_closure(); @@ -219,14 +222,16 @@ private function doPublishToBroker() } /** - * @internal - * - * @param int|BackgroundJob $entity - * @return void + * @throws SchemaException + * @throws \Doctrine\DBAL\Exception * @throws Exception + * @internal */ public function process($entity, string $queue, int $priority): void { + // U publishera chceme transakci stejnou s flush, proto používáme stejné connection jako je v aplikaci. Ale u consumera chceme vlastní connection, aby když se revertne aplikační transakce, tak aby consumer mohl zapsat chybový stav k BackgroundJob. + $this->createConnection(); + if (!$entity instanceof BackgroundJob) { if (!$entity = $this->getEntity($entity, $queue, $priority)) { return; @@ -264,13 +269,6 @@ public function process($entity, string $queue, int $priority): void return; } - try { - $this->checkArguments($entity->getParameters(), $callback); - } catch (Exception $e) { - $this->logException($e, $entity); - return; - } - // změna stavu na zpracovává se try { $entity->setState(BackgroundJob::STATE_PROCESSING); @@ -378,8 +376,17 @@ public function process($entity, string $queue, int $priority): void } } + private function createConnection(): void + { + if (!$this->connectionCreated) { + $this->connection = DriverManager::getConnection($this->connection->getParams()); + $this->connectionCreated = true; + } + } + /** * @throws Exception + * @throws \Doctrine\DBAL\Exception */ public function getUnfinishedJobIdentifiers(array $identifiers = [], bool $excludeProcessing = false): array { @@ -435,10 +442,10 @@ public function createQueryBuilder(): QueryBuilder } /** - * @throws Exception + * @throws Exception|\Doctrine\DBAL\Exception * @internal */ - public function fetchAll(QueryBuilder $qb, int $maxResults = null, $toEntity = true): array + public function fetchAll(QueryBuilder $qb, ?int $maxResults = null, $toEntity = true): array { $sql = $qb->setMaxResults($maxResults)->getSQL(); $parameters = $qb->getParameters(); @@ -458,61 +465,24 @@ public function fetchAll(QueryBuilder $qb, int $maxResults = null, $toEntity = t return $entities; } - public function startBulk() + public function startBulk(): void { $this->bulkSize = $this->config['bulkSize']; - if ($this->transaction) { - $this->connection->beginTransaction(); - } - } - - public function endBulk() - { - $this->doPublishToDatabase(); - if ($this->transaction) { - $this->connection->commit(); - } - $this->doPublishToBroker(); - $this->bulkSize = 1; - } - - /** - * @throws \Doctrine\DBAL\Exception - * @throws Exception - */ - public function startTransaction() - { - if ($this->transaction) { - throw new Exception('Nested transactions not implemented.'); - } - $this->transaction = true; - $this->startBulk(); } /** * @throws \Doctrine\DBAL\Exception + * @throws SchemaException */ - public function commitTransaction() + public function endBulk(): void { - $this->endBulk(); - $this->transaction = false; - } - - public function rollbackTransaction() - { - if (!$this->transaction) { - return; - } - - $this->connection->rollBack(); - $this->transaction = false; - $this->bulkDatabaseEntities = []; - $this->bulkBrokerCallbacks = []; + $this->doPublishToDatabase(); + $this->doPublishToBroker(); $this->bulkSize = 1; } /** - * @throws Exception + * @throws Exception|\Doctrine\DBAL\Exception */ private function fetch(QueryBuilder $qb): ?BackgroundJob { @@ -520,7 +490,7 @@ private function fetch(QueryBuilder $qb): ?BackgroundJob } /** - * @throws Exception + * @throws Exception|\Doctrine\DBAL\Exception */ private function count(QueryBuilder $qb): int { @@ -529,6 +499,7 @@ private function count(QueryBuilder $qb): int /** * @throws Exception + * @throws \Doctrine\DBAL\Exception */ private function isRedundant(BackgroundJob $entity): bool { @@ -548,7 +519,8 @@ private function isRedundant(BackgroundJob $entity): bool } /** - * @throws Exception + * @throws SchemaException + * @throws \Doctrine\DBAL\Exception */ private function getPreviousUnfinishedJob(BackgroundJob $entity): ?BackgroundJob { @@ -585,29 +557,29 @@ public function save(BackgroundJob $entity): void $this->updateSchema(); } - try { - if (!$entity->getId()) { - if ($this->producer) { - $entity->setProcessedByBroker(true); - } + if (!$entity->getId()) { + if ($this->producer) { + $entity->setProcessedByBroker(true); + } - $this->bulkDatabaseEntities[] = $entity; + $this->bulkDatabaseEntities[] = $entity; - if (count($this->bulkDatabaseEntities) >= $this->bulkSize) { - $this->doPublishToDatabase(); - } - } else { - $this->connection->update($this->config['tableName'], $entity->getDatabaseValues(), ['id' => $entity->getId()]); + if (count($this->bulkDatabaseEntities) >= $this->bulkSize) { + $this->doPublishToDatabase(); } - } catch (Exception $e) { - $this->rollbackTransaction(); - throw $e; + } else { + $this->connection->update($this->config['tableName'], $entity->getDatabaseValues(), ['id' => $entity->getId()]); } } - private function doPublishToDatabase() { + /** + * @throws SchemaException + * @throws \Doctrine\DBAL\Exception + */ + private function doPublishToDatabase(): void + { if (empty($this->bulkDatabaseEntities)) { - return null; + return; } // Protože vkládám více záznamů najednou, potřebuju si vše uzavřít do transakce, abych si bezpečně získal seznam vložených ID a nastavil je vloženým entitám. @@ -636,16 +608,17 @@ private function doPublishToDatabase() { /** * Podporuje INSERT s více VALUES v jednom příkazu. - * Vychází z @link \Doctrine\DBAL\Connection::insert + * Vychází z @param BackgroundJob[] $entities + * @throws \Doctrine\DBAL\Exception + * @link Connection::insert * - * @param BackgroundJob[] $entities */ - private function insertMultipleEntities(array $entities) + private function insertMultipleEntities(array $entities): void { $table = $this->config['tableName']; if (empty($entities)) { - return null; + return; } $columns = array_keys($entities[0]->getDatabaseValues()); @@ -667,9 +640,9 @@ private function insertMultipleEntities(array $entities) $sql = "INSERT INTO $table ($columnsToSql) VALUES $setToSql"; if (method_exists($this->connection, 'executeStatement')) { - return $this->connection->executeStatement($sql, $values); + $this->connection->executeStatement($sql, $values); } else { - return $this->connection->executeUpdate($sql, $values); + $this->connection->executeUpdate($sql, $values); } } @@ -742,7 +715,7 @@ public function updateSchema(bool $force = false): void $schemaManager = $this->createSchemaManager(); if ($schemaManager->tablesExist([$this->config['tableName']])) { $tableDiff = $schemaManager->createComparator()->compareTables($this->createSchemaManager()->introspectTable($this->config['tableName']), $table); - $sqls = $tableDiff ? $this->connection->getDatabasePlatform()->getAlterTableSQL($tableDiff) : []; + $sqls = $this->connection->getDatabasePlatform()->getAlterTableSQL($tableDiff); } else { $sqls = $this->connection->getDatabasePlatform()->getCreateTableSQL($table); } @@ -869,7 +842,8 @@ private function getEntity(int $id, string $queue, int $priority): ?BackgroundJo } /** - * @throws Exception + * @throws SchemaException + * @throws \Doctrine\DBAL\Exception */ private function checkUnfinishedJobs(BackgroundJob $entity): bool { @@ -889,110 +863,6 @@ private function checkUnfinishedJobs(BackgroundJob $entity): bool return true; } - /** - * @throws ReflectionException - */ - private function checkArguments(?array $args, $callback) - { - return; // TODO - - // Create a ReflectionFunction object based on the provided callback - $reflection = new ReflectionMethod($callback[0], $callback[1]); - - // Retrieve the parameters of the method - $params = $reflection->getParameters(); - - $builtInTypesMapping = [ - 'integer' => 'int', - 'boolean' => 'bool', - 'double' => 'float', - ]; - - if (version_compare(PHP_VERSION, '8.0', '<')) { - $requiredParams = 0; - foreach ($params as $_param) { - if (!$_param->isOptional()) { - $requiredParams++; - } - } - - // Check the number of arguments - if (count($args) < $requiredParams) { - throw new InvalidArgumentException("Number of arguments does not match."); - } - - $argsValues = array_values($args); - // For PHP lower than 8.0 - foreach ($params as $index => $param) { - $type = $param->getType(); - - $argument = $argsValues[$index]; - - // For nullable parameters - if ($param->isOptional() && is_null($argument)) { - continue; - } - - // For other parameters - if ($type) { - $expectedType = $type->getName(); - if ($type->isBuiltin()) { - $actualType = gettype($argument); - $actualType = $builtInTypesMapping[$actualType] ?? $actualType; - if (gettype($argument) !== $expectedType) { - throw new InvalidArgumentException("Parameter type does not match at index $index: expected $expectedType, got " . gettype($argument)); - } - } else { - if (!is_a($argument, $expectedType)) { - throw new InvalidArgumentException("Parameter type does not match at index $index: expected $expectedType or subtype, got " . get_class($argument)); - } - } - } - } - } else { - $paramNames = []; - foreach ($params as $param) { - $paramNames[$param->getName()] = $param->getName(); - } - - foreach ($args as $_name => $_value) { - if (!array_key_exists($_name, $paramNames)) { - throw new InvalidArgumentException("Missing parameter for the argument $_name."); - } - } - - foreach ($params as $param) { - $name = $param->getName(); - $type = $param->getType(); - - if (!array_key_exists($name, $args)) { - if (!$param->isOptional()) { - throw new InvalidArgumentException("Missing argument for the parameter $name."); - } - - continue; - } - - $argument = $args[$name]; - - if ($type) { - $expectedType = $type->getName(); - if ($type->isBuiltin()) { - $actualType = gettype($argument); - $actualType = $builtInTypesMapping[$actualType] ?? $actualType; - if ($actualType !== $expectedType) { - throw new InvalidArgumentException("Parameter $name type does not match: expected $expectedType, got " . gettype($argument)); - } - } else { - if (!is_a($argument, $expectedType)) { - throw new InvalidArgumentException("Parameter $name type does not match: expected $expectedType or subtype, got " . get_class($argument)); - } - } - } - } - } - } - private function getPriority(?int $priority, string $callbackName): int { if (is_null($priority)) { @@ -1026,10 +896,10 @@ private function getMemories(): array ]; } - public function dieIfNecessary() { + public function dieIfNecessary(): void + { if ($this->shouldDie) { die(); } } - } From 875c730f26aec6ebbfd47dda738d5438181c5852 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1=C5=A1=20Kud=C4=9Blka?= Date: Thu, 22 May 2025 11:43:47 +0200 Subject: [PATCH 08/10] Publishes queue on transaction commit Ensures background queue messages are only published to the broker when the outermost database transaction commits successfully. This is achieved by tracking the transaction nesting level and publishing the queue only when the level returns to zero. --- src/Doctrine/Connection.php | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) create mode 100644 src/Doctrine/Connection.php diff --git a/src/Doctrine/Connection.php b/src/Doctrine/Connection.php new file mode 100644 index 0000000..0338d73 --- /dev/null +++ b/src/Doctrine/Connection.php @@ -0,0 +1,32 @@ +transactionNestingLevel++; + } + + public function rollBack(): void + { + parent::rollBack(); + $this->transactionNestingLevel--; + } + + public function commit(): void + { + parent::commit(); + $this->transactionNestingLevel--; + + if ($this->transactionNestingLevel === 0) { + $this->getBackgroundQueue()->doPublishToBroker(); + } + } +} \ No newline at end of file From 77b093390c63c3af4eb34ffc87abc29a0979b6bd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1=C5=A1=20Kud=C4=9Blka?= Date: Thu, 22 May 2025 11:48:17 +0200 Subject: [PATCH 09/10] fix --- src/Doctrine/Connection.php | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/Doctrine/Connection.php b/src/Doctrine/Connection.php index 0338d73..81514e0 100644 --- a/src/Doctrine/Connection.php +++ b/src/Doctrine/Connection.php @@ -2,10 +2,12 @@ namespace ADT\BackgroundQueue\Doctrine; +use ADT\BackgroundQueue\BackgroundQueue; + trait Connection { - abstract protected function getBackgroundQueue(); - + abstract protected function getBackgroundQueue(): BackgroundQueue; + protected int $transactionNestingLevel = 0; public function beginTransaction(): void From 855bb404ceeae04328e8c9d478cf723989c66fde Mon Sep 17 00:00:00 2001 From: Viktor Masicek Date: Wed, 8 Oct 2025 21:40:48 +0200 Subject: [PATCH 10/10] Extra column in database for parameters in json format --- composer.json | 2 +- src/BackgroundQueue.php | 5 +++-- src/Entity/BackgroundJob.php | 37 ++++++++++++++++++++++++++---------- 3 files changed, 31 insertions(+), 13 deletions(-) diff --git a/composer.json b/composer.json index d86fb9d..ee2ffde 100644 --- a/composer.json +++ b/composer.json @@ -24,7 +24,7 @@ "doctrine/dbal": "^2.0|^3.0|^4.0", "symfony/console": "^4.0|^5.0|^6.0", "psr/log": "^1.0|^2.0|^3.0", - "adt/utils": "^2.10", + "adt/utils": "^2.14", "adt/command-lock": "^1.1" }, "autoload": { diff --git a/src/BackgroundQueue.php b/src/BackgroundQueue.php index 0d7eb99..6b1e8ff 100644 --- a/src/BackgroundQueue.php +++ b/src/BackgroundQueue.php @@ -73,7 +73,7 @@ public function __construct(array $config) $config['bulkSize'] = 1; } if (!isset($config['parametersFormat'])) { - $config['parametersFormat'] = BackgroundJob::PARAMETERS_FORMAT_SERIALIZE; + $config['parametersFormat'] = BackgroundJob::PARAMETERS_FORMAT_JSON; } if (!in_array($config['parametersFormat'], BackgroundJob::PARAMETERS_FORMATS, true)) { throw new Exception('Unsupported parameters format: ' . $config['parametersFormat']); @@ -691,7 +691,8 @@ public function updateSchema(bool $force = false): void $table->addColumn('queue', Types::STRING, ['length' => 255])->setNotnull(true); $table->addColumn('priority', Types::INTEGER)->setNotnull(false); $table->addColumn('callback_name', Types::STRING, ['length' => 255])->setNotnull(true); - $table->addColumn('parameters', Types::BLOB)->setNotnull(true); + $table->addColumn('parameters', Types::BLOB)->setNotnull(false); + $table->addColumn('parameters_json', Types::JSON)->setNotnull(false); $table->addColumn('state', Types::SMALLINT)->setNotnull(true); $table->addColumn('created_at', Types::DATETIME_IMMUTABLE)->setNotnull(true); $table->addColumn('last_attempt_at', Types::DATETIME_IMMUTABLE)->setNotnull(false); diff --git a/src/Entity/BackgroundJob.php b/src/Entity/BackgroundJob.php index 8355092..1599072 100644 --- a/src/Entity/BackgroundJob.php +++ b/src/Entity/BackgroundJob.php @@ -2,6 +2,7 @@ namespace ADT\BackgroundQueue\Entity; +use ADT\Utils\Utils; use DateTime; use DateTimeImmutable; use Exception; @@ -44,6 +45,7 @@ final class BackgroundJob private ?int $priority; private string $callbackName; private $parameters; /** @see self::setParameters() */ + private $parameters_json = null; /** @see self::setParameters() */ private int $state = self::STATE_READY; private DateTimeImmutable $createdAt; private ?DateTimeImmutable $lastAttemptAt = null; @@ -134,19 +136,26 @@ public function setSerialGroup(?string $serialGroup): self public function getParameters(): array { $this->parameters = is_resource($this->parameters) ? stream_get_contents($this->parameters) : $this->parameters; - - if (substr($this->parameters, 0, 2) === 'a:') { + + if (!is_null($this->parameters)) { return unserialize($this->parameters); } - return json_decode($this->parameters, true); + $parametersJson = json_decode($this->parameters_json, true); + $parameters = []; + foreach ($parametersJson as $key => $value) { + $parameters[$key] = Utils::getDateTimeFromArray($value, true); + } + return $parameters; } /** * Parametry ukládá jako serializované pole nebo jako json. * Formát určuje parametr v BackgroundQueue `parametersFormat`. * - `serialize` => ukládá jako serializované pole a je bez omezení - * - `json` => parametry mohou obsahovat pouze skalární typy, pole a NULL + * - `json` => ukládá jako json + * - parametry mohou obsahovat pouze skalární typy, pole, NULL a \DateTimeInterface + * - pokud je nějaký z parametrů objekt, automaticky se použije "serialize" * * @param object|array|string|int|float|bool|null $parameters * @param string $parametersFormat @@ -156,17 +165,23 @@ public function setParameters($parameters, string $parametersFormat): self { $parameters = is_array($parameters) ? $parameters : [$parameters]; + if ($parametersFormat == self::PARAMETERS_FORMAT_JSON) { + foreach ($parameters as $parameter) { + if (!is_scalar($parameter) && !is_array($parameter) && !is_null($parameter) && !($parameter instanceof \DateTimeInterface)) { + $parametersFormat = self::PARAMETERS_FORMAT_SERIALIZE; + break; + } + } + } + switch ($parametersFormat) { case self::PARAMETERS_FORMAT_SERIALIZE: $this->parameters = serialize($parameters); + $this->parameters_json = null; break; case self::PARAMETERS_FORMAT_JSON: - foreach ($parameters as $idx => $parameter) { - if (!is_scalar($parameter) && !is_array($parameter) && !is_null($parameter)) { - throw new Exception("Unsupported type '" . gettype($parameter) . "' for \$parameters[$idx] using parametersFormat = " . self::PARAMETERS_FORMAT_JSON); - } - } - $this->parameters = json_encode($parameters); + $this->parameters = null; + $this->parameters_json = json_encode($parameters); break; default: throw new Exception("Unsupported parameters format: $parametersFormat"); @@ -327,6 +342,7 @@ public static function createEntity(array $values): self $entity->priority = $values['priority']; $entity->callbackName = $values['callback_name']; $entity->parameters = $values['parameters']; + $entity->parameters_json = $values['parameters_json']; $entity->state = $values['state']; $entity->createdAt = new DateTimeImmutable($values['created_at']); $entity->lastAttemptAt = $values['last_attempt_at'] ? new DateTimeImmutable($values['last_attempt_at']) : null; @@ -353,6 +369,7 @@ public function getDatabaseValues(): array 'priority' => $this->priority, 'callback_name' => $this->callbackName, 'parameters' => $this->parameters, + 'parameters_json' => $this->parameters_json, 'state' => $this->state, 'created_at' => $this->createdAt->format('Y-m-d H:i:s'), 'last_attempt_at' => $this->lastAttemptAt ? $this->lastAttemptAt->format('Y-m-d H:i:s') : null,