@@ -34,6 +34,7 @@ var cfg struct {
34
34
UseDtm bool
35
35
InitOnly bool
36
36
SkipInit bool
37
+ Parallel bool
37
38
38
39
Isolation string // "repeatable read" or "read committed"
39
40
@@ -49,6 +50,7 @@ var cfg struct {
49
50
Writers struct {
50
51
Num int
51
52
Updates int
53
+ StartId int
52
54
AllowGlobal bool
53
55
AllowLocal bool
54
56
PrivateRows bool
@@ -105,14 +107,16 @@ func init() {
105
107
flag .IntVar (& cfg .Readers .Num , "r" , 1 , "The number of readers" )
106
108
flag .IntVar (& cfg .Writers .Num , "w" , 8 , "The number of writers" )
107
109
flag .IntVar (& cfg .Writers .Updates , "u" , 10000 , "The number updates each writer performs" )
110
+ flag .IntVar (& cfg .Writers .StartId , "k" , 0 , "Script will update rows starting from this value" )
108
111
flag .BoolVar (& cfg .Verbose , "v" , false , "Show progress and other stuff for mortals" )
109
112
flag .BoolVar (& cfg .UseDtm , "m" , false , "Use DTM to keep global consistency" )
110
113
flag .BoolVar (& cfg .Writers .AllowGlobal , "g" , false , "Allow global updates" )
111
114
flag .BoolVar (& cfg .Writers .AllowLocal , "l" , false , "Allow local updates" )
112
115
flag .BoolVar (& cfg .Writers .PrivateRows , "p" , false , "Private rows (avoid waits/aborts caused by concurrent updates of the same rows)" )
113
116
flag .BoolVar (& cfg .Writers .UseCursors , "c" , false , "Use cursors for updates" )
114
- flag .BoolVar (& cfg .InitOnly , "f" , false , "Only feed databses with data" )
117
+ flag .BoolVar (& cfg .InitOnly , "f" , false , "Only feed databases with data" )
115
118
flag .BoolVar (& cfg .SkipInit , "s" , false , "Skip init phase" )
119
+ flag .BoolVar (& cfg .Parallel , "o" , false , "Use parallel execs" )
116
120
flag .Parse ()
117
121
118
122
if len (cfg .ConnStrs ) == 0 {
@@ -205,6 +209,31 @@ func commit(conns ...*pgx.Conn) {
205
209
wg .Wait ()
206
210
}
207
211
212
+ func parallel_exec (conns []* pgx.Conn , requests []string ) bool {
213
+ var wg sync.WaitGroup
214
+ state := true
215
+ wg .Add (len (conns ))
216
+ for i := range conns {
217
+ if cfg .Parallel {
218
+ go func (j int ) {
219
+ _ , err := conns [j ].Exec (requests [j ])
220
+ if err != nil {
221
+ state = false
222
+ }
223
+ wg .Done ()
224
+ }(i )
225
+ } else {
226
+ _ , err := conns [i ].Exec (requests [i ])
227
+ if err != nil {
228
+ state = false
229
+ }
230
+ wg .Done ()
231
+ }
232
+ }
233
+ wg .Wait ()
234
+ return state
235
+ }
236
+
208
237
func prepare_one (connstr string , wg * sync.WaitGroup ) {
209
238
dbconf , err := pgx .ParseDSN (connstr )
210
239
checkErr (err )
@@ -221,23 +250,7 @@ func prepare_one(connstr string, wg *sync.WaitGroup) {
221
250
exec (conn , "drop table if exists t" )
222
251
exec (conn , "create table t(u int primary key, v int)" )
223
252
exec (conn , "insert into t (select generate_series(0,$1-1), $2)" , cfg .Accounts .Num , cfg .Accounts .Balance )
224
- /*
225
- exec(conn, "begin transaction isolation level " + cfg.Isolation)
226
253
227
- start := time.Now()
228
- for i := 0; i < cfg.Accounts.Num; i++ {
229
- exec(conn, "insert into t values ($1, $2)", i, cfg.Accounts.Balance)
230
- if time.Since(start).Seconds() > 1 {
231
- if cfg.Verbose {
232
- fmt.Printf(
233
- "inserted %0.2f%%: %d of %d records\n",
234
- float32(i + 1) * 100.0 / float32(cfg.Accounts.Num), i + 1, cfg.Accounts.Num,
235
- )
236
- }
237
- start = time.Now()
238
- }
239
- }
240
- */
241
254
exec (conn , "commit" )
242
255
wg .Done ()
243
256
}
@@ -278,6 +291,10 @@ func writer(id int, cCommits chan int, cAborts chan int, wg *sync.WaitGroup) {
278
291
279
292
var conns []* pgx.Conn
280
293
294
+ if len (cfg .ConnStrs ) == 1 {
295
+ cfg .ConnStrs .Set (cfg .ConnStrs [0 ])
296
+ }
297
+
281
298
for _ , connstr := range cfg .ConnStrs {
282
299
dbconf , err := pgx .ParseDSN (connstr )
283
300
checkErr (err )
@@ -293,153 +310,62 @@ func writer(id int, cCommits chan int, cAborts chan int, wg *sync.WaitGroup) {
293
310
for myCommits < cfg .Writers .Updates {
294
311
amount := 1
295
312
296
- from_acc := rand .Intn (cfg .Accounts .Num )
297
- to_acc := rand .Intn (cfg .Accounts .Num )
298
-
299
- if cfg .Writers .PrivateRows {
300
- from_acc += id - (from_acc % cfg .Writers .Num )
301
- to_acc += id - (to_acc % cfg .Writers .Num )
302
- if (from_acc == to_acc ) {
303
- to_acc = (from_acc + cfg .Writers .Num ) % cfg .Accounts .Num
304
- }
305
- } else {
306
- if (from_acc == to_acc ) {
307
- to_acc = (from_acc + 1 ) % cfg .Accounts .Num
308
- }
309
- }
310
-
311
- if (from_acc > to_acc ) {
312
- from_acc , to_acc = to_acc , from_acc
313
- }
313
+ from_acc := cfg .Writers .StartId + 2 * id + 1
314
+ to_acc := cfg .Writers .StartId + 2 * id + 2
314
315
315
316
src := conns [rand .Intn (len (conns ))]
316
317
dst := conns [rand .Intn (len (conns ))]
317
-
318
318
if src == dst {
319
- if cfg .Writers .AllowLocal {
320
- // local update
321
- exec (src , "begin transaction isolation level " + cfg .Isolation )
322
- ok1 := execUpdate (src , "update t set v = v - $1 where u=$2" , amount , from_acc )
323
- ok2 := execUpdate (src , "update t set v = v + $1 where u=$2" , amount , to_acc )
324
- if ! ok1 || ! ok2 {
325
- exec (src , "rollback" )
326
- nAborts += 1
327
- } else {
328
- exec (src , "commit" )
329
- nCommits += 1
330
- myCommits += 1
331
- }
332
- } else {
333
- if len (conns ) > 1 {
334
- continue
335
- }
336
-
337
- // global single-node update
338
- if cfg .UseDtm {
339
- execQuery (src , "select dtm_begin_transaction()" )
340
- }
319
+ continue
320
+ }
341
321
342
- // start transaction
343
- exec (src , "begin transaction isolation level " + cfg .Isolation )
344
-
345
- ok := true
346
- if (cfg .Writers .UseCursors ) {
347
- exec (
348
- src ,
349
- "declare cur0 cursor for select * from t where u=$1 for update" ,
350
- from_acc ,
351
- )
352
-
353
- ok = execUpdate (src , "fetch from cur0" ) && ok
354
-
355
- ok = execUpdate (
356
- src , "update t set v = v - $1 where current of cur0" ,
357
- amount ,
358
- ) && ok
359
- ok = execUpdate (
360
- src , "update t set v = v + $1 where current of cur0" ,
361
- amount ,
362
- ) && ok
363
- } else {
364
- ok = execUpdate (
365
- src , "update t set v = v - $1 where u=$2" ,
366
- amount , from_acc ,
367
- ) && ok
368
- ok = execUpdate (
369
- src , "update t set v = v + $1 where u=$2" ,
370
- amount , to_acc ,
371
- ) && ok
372
- }
322
+ if cfg .UseDtm {
323
+ xid := execQuery (src , "select dtm_begin_transaction()" )
324
+ exec (dst , "select dtm_join_transaction($1)" , xid )
325
+ }
373
326
374
- if ok {
375
- commit (src )
376
- nCommits += 1
377
- myCommits += 1
378
- } else {
379
- exec (src , "rollback" )
380
- nAborts += 1
381
- }
382
- }
327
+ parallel_exec ([]* pgx.Conn {src ,dst }, []string {"begin transaction isolation level " + cfg .Isolation , "begin transaction isolation level " + cfg .Isolation })
328
+
329
+ ok := true
330
+ if (cfg .Writers .UseCursors ) {
331
+ exec (
332
+ src ,
333
+ "declare cur0 cursor for select * from t where u=$1 for update" ,
334
+ from_acc ,
335
+ )
336
+ exec (
337
+ dst ,
338
+ "declare cur0 cursor for select * from t where u=$1 for update" ,
339
+ to_acc ,
340
+ )
341
+
342
+ ok = execUpdate (src , "fetch from cur0" ) && ok
343
+ ok = execUpdate (dst , "fetch from cur0" ) && ok
344
+
345
+ ok = execUpdate (
346
+ src , "update t set v = v - $1 where current of cur0" ,
347
+ amount ,
348
+ ) && ok
349
+ ok = execUpdate (
350
+ dst , "update t set v = v + $1 where current of cur0" ,
351
+ amount ,
352
+ ) && ok
383
353
} else {
384
- // global update
385
- if ! cfg .Writers .AllowGlobal {
386
- // which we do not want
387
- continue
388
- }
389
-
390
- if cfg .UseDtm {
391
- xid := execQuery (src , "select dtm_begin_transaction()" )
392
- exec (dst , "select dtm_join_transaction($1)" , xid )
393
- }
394
354
395
- // start transaction
396
- exec (src , "begin transaction isolation level " + cfg .Isolation )
397
- exec (dst , "begin transaction isolation level " + cfg .Isolation )
398
-
399
- ok := true
400
- if (cfg .Writers .UseCursors ) {
401
- exec (
402
- src ,
403
- "declare cur0 cursor for select * from t where u=$1 for update" ,
404
- from_acc ,
405
- )
406
- exec (
407
- dst ,
408
- "declare cur0 cursor for select * from t where u=$1 for update" ,
409
- to_acc ,
410
- )
355
+ sql1 := fmt .Sprintf ("update t set v = v - %d where u=%d" , amount , from_acc )
356
+ sql2 := fmt .Sprintf ("update t set v = v + %d where u=%d" , amount , to_acc )
411
357
412
- ok = execUpdate (src , "fetch from cur0" ) && ok
413
- ok = execUpdate (dst , "fetch from cur0" ) && ok
414
-
415
- ok = execUpdate (
416
- src , "update t set v = v - $1 where current of cur0" ,
417
- amount ,
418
- ) && ok
419
- ok = execUpdate (
420
- dst , "update t set v = v + $1 where current of cur0" ,
421
- amount ,
422
- ) && ok
423
- } else {
424
- ok = execUpdate (
425
- src , "update t set v = v - $1 where u=$2" ,
426
- amount , from_acc ,
427
- ) && ok
428
- ok = execUpdate (
429
- dst , "update t set v = v + $1 where u=$2" ,
430
- amount , to_acc ,
431
- ) && ok
432
- }
358
+ ok = parallel_exec ([]* pgx.Conn {src ,dst }, []string {sql1 ,sql2 })
359
+ }
433
360
434
- if ok {
435
- commit (src , dst )
436
- nCommits += 1
437
- myCommits += 1
438
- } else {
439
- exec (src , "rollback" )
440
- exec (dst , "rollback" )
441
- nAborts += 1
442
- }
361
+ if ok {
362
+ commit (src , dst )
363
+ nCommits += 1
364
+ myCommits += 1
365
+ } else {
366
+ exec (src , "rollback" )
367
+ exec (dst , "rollback" )
368
+ nAborts += 1
443
369
}
444
370
445
371
if time .Since (start ).Seconds () > 1 {
@@ -471,6 +397,8 @@ func reader(wg *sync.WaitGroup, inconsistency *bool) {
471
397
conns = append (conns , conn )
472
398
}
473
399
400
+
401
+
474
402
for running {
475
403
var sum int64 = 0
476
404
var xid int32
0 commit comments