8000 timestamp tests cleanup · postgrespro/postgres_cluster@2eeb0f3 · GitHub
[go: up one dir, main page]

Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

8000
Appearance settings

Commit 2eeb0f3

Browse files
committed
timestamp tests cleanup
1 parent c95b0f4 commit 2eeb0f3

File tree

11 files changed

+496
-1839
lines changed

11 files changed

+496
-1839
lines changed
Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,17 @@
11
---
22
- hosts: nodes[1]
3-
roles:
4-
- role: postgrespro
5-
deploy_dtm: true
3+
# roles:
4+
# - role: postgrespro
5+
# deploy_dtm: true
66

77
- hosts: nodes
88
roles:
99
- role: postgrespro
1010
pg_port: 15432
1111
deploy_postgres: true
1212
pg_dtm_enable: true
13-
pg_dtm_enable: false
14-
# pg_config_role:
15-
# - line: "dtm.buffer_size = 65536"
16-
pg_dtm_host: "{{ groups['nodes'][0] }}"
13+
pg_config_role:
14+
- line: "dtm.vacuum_delay = 1"
15+
# pg_dtm_host: "{{ groups['nodes'][0] }}"
16+
1717

contrib/pg_dtm/tests/deploy_layouts/roles/postgrespro/tasks/postgres.yml

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,18 +7,18 @@
77
- name: remove dtm.so
88
shell: rm -f {{pg_dst}}/lib/pg_dtm.so
99

10-
- name: build dtm extension
11-
shell: make clean && make && make install
12-
args:
13-
chdir: "{{pg_src}}/contrib/pg_dtm"
14-
creates: "{{pg_dst}}/lib/pg_dtm.so"
15-
16-
# - name: build ts-dtm extension
10+
# - name: build dtm extension
1711
# shell: make clean && make && make install
1812
# args:
19-
# chdir: "{{pg_src}}/contrib/pg_tsdtm"
13+
# chdir: "{{pg_src}}/contrib/pg_dtm"
2014
# creates: "{{pg_dst}}/lib/pg_dtm.so"
2115

16+
- name: build ts-dtm extension
17+
shell: make clean && make && make install
18+
args:
19+
chdir: "{{pg_src}}/contrib/pg_tsdtm"
20+
creates: "{{pg_dst}}/lib/pg_dtm.so"
21+
2222
- stat: path={{pg_datadir}}/postmaster.pid
2323
register: pg_pidfile
2424

contrib/pg_dtm/tests/deploy_layouts/single.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@
22
- hosts: nodes[-1]
33
roles:
44

5-
- role: postgrespro
6-
deploy_dtm: true
5+
# - role: postgrespro
6+
# deploy_dtm: true
77

88
- role: postgrespro
99
deploy_postgres: true

contrib/pg_dtm/tests/farms/sai

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
158.250.29.6 ansible_ssh_user=cluster offset=4001
88
158.250.29.8 ansible_ssh_user=cluster offset=2001
99
158.250.29.9 ansible_ssh_user=cluster offset=1001
10-
158.250.29.10 ansible_ssh_user=cluster offset=1
10+
#158.250.29.10 ansible_ssh_user=cluster offset=1
1111

