10000 Process NOTIFY events in reception order. · wulczer/txpostgres@66cd467 · GitHub
[go: up one dir, main page]

Skip to content

Commit 66cd467

Browse files
committed
Process NOTIFY events in reception order.
Repeatedly popping them from the notify list caused observer functions to be called in reverse order. Instead, iterate over the list directly and clear it out at the and. Noticed by Max Walton, though I didn't use his patch. Fixes #31.
1 parent f019075 commit 66cd467

File tree

3 files changed

+42
-2
lines changed

3 files changed

+42
-2
lines changed

NEWS

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ What's new in txpostgres 1.3.0
55
(while still prefering straingh psycopg2)
66
- allow choosing the exact psycopg2 implementation using an
77
environment variable
8+
- NOTIFY events are now processed in reception order (thanks to
9+
Max Walton)
810

911
What's new in txpostgres 1.2.0
1012
------------------------------

test/test_txpostgres.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1278,6 +1278,43 @@ def observer(notify):
12781278
return d.addCallback(lambda _: self.assertEquals(
12791279
len(self.notifies), 3))
12801280

1281+
def test_notifyDeliveryOrder(self):
1282+
"""
1283+
Notifications are delivered to observers in the same order that they
1284+
were sent.
1285+
"""
1286+
# this tests accesses NOTIFY payloads, so it requires PostgreSQL 9.0+
1287+
# and Psycopg 2.3+
1288+
if self.conn.server_version < 90000:
1289+
raise unittest.SkipTest(
1290+
"PostgreSQL < 9.0.0 does not support NOTIFY payloads")
1291+
1292+
if getattr(psycopg2.extensions.Notify, 'payload', None) is None:
1293+
raise unittest.SkipTest(
1294+
"psycopg2 does not have NOTIFY payload support. You need at "
1295+
"least version 2.3.0 of psycopg2 to process NOTIFY payloads.")
1296+
1297+
dl = [defer.Deferred() for _ in range(10)]
1298+
payloads = map(str, range(10))
1299+
notifyD = defer.DeferredList(dl)
1300+
1301+
def observer(notify):
1302+
self.notifies.append(notify)
1303+
dl.pop().callback(None)
1304+
1305+
self.conn.addNotifyObserver(observer)
1306+
1307+
d = self.conn.runOperation("listen txpostgres_test")
1308+
# send all notification together to ensure that they are processed
1309+
# inside a single checkForNotifies call
1310+
d.addCallback(lambda _: self.notifyconn.runOperation(
1311+
'notify txpostgres_test, %s; ' * 10, payloads))
1312+
# wait for all notification to be processed
1313+
d.addCallback(lambda _: notifyD)
1314+
# check that they were processed in the right order
1315+
return d.addCallback(lambda _: self.assertEquals(
1316+
[n.payload for n in self.notifies], payloads))
1317+
12811318
def test_multipleObservers(self):
12821319
"""
12831320
Multiple registered notify observers each get notified.

txpostgres/txpostgres.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -695,8 +695,7 @@ def checkForNotifies(self):
695695
self.cooperator.cooperate(self._checkForNotifies())
696696

697697
def _checkForNotifies(self):
698-
while self._connection.notifies:
699-
notify = self._connection.notifies.pop()
698+
for notify in self._connection.notifies:
700699
# don't iterate over self._notifyObservers directly because the
701700
# observer function might call removeNotifyObserver, thus modifying
702701
# the set while it's being iterated
@@ -708,6 +707,8 @@ def _checkForNotifies(self):
708707
# observers
709708
yield defer.maybeDeferred(observer, notify).addErrback(log.err)
710709

710+
del self._connection.notifies[:]
711+
711712
def addNotifyObserver(self, observer):
712713
"""
713714
Add an observer function that will get called whenever a :pg:`NOTIFY

0 commit comments

Comments
 (0)
0