8000 [Messenger] Add support for PostgreSQL LISTEN/NOTIFY · thlbaut/symfony@01f33c3 · GitHub
[go: up one dir, main page]

Skip to content

Commit 01f33c3

Browse files
dunglasfabpot
authored andcommitted
[Messenger] Add support for PostgreSQL LISTEN/NOTIFY
1 parent a1e4222 commit 01f33c3

File tree

7 files changed

+206
-19
lines changed
  • src/Symfony/Component/Messenger/Bridge/Doctrine
    • < 8000 div class="PRIVATE_VisuallyHidden prc-TreeView-TreeViewVisuallyHidden-4-mPv" aria-hidden="true" id=":RmvtddabH1:">
      CHANGELOG.md
  • Tests/Transport
  • Transport
  • 7 files changed

    +206
    -19
    lines changed

    src/Symfony/Component/Messenger/Bridge/Doctrine/CHANGELOG.md

    Lines changed: 1 addition & 0 deletions
    8000
    Original file line numberDiff line numberDiff line change
    @@ -5,3 +5,4 @@ CHANGELOG
    55
    -----
    66

    77
    * Introduced the Doctrine bridge.
    8+
    * Added support for PostgreSQL `LISTEN`/`NOTIFY`.

    src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/DoctrineTransportFactoryTest.php

    Lines changed: 2 additions & 1 deletion
    Original file line numberDiff line numberDiff line change
    @@ -18,6 +18,7 @@
    1818
    use Symfony\Component\Messenger\Bridge\Doctrine\Transport\Connection;
    1919
    use Symfony\Component\Messenger\Bridge\Doctrine\Transport\DoctrineTransport;
    2020
    use Symfony\Component\Messenger\Bridge\Doctrine\Transport\DoctrineTransportFactory;
    21+
    use Symfony\Component\Messenger\Bridge\Doctrine\Transport\PostgreSqlConnection;
    2122
    use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
    2223

    2324
    class DoctrineTransportFactoryTest extends TestCase
    @@ -49,7 +50,7 @@ public function testCreateTransport()
    4950
    $serializer = $this->createMock(SerializerInterface::class);
    5051

    5152
    $this->assertEquals(
    52-
    new DoctrineTransport(new Connection(Connection::buildConfiguration('doctrine://default'), $driverConnection), $serializer),
    53+
    new DoctrineTransport(new Connection(PostgreSqlConnection::buildConfiguration('doctrine://default'), $driverConnection), $serializer),
    5354
    $factory->createTransport('doctrine://default', [], $serializer)
    5455
    );
    5556
    }
    Lines changed: 46 additions & 0 deletions
    Original file line numberDiff line numberDiff line change
    @@ -0,0 +1,46 @@
    1+
    <?php
    2+
    3+
    /*
    4+
    * This file is part of the Symfony package.
    5+
    *
    6+
    * (c) Fabien Potencier <fabien@symfony.com>
    7+
    *
    8+
    * For the full copyright and license information, please view the LICENSE
    9+
    * file that was distributed with this source code.
    10+
    */
    11+
    12+
    namespace Symfony\Component\Messenger\Bridge\Doctrine\Tests\Transport;
    13+
    14+
    use Doctrine\DBAL\Schema\Synchronizer\SchemaSynchronizer;
    15+
    use PHPUnit\Framework\TestCase;
    16+
    use Symfony\Component\Messenger\Bridge\Doctrine\Transport\PostgreSqlConnection;
    17+
    18+
    /**
    19+
    * @author Kévin Dunglas <dunglas@gmail.com>
    20+
    */
    21+
    class PostgreSqlConnectionTest extends TestCase
    22+
    {
    23+
    public function testSerialize()
    24+
    {
    25+
    $this->expectException(\BadMethodCallException::class);
    26+
    $this->expectExceptionMessage('Cannot serialize '.PostgreSqlConnection::class);
    27+
    28+
    $schemaSynchronizer = $this->createMock(SchemaSynchronizer::class);
    29+
    $driverConnection = $this->createMock(\Doctrine\DBAL\Connection::class);
    30+
    31+
    $connection = new PostgreSqlConnection([], $driverConnection, $schemaSynchronizer);
    32+
    serialize($connection);
    33+
    }
    34+
    35+
    public function testUnserialize()
    36+
    {
    37+
    $this->expectException(\BadMethodCallException::class);
    38+
    $this->expectExceptionMessage('Cannot unserialize '.PostgreSqlConnection::class);
    39+
    40+
    $schemaSynchronizer = $this->createMock(SchemaSynchronizer::class);
    41+
    $driverConnection = $this->createMock(\Doctrine\DBAL\Connection::class);
    42+
    43+
    $connection = new PostgreSqlConnection([], $driverConnection, $schemaSynchronizer);
    44+
    $connection->__wakeup();
    45+
    }
    46+
    }

    src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/Connection.php

    Lines changed: 26 additions & 14 deletions
    Original file line numberDiff line numberDiff line change
    @@ -22,15 +22,17 @@
    2222
    use Doctrine\DBAL\Types\Type;
    2323
    use Symfony\Component\Messenger\Exception\InvalidArgumentException;
    2424
    use Symfony\Component\Messenger\Exception\TransportException;
    25+
    use Symfony\Contracts\Service\ResetInterface;
    2526

    2627
    /**
    27-
    * @author Vincent Touzet <vincent.touzet@gmail.com>
    28+
    * @internal since Symfony 5.1
    2829
    *
    29-
    * @final
    30+
    * @author Vincent Touzet <vincent.touzet@gmail.com>
    31+
    * @author Kévin Dunglas <dunglas@gmail.com>
    3032
    */
    31-
    class Connection
    33+
    class Connection implements ResetInterface
    3234
    {
    33-
    private const DEFAULT_OPTIONS = [
    35+
    protected const DEFAULT_OPTIONS = [
    3436
    'table_name' => 'messenger_messages',
    3537
    'queue_name' => 'default',
    3638
    'redeliver_timeout' => 3600,
    @@ -45,22 +47,28 @@ class Connection
    4547
    * * table_name: name of the table
    4648
    * * connection: name of the Doctrine's entity manager
    4749
    * * queue_name: name of the queue
    48-
    * * redeliver_timeout: Timeout before redeliver messages still in handling state (i.e: delivered_at is not null and message is still in table). Default 3600
    49-
    * * auto_setup: Whether the table should be created automatically during send / get. Default : true
    50+
    * * redeliver_timeout: Timeout before redeliver messages still in handling state (i.e: delivered_at is not null and message is still in table). Default: 3600
    51+
    * * auto_setup: Whether the table should be created automatically during send / get. Default: true
    5052
    */
    51-
    private $configuration = [];
    52-
    private $driverConnection;
    53+
    protected $configuration = [];
    54+
    protected $driverConnection;
    55+
    protected $queueEmptiedAt;
    5356
    private $schemaSynchronizer;
    5457
    private $autoSetup;
    5558

    5659
    public function __construct(array $configuration, DBALConnection $driverConnection, SchemaSynchronizer $schemaSynchronizer = null)
    5760
    {
    58-
    $this->configuration = array_replace_recursive(self::DEFAULT_OPTIONS, $configuration);
    61+
    $this->configuration = array_replace_recursive(static::DEFAULT_OPTIONS, $configuration);
    5962
    $this->driverConnection = $driverConnection;
    6063
    $this->schemaSynchronizer = $schemaSynchronizer ?? new SingleDatabaseSynchronizer($this->driverConnection);
    6164
    $this->autoSetup = $this->configuration['auto_setup'];
    6265
    }
    6366

    67+
    public function reset()
    68+
    {
    69+
    $this->queueEmptiedAt = null;
    70+
    }
    71+
    6472
    public function getConfiguration(): array
    6573
    {
    6674
    return $this->configuration;
    @@ -78,20 +86,20 @@ public static function buildConfiguration(string $dsn, array $options = []): arr
    7886
    }
    7987

    8088
    $configuration = ['connection' => $components['host']];
    81-
    $configuration += $options + $query + self::DEFAULT_OPTIONS;
    89+
    $configuration += $options + $query + static::DEFAULT_OPTIONS;
    8290

    8391
    $configuration['auto_setup'] = filter_var($configuration['auto_setup'], FILTER_VALIDATE_BOOLEAN);
    8492

    8593
    // check for extra keys in options
    86-
    $optionsExtraKeys = array_diff(array_keys($options), array_keys(self::DEFAULT_OPTIONS));
    94+
    $optionsExtraKeys = array_diff(array_keys($options), array_keys(static::DEFAULT_OPTIONS));
    8795
    if (0 < \count($optionsExtraKeys)) {
    88-
    throw new InvalidArgumentException(sprintf('Unknown option found : [%s]. Allowed options are [%s]', implode(', ', $optionsExtraKeys), implode(', ', array_keys(self::DEFAULT_OPTIONS))));
    96+
    throw new InvalidArgumentException(sprintf('Unknown option found : [%s]. Allowed options are [%s]', implode(', ', $optionsExtraKeys), implode(', ', array_keys(static::DEFAULT_OPTIONS))));
    8997
    }
    9098

    9199
    // check for extra keys in options
    92-
    $queryExtraKeys = array_diff(array_keys($query), array_keys(self::DEFAULT_OPTIONS));
    100+
    $queryExtraKeys = array_diff(array_keys($query), array_keys(static::DEFAULT_OPTIONS));
    93101
    if (0 < \count($queryExtraKeys)) {
    94-
    throw new InvalidArgumentException(sprintf('Unknown option found in DSN: [%s]. Allowed options are [%s]', implode(', ', $queryExtraKeys), implode(', ', array_keys(self::DEFAULT_OPTIONS))));
    102+
    throw new InvalidArgumentException(sprintf('Unknown option found in DSN: [%s]. Allowed options are [%s]', implode(', ', $queryExtraKeys), implode(', ', array_keys(static::DEFAULT_OPTIONS))));
    95103
    }
    96104

    97105
    return $configuration;
    @@ -154,9 +162,13 @@ public function get(): ?array
    154162

    155163
    if (false === $doctrineEnvelope) {
    156164
    $this->driverConnection->commit();
    165+
    $this->queueEmptiedAt = microtime(true) * 1000;
    157166

    158167
    return null;
    159168
    }
    169+
    // Postgres can "group" notifications having the same channel and payload
    170+
    // We need to be sure to empty the queue before blocking again
    171+
    $this->queueEmptiedAt = null;
    160172

    161173
    $doctrineEnvelope = $this->decodeEnvelopeHeaders($doctrineEnvelope);
    162174

    src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/DoctrineTransportFactory.php

    Lines changed: 9 additions & 3 deletions
    Original file line numberDiff line numberDiff line change
    @@ -36,16 +36,22 @@ public function __construct($registry)
    3636

    3737
    public function createTransport(string $dsn, array $options, SerializerInterface $serializer): TransportInterface
    3838
    {
    39-
    unset($options['transport_name']);
    40-
    $configuration = Connection::buildConfiguration($dsn, $options);
    39+
    $useNotify = ($options['use_notify'] ?? true);
    40+
    unset($options['transport_name'], $options['use_notify']);
    41+
    // Always allow PostgreSQL-specific keys, to be able to transparently fallback to the native driver when LISTEN/NOTIFY isn't available
    42+
    $configuration = PostgreSqlConnection::buildConfiguration($dsn, $options);
    4143

    4244
    try {
    4345
    $driverConnection = $this->registry->getConnection($configuration['connection']);
    4446
    } catch (\InvalidArgumentException $e) {
    4547
    throw new TransportException(sprintf('Could not find Doctrine connection from Messenger DSN "%s".', $dsn), 0, $e);
    4648
    }
    4749

    48-
    $connection = new Connection($configuration, $driverConnection);
    50+
    if ($useNotify && method_exists($driverConnection->getWrappedConnection(), 'pgsqlGetNotify')) {
    51+
    $connection = new PostgreSqlConnection($configuration, $driverConnection);
    52+
    } else {
    53+
    $connection = new Connection($configuration, $driverConnection);
    54+
    }
    4955

    5056
    return new DoctrineTransport($connection, $serializer);
    5157
    }
    Lines changed: 120 additions & 0 deletions
    Original file line numberDiff line numberDiff line change
    @@ -0,0 +1,120 @@
    1+
    <?php
    2+
    3+
    /*
    4+
    * This file is part of the Symfony package.
    5+
    *
    6+
    * (c) Fabien Potencier <fabien@symfony.com>
    7+
    *
    8+
    * For the full copyright and license information, please view the LICENSE
    9+
    * file that was distributed with this source code.
    10+
    */
    11+
    12+
    declare(strict_types=1);
    13+
    14+
    namespace Symfony\Component\Messenger\Bridge\Doctrine\Transport;
    15+
    16+
    /**
    17+
    * Uses PostgreSQL LISTEN/NOTIFY to push messages to workers.
    18+
    *
    19+
    * @internal
    20+
    * @final
    21+
    *
    22+
    * @author Kévin Dunglas <dunglas@gmail.com>
    23+
    */
    24+
    class PostgreSqlConnection extends Connection
    25+
    {
    26+
    /**
    27+
    * * use_notify: Set to false to disable the use of LISTEN/NOTIFY. Default: true
    28+
    * * check_delayed_interval: The interval to check for delayed messages, in milliseconds. Set to 0 to disable checks. Default: 1000
    29+
    * * get_notify_timeout: The length of time to wait for a response when calling PDO::pgsqlGetNotify, in milliseconds. Default: 0.
    30+
    */
    31+
    protected const DEFAULT_OPTIONS = parent::DEFAULT_OPTIONS + [
    32+
    'check_delayed_interval' => 1000,
    33+
    'get_notify_timeout' => 0,
    34+
    ];
    35+
    36+
    private $listening = false;
    37+
    38+
    public function __sleep()
    39+
    {
    40+
    throw new \BadMethodCallException('Cannot serialize '.__CLASS__);
    41+
    }
    42+
    43+
    public function __wakeup()
    44+
    {
    45+
    throw new \BadMethodCallException('Cannot unserialize '.__CLASS__);
    46+
    }
    47+
    48+
    public function __destruct()
    49+
    {
    50+
    $this->unlisten();
    51+
    }
    52+
    53+
    public function reset()
    54+
    {
    55+
    parent::reset();
    56+
    $this->unlisten();
    57+
    }
    58+
    59+
    public function get(): ?array
    60+
    {
    61+
    if (null === $this->queueEmptiedAt) {
    62+
    return parent::get();
    63+
    }
    64+
    65+
    if (!$this->listening) {
    66+
    // This is secure because the table name must be a valid identifier:
    67+
    // https://www.postgresql.org/docs/current/sql-syntax-lexical.html#SQL-SYNTAX-IDENTIFIERS
    68+
    $this->driverConnection->exec(sprintf('LISTEN "%s"', $this->configuration['table_name']));
    69+
    $this->listening = true;
    70+
    }
    71+
    72+
    $notification = $this->driverConnection->getWrappedConnection()->pgsqlGetNotify(\PDO::FETCH_ASSOC, $this->configuration['get_notify_timeout']);
    73+
    if (
    74+
    // no notifications, or for another table or queue
    75+
    (false === $notification || $notification['message'] !== $this->configuration['table_name'] || $notification['payload'] !== $this->configuration['queue_name']) &&
    76+
    // delayed messages
    77+
    (microtime(true) * 1000 - $this->queueEmptiedAt < $this->configuration['check_delayed_interval'])
    78+
    ) {
    79+
    return null;
    80+
    }
    81+
    82+
    return parent::get();
    83+
    }
    84+
    85+
    public function setup(): void
    86+
    {
    87+
    parent::setup();
    88+
    89+
    $sql = sprintf(<<<'SQL'
    90+
    LOCK TABLE %1$s;
    91+
    -- create trigger function
    92+
    CREATE OR REPLACE FUNCTION notify_%1$s() RETURNS TRIGGER AS $$
    93+
    BEGIN
    94+
    PERFORM pg_notify('%1$s', NEW.queue_name::text);
    95+
    RETURN NEW;
    96+
    END;
    97+
    $$ LANGUAGE plpgsql;
    98+
    99+
    -- register trigger
    100+
    DROP TRIGGER IF EXISTS notify_trigger ON %1$s;
    101+
    102+
    CREATE TRIGGER notify_trigger
    103+
    AFTER INSERT
    104+
    ON %1$s
    105+
    FOR EACH ROW EXECUTE PROCEDURE notify_%1$s();
    106+
    SQL
    107+
    , $this->configuration['table_name']);
    108+
    $this->driverConnection->exec($sql);
    109+
    }
    110+
    111+
    private function unlisten()
    112+
    {
    113+
    if (!$this->listening) {
    114+
    return;
    115+
    }
    116+
    117+
    $this->driverConnection->exec(sprintf('UNLISTEN "%s"', $this->configuration['table_name']));
    118+
    $this->listening = false;
    119+
    }
    120+
    }

    src/Symfony/Component/Messenger/Bridge/Doctrine/composer.json

    Lines changed: 2 additions & 1 deletion
    Original file line numberDiff line numberDiff line change
    @@ -19,7 +19,8 @@
    1919
    "php": "^7.2.5",
    2020
    "doctrine/dbal": "^2.6",
    2121
    "doctrine/persistence": "^1.3",
    22-
    "symfony/messenger": "^5.1"
    22+
    "symfony/messenger": "^5.1",
    23+
    "symfony/service-contracts": "^1.1|^2"
    2324
    },
    2425
    "require-dev": {
    2526
    "symfony/serializer": "^4.4|^5.0",

    0 commit comments

    Comments
     (0)
    0