diff --git a/composer.json b/composer.json index 0eca2f4..ee2ffde 100644 --- a/composer.json +++ b/composer.json @@ -20,10 +20,11 @@ "require": { "php": "^7.4|^8.0", "ext-json": "*", + "ext-pdo": "*", "doctrine/dbal": "^2.0|^3.0|^4.0", "symfony/console": "^4.0|^5.0|^6.0", "psr/log": "^1.0|^2.0|^3.0", - "adt/utils": "^2.10", + "adt/utils": "^2.14", "adt/command-lock": "^1.1" }, "autoload": { diff --git a/src/BackgroundQueue.php b/src/BackgroundQueue.php index c9043b1..6b1e8ff 100644 --- a/src/BackgroundQueue.php +++ b/src/BackgroundQueue.php @@ -14,16 +14,14 @@ use Doctrine\DBAL\DriverManager; use Doctrine\DBAL\Query\QueryBuilder; use Doctrine\DBAL\Schema\AbstractSchemaManager; -use Doctrine\DBAL\Schema\Comparator; use Doctrine\DBAL\Schema\Schema; use Doctrine\DBAL\Schema\SchemaException; use Doctrine\DBAL\Types\Types; use Exception; use InvalidArgumentException; +use PDOException; use Psr\Log\LoggerInterface; use Psr\Log\NullLogger; -use ReflectionException; -use ReflectionMethod; use RuntimeException; use Throwable; use TypeError; @@ -33,23 +31,20 @@ class BackgroundQueue const UNEXPECTED_ERROR_MESSAGE = 'Unexpected error occurred.'; private array $config; + private bool $connectionCreated = false; private Connection $connection; private LoggerInterface $logger; private ?Producer $producer = null; - private bool $transaction = false; private int $bulkSize = 1; private array $bulkBrokerCallbacks = []; private array $bulkDatabaseEntities = []; private bool $shouldDie = false; /** - * @throws \Doctrine\DBAL\Exception + * @throws Exception */ public function __construct(array $config) { - if (is_string($config['connection'])) { - $config['connection'] = $this->parseDsn($config['connection']); - } if (empty($config['waitingJobExpiration'])) { $config['waitingJobExpiration'] = 1000; } @@ -78,14 +73,14 @@ public function __construct(array $config) $config['bulkSize'] = 1; } if (!isset($config['parametersFormat'])) { - $config['parametersFormat'] = BackgroundJob::PARAMETERS_FORMAT_SERIALIZE; + $config['parametersFormat'] = BackgroundJob::PARAMETERS_FORMAT_JSON; } if (!in_array($config['parametersFormat'], BackgroundJob::PARAMETERS_FORMATS, true)) { throw new Exception('Unsupported parameters format: ' . $config['parametersFormat']); } $this->config = $config; - $this->connection = DriverManager::getConnection($config['connection']); + $this->connection = $config['connection']; $this->logger = $config['logger'] ?: new NullLogger(); if ($config['producer']) { $this->producer = $config['producer']; @@ -95,6 +90,7 @@ public function __construct(array $config) /** * Bezpečně ověříme, že nedošlo ke ztrátě spojení k DB. * Pokud ano, připojíme se znovu. + * @throws \Doctrine\DBAL\Exception */ private function databaseConnectionCheckAndReconnect(): void { @@ -111,7 +107,7 @@ private function databaseConnectionCheckAndReconnect(): void $this->connection->close(); $this->connection->getNativeConnection(); } - } catch (\Exception $e) { + } catch (Exception $e) { $this->logger->log('critical', new Exception('BackgroundQueue - database connection lost (exception): ' . $e->getMessage(), 0, $e)); $this->connection->close(); $this->connection->getNativeConnection(); @@ -120,10 +116,13 @@ private function databaseConnectionCheckAndReconnect(): void } } + /** + * @throws Exception + */ private function databasePing(): bool { set_error_handler(function ($severity, $message) { - throw new \PDOException($message, $severity); + throw new PDOException($message, $severity); }); try { @@ -132,18 +131,18 @@ private function databasePing(): bool return true; - } catch (\Doctrine\DBAL\DBALException $e) { + } catch (\Doctrine\DBAL\Exception $e) { restore_error_handler(); return false; - } catch (\Exception $e) { + } catch (Exception $e) { restore_error_handler(); throw $e; } } /** - * @throws Exception + * @throws Exception|\Doctrine\DBAL\Exception */ public function publish( string $callbackName, @@ -167,8 +166,6 @@ public function publish( throw new Exception('Parameter "identifier" has to be set if "isUnique" is true.'); } - $this->checkArguments($parameters, $this->getCallback($callbackName)); - $priority = $this->getPriority($priority, $callbackName); $entity = new BackgroundJob(); @@ -195,11 +192,11 @@ public function publishToBroker(BackgroundJob $entity): void return; } - $this->bulkBrokerCallbacks[] = function() use ($entity) { + $this->bulkBrokerCallbacks[] = function () use ($entity) { try { // Pokud mám u callbacku nastavenou frontu, v DB zůstává původní, ale do brokera použiju tu z konfigu. // Tedy řadím do různých front pro různé consumery, ale z pohledu záznamů v DB se jedná o stejnou frontu. - $this->producer->publish((string) $entity->getId(), $this->getQueueForEntityIncludeCallback($entity), $entity->getPriority(), $entity->getPostponedBy()); + $this->producer->publish((string)$entity->getId(), $this->getQueueForEntityIncludeCallback($entity), $entity->getPriority(), $entity->getPostponedBy()); } catch (Exception $e) { $entity->setState(BackgroundJob::STATE_BROKER_FAILED) ->setErrorMessage($e->getMessage()); @@ -208,12 +205,15 @@ public function publishToBroker(BackgroundJob $entity): void $this->logException(self::UNEXPECTED_ERROR_MESSAGE, $entity, $e); } }; - if (!$this->transaction && count($this->bulkBrokerCallbacks) >= $this->bulkSize) { + if (!$this->connection->isTransactionActive() && count($this->bulkBrokerCallbacks) >= $this->bulkSize) { $this->doPublishToBroker(); } } - private function doPublishToBroker() + /** + * @internal + */ + public function doPublishToBroker(): void { foreach ($this->bulkBrokerCallbacks as $_closure) { $_closure(); @@ -222,14 +222,16 @@ private function doPublishToBroker() } /** - * @internal - * - * @param int|BackgroundJob $entity - * @return void + * @throws SchemaException + * @throws \Doctrine\DBAL\Exception * @throws Exception + * @internal */ public function process($entity, string $queue, int $priority): void { + // U publishera chceme transakci stejnou s flush, proto používáme stejné connection jako je v aplikaci. Ale u consumera chceme vlastní connection, aby když se revertne aplikační transakce, tak aby consumer mohl zapsat chybový stav k BackgroundJob. + $this->createConnection(); + if (!$entity instanceof BackgroundJob) { if (!$entity = $this->getEntity($entity, $queue, $priority)) { return; @@ -267,13 +269,6 @@ public function process($entity, string $queue, int $priority): void return; } - try { - $this->checkArguments($entity->getParameters(), $callback); - } catch (Exception $e) { - $this->logException($e, $entity); - return; - } - // změna stavu na zpracovává se try { $entity->setState(BackgroundJob::STATE_PROCESSING); @@ -381,8 +376,17 @@ public function process($entity, string $queue, int $priority): void } } + private function createConnection(): void + { + if (!$this->connectionCreated) { + $this->connection = DriverManager::getConnection($this->connection->getParams()); + $this->connectionCreated = true; + } + } + /** * @throws Exception + * @throws \Doctrine\DBAL\Exception */ public function getUnfinishedJobIdentifiers(array $identifiers = [], bool $excludeProcessing = false): array { @@ -426,7 +430,9 @@ public function getConfig(): array */ public function createQueryBuilder(): QueryBuilder { - $this->updateSchema(); + if ($this->config['autoUpdateSchema']) { + $this->updateSchema(); + } return $this->connection->createQueryBuilder() ->select('*') @@ -436,10 +442,10 @@ public function createQueryBuilder(): QueryBuilder } /** - * @throws Exception + * @throws Exception|\Doctrine\DBAL\Exception * @internal */ - public function fetchAll(QueryBuilder $qb, int $maxResults = null, $toEntity = true): array + public function fetchAll(QueryBuilder $qb, ?int $maxResults = null, $toEntity = true): array { $sql = $qb->setMaxResults($maxResults)->getSQL(); $parameters = $qb->getParameters(); @@ -459,61 +465,24 @@ public function fetchAll(QueryBuilder $qb, int $maxResults = null, $toEntity = t return $entities; } - public function startBulk() + public function startBulk(): void { $this->bulkSize = $this->config['bulkSize']; - if ($this->transaction) { - $this->connection->beginTransaction(); - } - } - - public function endBulk() - { - $this->doPublishToDatabase(); - if ($this->transaction) { - $this->connection->commit(); - } - $this->doPublishToBroker(); - $this->bulkSize = 1; - } - - /** - * @throws \Doctrine\DBAL\Exception - * @throws Exception - */ - public function startTransaction() - { - if ($this->transaction) { - throw new Exception('Nested transactions not implemented.'); - } - $this->transaction = true; - $this->startBulk(); } /** * @throws \Doctrine\DBAL\Exception + * @throws SchemaException */ - public function commitTransaction() + public function endBulk(): void { - $this->endBulk(); - $this->transaction = false; - } - - public function rollbackTransaction() - { - if (!$this->transaction) { - return; - } - - $this->connection->rollBack(); - $this->transaction = false; - $this->bulkDatabaseEntities = []; - $this->bulkBrokerCallbacks = []; + $this->doPublishToDatabase(); + $this->doPublishToBroker(); $this->bulkSize = 1; } /** - * @throws Exception + * @throws Exception|\Doctrine\DBAL\Exception */ private function fetch(QueryBuilder $qb): ?BackgroundJob { @@ -521,7 +490,7 @@ private function fetch(QueryBuilder $qb): ?BackgroundJob } /** - * @throws Exception + * @throws Exception|\Doctrine\DBAL\Exception */ private function count(QueryBuilder $qb): int { @@ -530,6 +499,7 @@ private function count(QueryBuilder $qb): int /** * @throws Exception + * @throws \Doctrine\DBAL\Exception */ private function isRedundant(BackgroundJob $entity): bool { @@ -549,7 +519,8 @@ private function isRedundant(BackgroundJob $entity): bool } /** - * @throws Exception + * @throws SchemaException + * @throws \Doctrine\DBAL\Exception */ private function getPreviousUnfinishedJob(BackgroundJob $entity): ?BackgroundJob { @@ -582,31 +553,33 @@ private function getPreviousUnfinishedJob(BackgroundJob $entity): ?BackgroundJob public function save(BackgroundJob $entity): void { $this->databaseConnectionCheckAndReconnect(); - $this->updateSchema(); + if ($this->config['autoUpdateSchema']) { + $this->updateSchema(); + } - try { - if (!$entity->getId()) { - if ($this->producer) { - $entity->setProcessedByBroker(true); - } + if (!$entity->getId()) { + if ($this->producer) { + $entity->setProcessedByBroker(true); + } - $this->bulkDatabaseEntities[] = $entity; + $this->bulkDatabaseEntities[] = $entity; - if (count($this->bulkDatabaseEntities) >= $this->bulkSize) { - $this->doPublishToDatabase(); - } - } else { - $this->connection->update($this->config['tableName'], $entity->getDatabaseValues(), ['id' => $entity->getId()]); + if (count($this->bulkDatabaseEntities) >= $this->bulkSize) { + $this->doPublishToDatabase(); } - } catch (Exception $e) { - $this->rollbackTransaction(); - throw $e; + } else { + $this->connection->update($this->config['tableName'], $entity->getDatabaseValues(), ['id' => $entity->getId()]); } } - private function doPublishToDatabase() { + /** + * @throws SchemaException + * @throws \Doctrine\DBAL\Exception + */ + private function doPublishToDatabase(): void + { if (empty($this->bulkDatabaseEntities)) { - return null; + return; } // Protože vkládám více záznamů najednou, potřebuju si vše uzavřít do transakce, abych si bezpečně získal seznam vložených ID a nastavil je vloženým entitám. @@ -635,16 +608,17 @@ private function doPublishToDatabase() { /** * Podporuje INSERT s více VALUES v jednom příkazu. - * Vychází z @link \Doctrine\DBAL\Connection::insert + * Vychází z @param BackgroundJob[] $entities + * @throws \Doctrine\DBAL\Exception + * @link Connection::insert * - * @param BackgroundJob[] $entities */ - private function insertMultipleEntities(array $entities) + private function insertMultipleEntities(array $entities): void { $table = $this->config['tableName']; if (empty($entities)) { - return null; + return; } $columns = array_keys($entities[0]->getDatabaseValues()); @@ -665,7 +639,11 @@ private function insertMultipleEntities(array $entities) $setToSql = implode(', ', $set); $sql = "INSERT INTO $table ($columnsToSql) VALUES $setToSql"; - return $this->connection->executeStatement($sql, $values); + if (method_exists($this->connection, 'executeStatement')) { + $this->connection->executeStatement($sql, $values); + } else { + $this->connection->executeUpdate($sql, $values); + } } /** @@ -713,7 +691,8 @@ public function updateSchema(bool $force = false): void $table->addColumn('queue', Types::STRING, ['length' => 255])->setNotnull(true); $table->addColumn('priority', Types::INTEGER)->setNotnull(false); $table->addColumn('callback_name', Types::STRING, ['length' => 255])->setNotnull(true); - $table->addColumn('parameters', Types::BLOB)->setNotnull(true); + $table->addColumn('parameters', Types::BLOB)->setNotnull(false); + $table->addColumn('parameters_json', Types::JSON)->setNotnull(false); $table->addColumn('state', Types::SMALLINT)->setNotnull(true); $table->addColumn('created_at', Types::DATETIME_IMMUTABLE)->setNotnull(true); $table->addColumn('last_attempt_at', Types::DATETIME_IMMUTABLE)->setNotnull(false); @@ -737,7 +716,7 @@ public function updateSchema(bool $force = false): void $schemaManager = $this->createSchemaManager(); if ($schemaManager->tablesExist([$this->config['tableName']])) { $tableDiff = $schemaManager->createComparator()->compareTables($this->createSchemaManager()->introspectTable($this->config['tableName']), $table); - $sqls = $tableDiff ? $this->connection->getDatabasePlatform()->getAlterTableSQL($tableDiff) : []; + $sqls = $this->connection->getDatabasePlatform()->getAlterTableSQL($tableDiff); } else { $sqls = $this->connection->getDatabasePlatform()->getCreateTableSQL($table); } @@ -864,7 +843,8 @@ private function getEntity(int $id, string $queue, int $priority): ?BackgroundJo } /** - * @throws Exception + * @throws SchemaException + * @throws \Doctrine\DBAL\Exception */ private function checkUnfinishedJobs(BackgroundJob $entity): bool { @@ -884,110 +864,6 @@ private function checkUnfinishedJobs(BackgroundJob $entity): bool return true; } - /** - * @throws ReflectionException - */ - private function checkArguments(?array $args, $callback) - { - return; // TODO - - // Create a ReflectionFunction object based on the provided callback - $reflection = new ReflectionMethod($callback[0], $callback[1]); - - // Retrieve the parameters of the method - $params = $reflection->getParameters(); - - $builtInTypesMapping = [ - 'integer' => 'int', - 'boolean' => 'bool', - 'double' => 'float', - ]; - - if (version_compare(PHP_VERSION, '8.0', '<')) { - $requiredParams = 0; - foreach ($params as $_param) { - if (!$_param->isOptional()) { - $requiredParams++; - } - } - - // Check the number of arguments - if (count($args) < $requiredParams) { - throw new InvalidArgumentException("Number of arguments does not match."); - } - - $argsValues = array_values($args); - // For PHP lower than 8.0 - foreach ($params as $index => $param) { - $type = $param->getType(); - - $argument = $argsValues[$index]; - - // For nullable parameters - if ($param->isOptional() && is_null($argument)) { - continue; - } - - // For other parameters - if ($type) { - $expectedType = $type->getName(); - if ($type->isBuiltin()) { - $actualType = gettype($argument); - $actualType = $builtInTypesMapping[$actualType] ?? $actualType; - if (gettype($argument) !== $expectedType) { - throw new InvalidArgumentException("Parameter type does not match at index $index: expected $expectedType, got " . gettype($argument)); - } - } else { - if (!is_a($argument, $expectedType)) { - throw new InvalidArgumentException("Parameter type does not match at index $index: expected $expectedType or subtype, got " . get_class($argument)); - } - } - } - } - } else { - $paramNames = []; - foreach ($params as $param) { - $paramNames[$param->getName()] = $param->getName(); - } - - foreach ($args as $_name => $_value) { - if (!array_key_exists($_name, $paramNames)) { - throw new InvalidArgumentException("Missing parameter for the argument $_name."); - } - } - - foreach ($params as $param) { - $name = $param->getName(); - $type = $param->getType(); - - if (!array_key_exists($name, $args)) { - if (!$param->isOptional()) { - throw new InvalidArgumentException("Missing argument for the parameter $name."); - } - - continue; - } - - $argument = $args[$name]; - - if ($type) { - $expectedType = $type->getName(); - if ($type->isBuiltin()) { - $actualType = gettype($argument); - $actualType = $builtInTypesMapping[$actualType] ?? $actualType; - if ($actualType !== $expectedType) { - throw new InvalidArgumentException("Parameter $name type does not match: expected $expectedType, got " . gettype($argument)); - } - } else { - if (!is_a($argument, $expectedType)) { - throw new InvalidArgumentException("Parameter $name type does not match: expected $expectedType or subtype, got " . get_class($argument)); - } - } - } - } - } - } - private function getPriority(?int $priority, string $callbackName): int { if (is_null($priority)) { @@ -1021,10 +897,10 @@ private function getMemories(): array ]; } - public function dieIfNecessary() { + public function dieIfNecessary(): void + { if ($this->shouldDie) { die(); } } - } diff --git a/src/Console/UpdateSchemaCommand.php b/src/Console/UpdateSchemaCommand.php index 3ff76f0..8f923b7 100755 --- a/src/Console/UpdateSchemaCommand.php +++ b/src/Console/UpdateSchemaCommand.php @@ -34,6 +34,7 @@ protected function configure() /** * @throws SchemaException * @throws Exception + * @throws \Doctrine\DBAL\Exception */ protected function executeCommand(InputInterface $input, OutputInterface $output): int { diff --git a/src/Doctrine/Connection.php b/src/Doctrine/Connection.php new file mode 100644 index 0000000..81514e0 --- /dev/null +++ b/src/Doctrine/Connection.php @@ -0,0 +1,34 @@ +transactionNestingLevel++; + } + + public function rollBack(): void + { + parent::rollBack(); + $this->transactionNestingLevel--; + } + + public function commit(): void + { + parent::commit(); + $this->transactionNestingLevel--; + + if ($this->transactionNestingLevel === 0) { + $this->getBackgroundQueue()->doPublishToBroker(); + } + } +} \ No newline at end of file diff --git a/src/Entity/BackgroundJob.php b/src/Entity/BackgroundJob.php index 8355092..1599072 100644 --- a/src/Entity/BackgroundJob.php +++ b/src/Entity/BackgroundJob.php @@ -2,6 +2,7 @@ namespace ADT\BackgroundQueue\Entity; +use ADT\Utils\Utils; use DateTime; use DateTimeImmutable; use Exception; @@ -44,6 +45,7 @@ final class BackgroundJob private ?int $priority; private string $callbackName; private $parameters; /** @see self::setParameters() */ + private $parameters_json = null; /** @see self::setParameters() */ private int $state = self::STATE_READY; private DateTimeImmutable $createdAt; private ?DateTimeImmutable $lastAttemptAt = null; @@ -134,19 +136,26 @@ public function setSerialGroup(?string $serialGroup): self public function getParameters(): array { $this->parameters = is_resource($this->parameters) ? stream_get_contents($this->parameters) : $this->parameters; - - if (substr($this->parameters, 0, 2) === 'a:') { + + if (!is_null($this->parameters)) { return unserialize($this->parameters); } - return json_decode($this->parameters, true); + $parametersJson = json_decode($this->parameters_json, true); + $parameters = []; + foreach ($parametersJson as $key => $value) { + $parameters[$key] = Utils::getDateTimeFromArray($value, true); + } + return $parameters; } /** * Parametry ukládá jako serializované pole nebo jako json. * Formát určuje parametr v BackgroundQueue `parametersFormat`. * - `serialize` => ukládá jako serializované pole a je bez omezení - * - `json` => parametry mohou obsahovat pouze skalární typy, pole a NULL + * - `json` => ukládá jako json + * - parametry mohou obsahovat pouze skalární typy, pole, NULL a \DateTimeInterface + * - pokud je nějaký z parametrů objekt, automaticky se použije "serialize" * * @param object|array|string|int|float|bool|null $parameters * @param string $parametersFormat @@ -156,17 +165,23 @@ public function setParameters($parameters, string $parametersFormat): self { $parameters = is_array($parameters) ? $parameters : [$parameters]; + if ($parametersFormat == self::PARAMETERS_FORMAT_JSON) { + foreach ($parameters as $parameter) { + if (!is_scalar($parameter) && !is_array($parameter) && !is_null($parameter) && !($parameter instanceof \DateTimeInterface)) { + $parametersFormat = self::PARAMETERS_FORMAT_SERIALIZE; + break; + } + } + } + switch ($parametersFormat) { case self::PARAMETERS_FORMAT_SERIALIZE: $this->parameters = serialize($parameters); + $this->parameters_json = null; break; case self::PARAMETERS_FORMAT_JSON: - foreach ($parameters as $idx => $parameter) { - if (!is_scalar($parameter) && !is_array($parameter) && !is_null($parameter)) { - throw new Exception("Unsupported type '" . gettype($parameter) . "' for \$parameters[$idx] using parametersFormat = " . self::PARAMETERS_FORMAT_JSON); - } - } - $this->parameters = json_encode($parameters); + $this->parameters = null; + $this->parameters_json = json_encode($parameters); break; default: throw new Exception("Unsupported parameters format: $parametersFormat"); @@ -327,6 +342,7 @@ public static function createEntity(array $values): self $entity->priority = $values['priority']; $entity->callbackName = $values['callback_name']; $entity->parameters = $values['parameters']; + $entity->parameters_json = $values['parameters_json']; $entity->state = $values['state']; $entity->createdAt = new DateTimeImmutable($values['created_at']); $entity->lastAttemptAt = $values['last_attempt_at'] ? new DateTimeImmutable($values['last_attempt_at']) : null; @@ -353,6 +369,7 @@ public function getDatabaseValues(): array 'priority' => $this->priority, 'callback_name' => $this->callbackName, 'parameters' => $this->parameters, + 'parameters_json' => $this->parameters_json, 'state' => $this->state, 'created_at' => $this->createdAt->format('Y-m-d H:i:s'), 'last_attempt_at' => $this->lastAttemptAt ? $this->lastAttemptAt->format('Y-m-d H:i:s') : null,