8000 Trigger-based variant · symfony/symfony@5207f51 · GitHub
[go: up one dir, main page]

Skip to content

Commit 5207f51

Browse files
committed
Trigger-based variant
1 parent 5363561 commit 5207f51

File tree

1 file changed

+35
-43
lines changed

1 file changed

+35
-43
lines changed

src/Symfony/Component/Messenger/Transport/Doctrine/Connection.php

Lines changed: 35 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -136,49 +136,19 @@ public function send(string $body, array $headers, int $delay = 0): string
136136
'available_at' => '?',
137137
]);
138138

139-
$send = function () use ($queryBuilder, $body, $headers, $now, $availableAt) {
140-
$this->executeQuery($queryBuilder->getSQL(), [
141-
$body,
142-
json_encode($headers),
143-
$this->configuration['queue_name'],
144-
$now,
145-
$availableAt,
146-
], [
147-
null,
148-
null,
149-
null,
150-
Type::DATETIME,
151-
Type::DATETIME,
152-
]);
153-
};
154-
155-
if ($this->usePgsqlNotify) {
156-
if (0 !== $delay) {
157-
throw new InvalidArgumentException('Delays are not supported when using PostgreSQL NOTIFY, set the "pgsql_get_notify" option to false to disable this feature. You can also create a dedicated queue for delayed message not using NOTIFY.');
158-
}
159-
160-
$sendAndNotify = function () use ($send) {
161-
$this->driverConnection->beginTransaction();
162-
$send();
163-
$sth = $this->driverConnection->prepare('SELECT pg_notify(?, ?)');
164-
$sth->execute([$this->configuration['table_name'], $this->configuration['queue_name']]);
165-
$this->driverConnection->commit();
166-
};
167-
168-
try {
169-
$sendAndNotify();
170-
} catch (TableNotFoundException $e) {
171-
$this->driverConnection->rollBack();
172-
173-
// create table
174-
if ($this->autoSetup) {
175-
$this->setup();
176-
}
177-
$sendAndNotify();
178-
}
179-
} else {
180-
$send();
181-
}
139+
$this->executeQuery($queryBuilder->getSQL(), [
140+
$body,
141+
json_encode($headers),
142+
$this->configuration['queue_name'],
143+
$now,
144+
$availableAt,
145+
], [
146+
null,
147+
null,
148+
null,
149+
Type::DATETIME,
150+
Type::DATETIME,
151+
]);
182152

183153
return $this->driverConnection->lastInsertId();
184154
}
@@ -287,6 +257,28 @@ public function setup(): void
287257
$this->driverConnection->getConfiguration()->setFilterSchemaAssetsExpression($assetFilter);
288258
}
289259

260+
if ($this->usePgsqlNotify) {
261+
$sql = sprintf(<<<'SQL'
262+
-- create trigger function
263+
CREATE OR REPLACE FUNCTION notify_%1$s() RETURNS TRIGGER AS $$
264+
BEGIN
265+
PERFORM pg_notify('%1$s', NEW.queue_name::text);
266+
RETURN NEW;
267+
END;
268+
$$ LANGUAGE plpgsql;
269+
270+
-- register trigger
271+
DROP TRIGGER IF EXISTS notify_trigger ON %1$s;
272+
273+
CREATE TRIGGER notify_trigger
274+
AFTER INSERT
275+
ON %1$s
276+
FOR EACH ROW EXECUTE PROCEDURE notify_%1$s();
277+
SQL
278+
, $this->configuration['table_name']);
279+
$this->driverConnection->exec($sql);
280+
}
281+
290282
$this->autoSetup = false;
291283
}
292284

0 commit comments

Comments
 (0)
0