8000 Logical replication · postgrespro/testgres@ca5b546 · GitHub
[go: up one dir, main page]

Skip to content

Commit ca5b546

Browse files
committed
Logical replication
1 parent 5bc608e commit ca5b546

File tree

3 files changed

+282
-3
lines changed

3 files changed

+282
-3
lines changed

testgres/node.py

Lines changed: 55 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,10 +48,13 @@
4848
ExecUtilException, \
4949
QueryException, \
5050
StartNodeException, \
51-
TimeoutException
51+
TimeoutException, \
52+
InitNodeException
5253

5354
from .logger import TestgresLogger
5455

56+
from .pubsub import Publication, Subscription
57+
5558
from .utils import \
5659
eprint, \
5760
get_bin_path, \
@@ -278,6 +281,7 @@ def default_conf(self,
278281
fsync=False,
279282
unix_sockets=True,
280283
allow_streaming=True,
284+
allow_logical=False,
281285
log_statement='all'):
282286
"""
283287
Apply default settings to this node.
@@ -286,6 +290,7 @@ def default_conf(self,
286290
fsync: should this node use fsync to keep data safe?
287291
unix_sockets: should we enable UNIX sockets?
288292
allow_streaming: should this node add a hba entry for replication?
293+
allow_logical: can this node be used as a logical replication publisher?
289294
log_statement: one of ('all', 'off', 'mod', 'ddl').
290295
291296
Returns:
@@ -365,6 +370,12 @@ def get_auth_method(t):
365370
wal_keep_segments,
366371
wal_level))
367372

373+
if allow_logical:
374+
if not pg_version_ge('10'):
375+
raise InitNodeException("Logical replication is only "
376+
"available for Postgres 10 and newer")
377+
conf.write(u"wal_level = logical\n")
378+
368379
# disable UNIX sockets if asked to
369380
if not unix_sockets:
370381
conf.write(u"unix_socket_directories = ''\n")
@@ -751,7 +762,8 @@ def poll_query_until(self,
751762
expected=True,
752763
commit=True,
753764
raise_programming_error=True,
754-
raise_internal_error=True):
765+
raise_internal_error=True,
766+
zero_rows_is_ok=False):
755767
"""
756768
Run a query once per second until it returns 'expected'.
757769
Query should return a single value (1 row, 1 column).
@@ -788,7 +800,12 @@ def poll_query_until(self,
788800
raise QueryException('Query returned None', query)
789801

790802
if len(res) == 0:
791-
raise QueryException('Query returned 0 rows', query)
803+
if zero_rows_is_ok:
804+
time.sleep(sleep_time)
805+
attempts += 1
806+
continue
807+
else:
808+
raise QueryException('Query returned 0 rows', query)
792809

793810
if len(res[0]) == 0:
794811
raise QueryException('Query returned 0 columns', query)
@@ -902,6 +919,41 @@ def catchup(self, dbname=None, username=None):
902919
except Exception as e:
903920
raise_from(CatchUpException("Failed to catch up", poll_lsn), e)
904921

922+
def publish(self,
923+
pubname,
924+
tables=None,
925+
dbname=None,
926+
username=None):
927+
"""
928+
Create publication for logical replication
929+
930+
Args:
931+
pubname: publication name
932+
tables: tables names list
933+
dbname: database name where objects or interest are located
934+
username: replication username
935+
"""
936+
return Publication(pubname, self, tables, dbname, username)
937+
938+
def subscribe(self,
939+
publication,
940+
subname,
941+
dbname=None,
942+
username=None,
943+
**kwargs):
944+
"""
945+
Create subscription for logical replication
946+
947+
Args:
948+
subname: subscription name
949+
publication: publication object obtained from publish()
950+
951+
"""
952+
return Subscription(subname, self, publication,
953+
dbname=dbname,
954+
username=username,
955+
**kwargs)
956+
905957
def pgbench(self,
906958
dbname=None,
907959
username=None,

testgres/pubsub.py

Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
# coding: utf-8
2+
3+
from six import raise_from
4+
5+
from .defaults import default_dbname, default_username
6+
from .exceptions import CatchUpException
7+
from .utils import pg_version_ge
8+
9+
10+
class Publication(object):
11+
def __init__(self, pubname, node, tables=None, dbname=None, username=None):
12+
"""
13+
Constructor
14+
15+
Args:
16+
pubname: publication name
17+
node: publisher's node
18+
tables: tables list or None for all tables
19+
dbname: database name used to connect and perform subscription
20+
username: username used to connect to the database
21+
"""
22+
self.name = pubname
23+
self.node = node
24+
self.dbname = dbname or default_dbname()
25+
self.username = username or default_username()
26+
27+
# create publication in database
28+
t = 'table ' + ', '.join(tables) if tables else 'all tables'
29+
query = "create publication {} for {}"
30+
node.safe_psql(query.format(pubname, t),
31+
dbname=dbname,
32+
username=username)
33+
34+
def close(self, dbname=None, username=None):
35+
"""
36+
Drop publication
37+
"""
38+
self.node.safe_psql("drop publication {}".format(self.name),
39+
dbname=dbname, username=username)
40+
41+
def add_tables(self, tables, dbname=None, username=None):
42+
"""
43+
Add tables
44+
45+
Args:
46+
tables: a list of tables to add to the publication
47+
"""
48+
if not tables:
49+
raise ValueError("Tables list is empty")
50+
51+
query = "alter publication {} add table {}"
52+
self.node.safe_psql(query.format(self.name, ', '.join(tables)),
53+
dbname=dbname or self.dbname,
54+
username=username or self.username)
55+
56+
57+
class Subscription(object):
58+
def __init__(self,
59+
subname,
60+
node,
61+
publication,
62+
dbname=None,
63+
username=None,
64+
**kwargs):
65+
"""
66+
Constructor
67+
68+
Args:
69+
subname: subscription name
70+
node: subscriber's node
71+
publication: Publication object we are subscribing to
72+
dbname: database name used to connect and perform subscription
73+
username: username used to connect to the database
74+
**kwargs: subscription parameters (see CREATE SUBSCRIPTION
75+
in PostgreSQL documentation for more information)
76+
"""
77+
self.name = subname
78+
self.node = node
79+
self.pub = publication
80+
81+
# connection info
82+
conninfo = (
83+
u"dbname={} user={} host={} port={}"
84+
).format(self.pub.dbname,
85+
self.pub.username,
86+
self.pub.node.host,
87+
self.pub.node.port)
88+
89+
query = (
90+
"create subscription {} connection '{}' publication {}"
91+
).format(subname, conninfo, self.pub.name)
92+
93+
# additional parameters
94+
if kwargs:
95+
params = ','.join('{}={}'.format(k, v) for k, v in kwargs.iteritems())
96+
query += " with ({})".format(params)
97+
98+
node.safe_psql(query, dbname=dbname, username=username)
99+
100+
def disable(self, dbname=None, username=None):
101+
"""
102+
Disables the running subscription.
103+
"""
104+
query = "alter subscription {} disable"
105+
self.node.safe_psql(query.format(self.name),
106+
dbname=None,
107+
username=None)
108+
109+
def enable(self, dbname=None, username=None):
110+
"""
111+
Enables the previously disabled subscription.
112+
"""
113+
query = "alter subscription {} enable"
114+
self.node.safe_psql(query.format(self.name),
115+
dbname=None,
116+
username=None)
117+
118+
def refresh(self, copy_data=True, dbname=None, username=None):
119+
"""
120+
Disables the running subscription.
121+
"""
122+
query = "alter subscription {} refresh publication with (copy_data={})"
123+
self.node.safe_psql(query.format(self.name, copy_data),
124+
dbname=dbname,
125+
username=username)
126+
127+
def close(self, dbname=None, username=None):
128+
"""
129+
Drops subscription
130+
"""
131+
self.node.safe_psql("drop subscription {}".format(self.name),
132+
dbname=dbname, username=username)
133+
134+
def catchup(self, username=None):
135+
"""
136+
Wait until subscription catches up with publication.
137+
138+
Args:
139+
username: remote node's user name
140+
"""
141+
if pg_version_ge('10'):
142+
query = (
143+
"select pg_current_wal_lsn() - replay_lsn = 0 "
144+
"from pg_stat_replication where application_name = '{}'"
145+
).format(self.name)
146+
else:
147+
query = (
148+
"select pg_current_xlog_location() - replay_location = 0 "
149+
"from pg_stat_replication where application_name = '{}'"
150+
).format(self.name)
151+
152+
try:
153+
# wait until this LSN reaches subscriber
154+
self.pub.node.poll_query_until(
155+
query=query,
156+
dbname=self.pub.dbname,
157+
username=username or self.pub.username,
158+
max_attempts=60,
159+
zero_rows_is_ok=True) # statistics may have not updated yet
160+
except Exception as e:
161+
raise_from(CatchUpException("Failed to catch up", query), e)

tests/test_simple.py

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -382,6 +382,72 @@ def test_replicate(self):
382382
res = node.execute('select * from test')
383383
self.assertListEqual(res, [])
384384

385+
def test_logical_replication(self):
386+
with get_new_node() as node1, get_new_node() as node2:
387+
node1.init(allow_logical=True)
388+
node1.start()
389+
node2.init().start()
390+
391+
create_table = 'create table test (a int, b int)'
392+
node1.safe_psql(create_table)
393+
node2.safe_psql(create_table)
394+
395+
# create publication / create subscription
396+
pub = node1.publish('mypub')
397+
sub = node2.subscribe(pub, 'mysub')
398+
399+
node1.safe_psql('insert into test values (1, 1), (2, 2)')
400+
401+
# wait until changes apply on subscriber and check them
402+
sub.catchup()
403+
res = node2.execute('select * from test')
404+
self.assertListEqual(res, [(1, 1), (2, 2)])
405+
406+
# disable and put some new data
407+
sub.disable()
408+
node1.safe_psql('insert into test values (3, 3)')
409+
410+
# enable and ensure that data successfully transfered
411+
sub.enable()
412+
sub.catchup()
413+
res = node2.execute('select * from test')
414+
self.assertListEqual(res, [(1, 1), (2, 2), (3, 3)])
415+
416+
# Add new tables. Since we added "all tables" to publication
417+
# (default behaviour of publish() method) we don't need
418+
# to explicitely perform pub.add_table()
419+
create_table = 'create table test2 (c char)'
420+
node1.safe_psql(create_table)
421+
node2.safe_psql(create_table)
422+
sub.refresh()
423+
424+
# put new data
425+
node1.safe_psql('insert into test2 values (\'a\'), (\'b\')')
426+
sub.catchup()
427+
res = node2.execute('select * from test2')
428+
self.assertListEqual(res, [('a',), ('b',)])
429+
430+
# drop subscription
431+
sub.close()
432+
pub.close()
433+
434+
# create new publication and subscription for specific table
435+
# (ommitting copying data as it's already done)
436+
pub = node1.publish('newpub', tables=['test'])
437+
sub = node2.subscribe(pub, 'newsub', copy_data=False)
438+
439+
node1.safe_psql('insert into test values (4, 4)')
440+
sub.catchup()
441+
res = node2.execute('select * from test')
442+
self.assertListEqual(res, [(1, 1), (2, 2), (3, 3), (4, 4)])
443+
444+
# explicitely add table
445+
pub.add_tables(['test2'])
446+
node1.safe_psql('insert into test2 values (\'c\')')
447+
sub.catchup()
448+
res = node2.execute('select * from test2')
449+
self.assertListEqual(res, [('a',), ('b',)])
450+
385451
def test_incorrect_catchup(self):
386452
with get_new_node() as node:
387453
node.init(allow_streaming=True).start()

0 commit comments

Comments
 (0)
0