8000 Add cursors using to transfers test. · postgrespro/postgres_cluster@aaec649 · GitHub
[go: up one dir, main page]

Skip to content

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit aaec649

Browse files
committed
Add cursors using to transfers test.
1 parent a69772f commit aaec649

File tree

3 files changed

+78
-19
lines changed

3 files changed

+78
-19
lines changed

contrib/pg_dtm/pg_dtm--1.0.sql

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,3 +16,7 @@ LANGUAGE C;
1616
CREATE FUNCTION dtm_get_current_snapshot_xmax() RETURNS integer
1717
AS 'MODULE_PATHNAME','dtm_get_current_snapshot_xmax'
1818
LANGUAGE C;
19+
20+
CREATE FUNCTION dtm_get_current_snapshot_xcnt() RETURNS integer
21+
AS 'MODULE_PATHNAME','dtm_get_current_snapshot_xcnt'
22+
LANGUAGE C;

contrib/pg_dtm/pg_dtm.c

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -775,6 +775,7 @@ PG_FUNCTION_INFO_V1(dtm_begin_transaction);
775775
PG_FUNCTION_INFO_V1(dtm_join_transaction);
776776
PG_FUNCTION_INFO_V1(dtm_get_current_snapshot_xmax);
777777
PG_FUNCTION_INFO_V1(dtm_get_current_snapshot_xmin);
778+
PG_FUNCTION_INFO_V1(dtm_get_current_snapshot_xcnt);
778779

779780
Datum
780781
dtm_get_current_snapshot_xmin(PG_FUNCTION_ARGS)
@@ -788,6 +789,12 @@ dtm_get_current_snapshot_xmax(PG_FUNCTION_ARGS)
788789
PG_RETURN_INT32(CurrentTransactionSnapshot->xmax);
789790
}
790791

792+
Datum
793+
dtm_get_current_snapshot_xcnt(PG_FUNCTION_ARGS)
794+
{
795+
PG_RETURN_INT32(CurrentTransactionSnapshot->xcnt);
796+
}
797+
791798
Datum
792799
dtm_begin_transaction(PG_FUNCTION_ARGS)
793800
{

contrib/pg_dtm/tests/transfers.go

Lines changed: 67 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@ const (
1515
N_ACCOUNTS = 100000
1616
//ISOLATION_LEVEL = "repeatable read"
1717
ISOLATION_LEVEL = "read committed"
18+
GLOBAL_UPDATES = true
19+
LOCAL_UPDATES = false
20+
CURSORS = false
1821
)
1922

2023

@@ -134,28 +137,71 @@ func transfer(id int, cCommits chan int, cAborts chan int, wg *sync.WaitGroup) {
134137
amount := 1
135138
account1 := rand.Intn(N_ACCOUNTS)
136139
account2 := rand.Intn(N_ACCOUNTS)
140+
srci := rand.Intn(2)
141+
dsti := rand.Intn(2)
142+
if (srci > dsti) {
143+
srci, dsti = dsti, srci
144+
}
137145

138-
src := conn[0]
139-
dst := conn[1]
146+
src := conn[srci]
147+
dst := conn[dsti]
140148

141-
xid = execQuery(src, "select dtm_begin_transaction()")
142-
exec(dst, "select dtm_join_transaction($1)", xid)
149+
if src == dst {
150+
// local update
151+
if !LOCAL_UPDATES {
152+
// which we do not want
153+
continue
154+
}
143155

144-
// start transaction
145-
exec(src, "begin transaction isolation level " + ISOLATION_LEVEL)
146-
exec(dst, "begin transaction isolation level " + ISOLATION_LEVEL)
156+
exec(src, "begin transaction isolation level " + ISOLATION_LEVEL)
157+
ok1 := execUpdate(src, "update t set v = v - $1 where u=$2", amount, account1)
158+
ok2 := execUpdate(src, "update t set v = v + $1 where u=$2", amount, account2)
159+
if !ok1 || !ok2 {
160+
exec(src, "rollback")
161+
nAborts += 1
162+
} else {
163+
exec(src, "commit")
164+
nCommits += 1
165+
myCommits += 1
166+
}
167+
} else {
168+
// global update
169+
if !GLOBAL_UPDATES {
170+
// which we do not want
171+
continue
172+
}
147173

148-
ok1 := execUpdate(src, "update t set v = v - $1 where u=$2", amount, account1)
149-
ok2 := execUpdate(dst, "update t set v = v + $1 where u=$2", amount, account2)
174+
xid = execQuery(src, "select dtm_begin_transaction()")
175+
exec(dst, "select dtm_join_transaction($1)", xid)
150176

151-
if !ok1 || !ok2 {
152-
exec(src, "rollback")
153-
exec(dst, "rollback")
154-
nAborts += 1
155-
} else {
156-
commit(src, dst)
157-
nCommits += 1
158-
myCommits += 1
177+
// start transaction
178+
exec(src, "begin transaction isolation level " + ISOLATION_LEVEL)
179+
exec(dst, "begin transaction isolation level " + ISOLATION_LEVEL)
180+
181+
ok := true
182+
if (CURSORS) {
183+
exec(src, "declare cur0 cursor for select * from t where u=$1 for update", account1)
184+
exec(dst, "declare cur0 cursor for select * from t where u=$1 for update", account2)
185+
186+
ok = execUpdate(src, "fetch from cur0") && ok
187+
ok = execUpdate(dst, "fetch from cur0") && ok
188+
189+
ok = execUpdate(src, "update t set v = v - $1 where current of cur0", amount) && ok
190+
ok = execUpdate(dst, "update t set v = v + $1 where current of cur0", amount) && ok
191+
} else {
192+
ok = execUpdate(src, "update t set v = v - $1 where u=$2", amount, account1) && ok
193+
ok = execUpdate(dst, "update t set v = v + $1 where u=$2", amount, account2) && ok
194+
}
195+
196+
if ok {
197+
commit(src, dst)
198+
nCommits += 1
199+
myCommits += 1
200+
} else {
201+
exec(src, "rollback")
202+
exec(dst, "rollback")
203+
nAborts += 1
204+
}
159205
}
160206

161207
if time.Since(start).Seconds() > 1 {
@@ -197,11 +243,13 @@ func inspect(wg *sync.WaitGroup) {
197243
if (sum != prevSum) {
198244
xmin1 := execQuery(conn1, "select dtm_get_current_snapshot_xmin()")
199245
xmax1 := execQuery(conn1, "select dtm_get_current_snapshot_xmax()")
246+
xcnt1 := execQuery(conn1, "select dtm_get_current_snapshot_xcnt()")
200247
xmin2 := execQuery(conn2, "select dtm_get_current_snapshot_xmin()")
201248
xmax2 := execQuery(conn2, "select dtm_get_current_snapshot_xmax()")
249+
xcnt2 := execQuery(conn2, "select dtm_get_current_snapshot_xcnt()")
202250
fmt.Printf(
203-
"Total=%d xid=%d snap1=[%d, %d) snap2=[%d, %d)\n",
204-
sum, xid, xmin1, xmax1, xmin2, xmax2,
251+
"Total=%d xid=%d snap1=[%d, %d){%d} snap2=[%d, %d){%d}\n",
252+
sum, xid, xmin1, xmax1, xcnt1, xmin2, xmax2, xcnt2,
205253
)
206254
prevSum = sum
207255
}

0 commit comments

Comments
 (0)
0