8000 rework transfers.go logic for one node · postgrespro/postgres_cluster@7a19770 · GitHub
[go: up one dir, main page]

Skip to content

Commit 7a19770

Browse files
committed
rework transfers.go logic for one node
1 parent 9ebaab8 commit 7a19770

File tree

1 file changed

+82
-154
lines changed

1 file changed

+82
-154
lines changed

contrib/pg_dtm/tests/transfers.go

Lines changed: 82 additions & 154 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ var cfg struct {
3434
UseDtm bool
3535
InitOnly bool
3636
SkipInit bool
37+
Parallel bool
3738

3839
Isolation string // "repeatable read" or "read committed"
3940

@@ -49,6 +50,7 @@ var cfg struct {
4950
Writers struct {
5051
Num int
5152
Updates int
53+
StartId int
5254
AllowGlobal bool
5355
AllowLocal bool
5456
PrivateRows bool
@@ -105,14 +107,16 @@ func init() {
105107
flag.IntVar(&cfg.Readers.Num, "r", 1, "The number of readers")
106108
flag.IntVar(&cfg.Writers.Num, "w", 8, "The number of writers")
107109
flag.IntVar(&cfg.Writers.Updates, "u", 10000, "The number updates each writer performs")
110+
flag.IntVar(&cfg.Writers.StartId, "k", 0, "Script will update rows starting from this value")
108111
flag.BoolVar(&cfg.Verbose, "v", false, "Show progress and other stuff for mortals")
109112
flag.BoolVar(&cfg.UseDtm, "m", false, "Use DTM to keep global consistency")
110113
flag.BoolVar(&cfg.Writers.AllowGlobal, "g", false, "Allow global updates")
111114
flag.BoolVar(&cfg.Writers.AllowLocal, "l", false, "Allow local updates")
112115
flag.BoolVar(&cfg.Writers.PrivateRows, "p", false, "Private rows (avoid waits/aborts caused by concurrent updates of the same rows)")
113116
flag.BoolVar(&cfg.Writers.UseCursors, "c", false, "Use cursors for updates")
114-
flag.BoolVar(&cfg.InitOnly, "f", false, "Only feed databses with data")
117+
flag.BoolVar(&cfg.InitOnly, "f", false, "Only feed databases with data")
115118
flag.BoolVar(&cfg.SkipInit, "s", false, "Skip init phase")
119+
flag.BoolVar(&cfg.Parallel, "o", false, "Use parallel execs")
116120
flag.Parse()
117121

118122
if len(cfg.ConnStrs) == 0 {
@@ -205,6 +209,31 @@ func commit(conns ...*pgx.Conn) {
205209
wg.Wait()
206210
}
207211

212+
func parallel_exec(conns []*pgx.Conn, requests []string) bool {
213+
var wg sync.WaitGroup
214+
state := true
215+
wg.Add(len(conns))
216+
for i := range conns {
217+
if cfg.Parallel {
218+
go func(j int) {
219+
_, err := conns[j].Exec(requests[j])
220+
if err != nil {
221+
state = false
222+
}
223+
wg.Done()
224+
}(i)
225+
} else {
226+
_, err := conns[i].Exec(requests[i])
227+
if err != nil {
228+
state = false
229+
}
230+
wg.Done()
231+
}
232+
}
233+
wg.Wait()
234+
return state
235+
}
236+
208237
func prepare_one(connstr string, wg *sync.WaitGroup) {
209238
dbconf, err := pgx.ParseDSN(connstr)
210239
checkErr(err)
@@ -221,23 +250,7 @@ func prepare_one(connstr string, wg *sync.WaitGroup) {
221250
exec(conn, "drop table if exists t")
222251
exec(conn, "create table t(u int primary key, v int)")
223252
exec(conn, "insert into t (select generate_series(0,$1-1), $2)", cfg.Accounts.Num, cfg.Accounts.Balance)
224-
/*
225-
exec(conn, "begin transaction isolation level " + cfg.Isolation)
226253

227-
start := time.Now()
228-
for i := 0; i < cfg.Accounts.Num; i++ {
229-
exec(conn, "insert into t values ($1, $2)", i, cfg.Accounts.Balance)
230-
if time.Since(start).Seconds() > 1 {
231-
if cfg.Verbose {
232-
fmt.Printf(
233-
"inserted %0.2f%%: %d of %d records\n",
234-
float32(i + 1) * 100.0 / float32(cfg.Accounts.Num), i + 1, cfg.Accounts.Num,
235-
)
236-
}
237-
start = time.Now()
238-
}
239-
}
240-
*/
241254
exec(conn, "commit")
242255
wg.Done()
243256
}
@@ -278,6 +291,10 @@ func writer(id int, cCommits chan int, cAborts chan int, wg *sync.WaitGroup) {
278291

279292
var conns []*pgx.Conn
280293

294+
if len(cfg.ConnStrs) == 1 {
295+
cfg.ConnStrs.Set(cfg.ConnStrs[0])
296+
}
297+
281298
for _, connstr := range cfg.ConnStrs {
282299
dbconf, err := pgx.ParseDSN(connstr)
283300
checkErr(err)
@@ -293,153 +310,62 @@ func writer(id int, cCommits chan int, cAborts chan int, wg *sync.WaitGroup) {
293310
for myCommits < cfg.Writers.Updates {
294311
amount := 1
295312

296-
from_acc := rand.Intn(cfg.Accounts.Num)
297-
to_acc := rand.Intn(cfg.Accounts.Num)
298-
299-
if cfg.Writers.PrivateRows {
300-
from_acc += id - (from_acc % cfg.Writers.Num)
301-
to_acc += id - (to_acc % cfg.Writers.Num)
302-
if (from_acc == to_acc) {
303-
to_acc = (from_acc + cfg.Writers.Num) % cfg.Accounts.Num
304-
}
305-
} else {
306-
if (from_acc == to_acc) {
307-
to_acc = (from_acc + 1) % cfg.Accounts.Num
308-
}
309-
}
310-
311-
if (from_acc > to_acc) {
312-
from_acc, to_acc = to_acc, from_acc
313-
}
313+
from_acc := cfg.Writers.StartId + 2*id + 1
314+
to_acc := cfg.Writers.StartId + 2*id + 2
314315

315316
src := conns[rand.Intn(len(conns))]
316317
dst := conns[rand.Intn(len(conns))]
317-
318318
if src == dst {
319-
if cfg.Writers.AllowLocal {
320-
// local update
321-
exec(src, "begin transaction isolation level " + cfg.Isolation)
322-
ok1 := execUpdate(src, "update t set v = v - $1 where u=$2", amount, from_acc)
323-
ok2 := execUpdate(src, "update t set v = v + $1 where u=$2", amount, to_acc)
324-
if !ok1 || !ok2 {
325-
exec(src, "rollback")
326-
nAborts += 1
327-
} else {
328-
exec(src, "commit")
329-
nCommits += 1
330-
myCommits += 1
331-
}
332-
} else {
333-
if len(conns) > 1 {
334-
continue
335-
}
336-
337-
// global single-node update
338-
if cfg.UseDtm {
339-
execQuery(src, "select dtm_begin_transaction()")
340-
}
319+
continue
320+
}
341321

342-
// start transaction
343-
exec(src, "begin transaction isolation level " + cfg.Isolation)
344-
345-
ok := true
346-
if (cfg.Writers.UseCursors) {
347-
exec(
348-
src,
349-
"declare cur0 cursor for select * from t where u=$1 for update",
350-
from_acc,
351-
)
352-
353-
ok = execUpdate(src, "fetch from cur0") && ok
354-
355-
ok = execUpdate(
356-
src, "update t set v = v - $1 where current of cur0",
357-
amount,
358-
) && ok
359-
ok = execUpdate(
360-
src, "update t set v = v + $1 where current of cur0",
361-
amount,
362-
) && ok
363-
} else {
364-
ok = execUpdate(
365-
src, "update t set v = v - $1 where u=$2",
366-
amount, from_acc,
367-
) && ok
368-
ok = execUpdate(
369-
src, "update t set v = v + $1 where u=$2",
370-
amount, to_acc,
371-
) && ok
372-
}
322+
if cfg.UseDtm {
323+
xid := execQuery(src, "select dtm_begin_transaction()")
324+
exec(dst, "select dtm_join_transaction($1)", xid)
325+
}
373326

374-
if ok {
375-
commit(src)
376-
nCommits += 1
377-
myCommits += 1
378-
} else {
379-
exec(src, "rollback")
380-
nAborts += 1
381-
}
382-
}
327+
parallel_exec([]*pgx.Conn{src,dst}, []string{"begin transaction isolation level " + cfg.Isolation, "begin transaction isolation level " + cfg.Isolation})
328+
329+
ok := true
330+
if (cfg.Writers.UseCursors) {
331+
exec(
332+
src,
333+
"declare cur0 cursor for select * from t where u=$1 for update",
334+
from_acc,
335+
)
336+
exec(
337+
dst,
338+
"declare cur0 cursor for select * from t where u=$1 for update",
339+
to_acc,
340+
)
341+
342+
ok = execUpdate(src, "fetch from cur0") && ok
343+
ok = execUpdate(dst, "fetch from cur0") && ok
344+
345+
ok = execUpdate(
346+
src, "update t set v = v - $1 where current of cur0",
347+
amount,
348+
) && ok
349+
ok = execUpdate(
350+
dst, "update t set v = v + $1 where current of cur0",
351+
amount,
352+
) && ok
383353
} else {
384-
// global update
385-
if !cfg.Writers.AllowGlobal {
386-
// which we do not want
387-
continue
388-
}
389-
390-
if cfg.UseDtm {
391-
xid := execQuery(src, "select dtm_begin_transaction()")
392-
exec(dst, "select dtm_join_transaction($1)", xid)
393-
}
394354

395-
// start transaction
396-
exec(src, "begin transaction isolation level " + cfg.Isolation)
397-
exec(dst, "begin transaction isolation level " + cfg.Isolation)
398-
399-
ok := true
400-
if (cfg.Writers.UseCursors) {
401-
exec(
402-
src,
403-
"declare cur0 cursor for select * from t where u=$1 for update",
404-
from_acc,
405-
)
406-
exec(
407-
dst,
408-
"declare cur0 cursor for select * from t where u=$1 for update",
409-
to_acc,
410-
)
355+
sql1 := fmt.Sprintf("update t set v = v - %d where u=%d", amount, from_acc)
356+
sql2 := fmt.Sprintf("update t set v = v + %d where u=%d", amount, to_acc)
411357

412-
ok = execUpdate(src, "fetch from cur0") && ok
413-
ok = execUpdate(dst, "fetch from cur0") && ok
414-
415-
ok = execUpdate(
416-
src, "update t set v = v - $1 where current of cur0",
417-
amount,
418-
) && ok
419-
ok = execUpdate(
420-
dst, "update t set v = v + $1 where current of cur0",
421-
amount,
422-
) && ok
423-
} else {
424-
ok = execUpdate(
425-
src, "update t set v = v - $1 where u=$2",
426-
amount, from_acc,
427-
) && ok
428-
ok = execUpdate(
429-
dst, "update t set v = v + $1 where u=$2",
430-
amount, to_acc,
431-
) && ok
432-
}
358+
ok = parallel_exec([]*pgx.Conn{src,dst}, []string{sql1,sql2})
359+
}
433360

434-
if ok {
435-
commit(src, dst)
436-
nCommits += 1
437-
myCommits += 1
438-
} else {
439-
exec(src, "rollback")
440-
exec(dst, "rollback")
441-
nAborts += 1
442-
}
361+
if ok {
362+
commit(src, dst)
363+
nCommits += 1
364+
myCommits += 1
365+
} else {
366+
exec(src, "rollback")
367+
exec(dst, "rollback")
368+
nAborts += 1
443369
}
444370

445371
if time.Since(start).Seconds() > 1 {
@@ -471,6 +397,8 @@ func reader(wg *sync.WaitGroup, inconsistency *bool) {
471397
conns = append(conns, conn)
472398
}
473399

400+
401+
474402
for running {
475403
var sum int64 = 0
476404
var xid int32

0 commit comments

Comments
 (0)
0