1212
[master]
1313
158.250.29.10 ansible_ssh_user=cluster offset=1
Lines changed: 293 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,293 @@
1+
package main
2+
3+
import (
4+
"fmt"
5+
"flag"
6+
"os"
7+
"sync"
8+
"time"
9+
"github.com/jackc/pgx"
10+
)
11+
12+
type ConnStrings []string
13+
14+
var backend interface{
15+
prepare(connstrs []string)
16+
writer(id int, cCommits chan int, cAborts chan int, wg *sync.WaitGroup)
17+
reader(wg *sync.WaitGroup, cFetches chan int, inconsistency *bool)
18+
}
19+
20+
var cfg struct {
21+
ConnStrs ConnStrings
22+
Backend string
23+
Verbose bool
24+
UseDtm bool
25+
Init bool
26+
Parallel bool
27+
Isolation string
28+
AccountsNum int
29+
ReadersNum int
30+
IterNum int
31+
32+
Writers struct {
33+
Num int
34+
StartId int
35+
}
36+
}
37+
38+
// The first method of flag.Value interface
39+
func (c *ConnStrings) String() string {
40+< 10000 /span>
if len(*c) > 0 {
41+
return (*c)[0]
42+
} else {
43+
return ""
44+
}
45+
}
46+
47+
// The second method of flag.Value interface
48+
func (c *ConnStrings) Set(value string) error {
49+
*c = append(*c, value)
50+
return nil
51+
}
52+
53+
func append_with_comma(s *string, x string) {
54+
if len(*s) > 0 {
55+
*s = *s + ", " + x
56+
} else {
57+
*s = x
58+
}
59+
}
60+
61+
func dump_cfg() {
62+
fmt.Printf("Connections: %d\n", len(cfg.ConnStrs))
63+
for _, cs := range cfg.ConnStrs {
64+
fmt.Printf(" %s\n", cs)
65+
}
66+
fmt.Printf("Isolation: %s\n", cfg.Isolation)
67+
fmt.Printf(
68+
"Accounts: %d × $%d\n",
69+
cfg.AccountsNum, 0,
70+
)
71+
fmt.Printf("Readers: %d\n", cfg.ReadersNum)
72+
73+
fmt.Printf(
74+
"Writers: %d × %d updates\n",
75+
cfg.Writers.Num, cfg.IterNum,
76+
)
77+
}
78+
79+
func init() {
80+
flag.StringVar(&cfg.Backend, "b", "transfers",
81+
"Backend to use. Default to 'transfers'")
82+
flag.Var(&cfg.ConnStrs, "C",
83+
"Connection string (repeat for multiple connections)")
84+
flag.BoolVar(&cfg.Init, &qu F42D ot;i", false,
85+
"Init database")
86+
flag.BoolVar(&cfg.UseDtm, "g", false,
87+
"Use DTM to keep global consistency")
88+
flag.IntVar(&cfg.AccountsNum, "a", 100000,
89+
"The number of bank accounts")
90+
flag.IntVar(&cfg.Writers.StartId, "s", 0,
91+
"StartID. Script will update rows starting from this value")
92+
flag.IntVar(&cfg.IterNum, "n", 10000,
93+
"The number updates each writer (reader in case of Reades backend) performs")
94+
flag.IntVar(&cfg.ReadersNum, "r", 1,
95+
"The number of readers")
96+
flag.IntVar(&cfg.Writers.Num, "w", 8,
97+
"The number of writers")
98+
flag.BoolVar(&cfg.Verbose, "v", false,
99+
"Show progress and other stuff for mortals")
100+
flag.BoolVar(&cfg.Parallel, "p", false,
101+
"Use parallel execs")
102+
repread := flag.Bool("l", false,
103+
"Use 'repeatable read' isolation level instead of 'read committed'")
104+
flag.Parse()
105+
106+
if len(cfg.ConnStrs) == 0 {
107+
flag.PrintDefaults()
108+
os.Exit(1)
109+
}
110+
111+
if cfg.AccountsNum < 2 {
112+
fmt.Println(
113+
"There should be at least 2 accounts (to avoid deadlocks)",
114+
)
115+
os.Exit(1)
116+
}
117+
118+
if *repread {
119+
cfg.Isolation = "repeatable read"
120+
} else {
121+
cfg.Isolation = "read committed"
122+
}
123+
124+
dump_cfg()
125+
}
126+
127+
func main() {
128+
if len(cfg.ConnStrs) < 2 {
129+
fmt.Println("ERROR: This test needs at leas two connections")
130+
os.Exit(1)
131+
}
132+
133+
// switch cfg.Backend {
134+
// case "transfers":
135+
// backend = new(Transfers)
136+
// case "fdw":
137+
// backend = new(TransfersFDW)
138+
// case "readers":
139+
// backend = new(Readers)
140+
// case "pgshard":
141+
// backend = new(TransfersPgShard)
142+
// default:
143+
// fmt.Println("No backend named: '%s'\n", cfg.Backend)
144+
// return
145+
// }
146+
backend = new(TransfersTS)
147+
148+
start := time.Now()
149+
150+
if (cfg.Init){
151+
backend.prepare(cfg.ConnStrs)
152+
fmt.Printf("database prepared in %0.2f seconds\n", time.Since(start).Seconds())
153+
return
154+
}
155+
156+
var writerWg sync.WaitGroup
157+
var readerWg sync.WaitGroup
158+
159+
cCommits := make(chan int)
160+
cFetches:= make(chan int)
161+
cAborts := make(chan int)
162+
163+
go progress(cfg.Writers.Num * cfg.IterNum, cCommits, cAborts)
164+
165+
start = time.Now()
166+
writerWg.Add(cfg.Writers.Num)
167+
for i := 0; i < cfg.Writers.Num; i++ {
168+
go backend.writer(i, cCommits, cAborts, &writerWg)
169+
}
170+
running = true
171+
172+
inconsistency := false
173+
readerWg.Add(cfg.ReadersNum)
174+
for i := 0; i < cfg.ReadersNum; i++ {
175+
go backend.reader(&readerWg, cFetches, &inconsistency)
176+
}
177+
178+
writerWg.Wait()
179+
running = false
180+
readerWg.Wait()
181+
182+
fmt.Printf("writers finished in %0.2f seconds\n",
183+
time.Since(start).Seconds())
184+
fmt.Printf("TPS = %0.2f\n",
185+
float64(cfg.Writers.Num*cfg.IterNum)/time.Since(start).Seconds())
186+
187+
if inconsistency {
188+
fmt.Printf("INCONSISTENCY DETECTED\n")
189+
}
190+
fmt.Printf("done.\n")
191+
}
192+
193+
var running = false
194+
195+
func asyncCommit(conn *pgx.Conn, wg *sync.WaitGroup) {
196+
exec(conn, "commit")
197+
wg.Done()
198+
}
199+
200+
func commit(conns ...*pgx.Conn) {
201+
var wg sync.WaitGroup
202+
wg.Add(len(conns))
203+
for _, conn := range conns {
204+
go asyncCommit(conn, &wg)
205+
}
206+
wg.Wait()
207+
}
208+
209+
func parallel_exec(conns []*pgx.Conn, requests []string) bool {
210+
var wg sync.WaitGroup
211+
state := true
212+
wg.Add(len(conns))
213+
for i := range conns {
214+
if cfg.Parallel {
215+
go func(j int) {
216+
_, err := conns[j].Exec(requests[j])
217+
if err != nil {
218+
state = false
219+
}
220+
wg.Done()
221+
}(i)
222+
} else {
223+
_, err := conns[i].Exec(requests[i])
224+
if err != nil {
225+
state = false
226+
}
227+
wg.Done()
228+
}
229+
}
230+
wg.Wait()
231+
return state
232+
}
233+
234+
func progress(total int, cCommits chan int, cAborts chan int) {
235+
commits := 0
236+
aborts := 0
237+
start := time.Now()
238+
for newcommits := range cCommits {
239+
newaborts := <-cAborts
240+
commits += newcommits
241+
aborts += newaborts
242+
if time.Since(start).Seconds() > 1 {
243+
if cfg.Verbose {
244+
fmt.Printf(
245+
"progress %0.2f%%: %d commits, %d aborts\n",
246+
float32(commits) * 100.0 / float32(total), commits, aborts,
247+
)
248+
}
249+
start = time.Now()
250+
}
251+
}
252+
}
253+
254+
func exec(conn *pgx.Conn, stmt string, arguments ...interface{}) {
255+
var err error
256+
// fmt.Println(stmt)
257+
_, err = conn.Exec(stmt, arguments... )
258+
checkErr(err)
259+
}
260+
261+
func execUpdate(conn *pgx.Conn, stmt string, arguments ...interface{}) bool {
262+
var err error
263+
// fmt.Println(stmt)
264+
_, err = conn.Exec(stmt, arguments... )
265+
//if err != nil {
266+
// fmt.Println(err)
267+
//}
268+
return err == nil
269+
}
270+
271+
func execQuery(conn *pgx.Conn, stmt string, arguments ...interface{}) int32 {
272+
var err error
273+
var result int32
274+
err = conn.QueryRow(stmt, arguments...).Scan(&result)
275+
checkErr(err)
276+
return result
277+
}
278+
279+
func execQuery64(conn *pgx.Conn, stmt string, arguments ...interface{}) int64 {
280+
var err error
281+
var result int64
282+
err = conn.QueryRow(stmt, arguments...).Scan(&result)
283+
checkErr(err)
284+
return result
285+
}
286+
287+
func checkErr(err error) {
288+
if err != nil {
289+
panic(err)
290+
}
291+
}
292+
293+
// vim: expandtab ts=4 sts=4 sw=4

0 commit comments

Comments
 (0)
0