forked from Ilhasoft/courier
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathbatch_test.go
110 lines (87 loc) · 2.79 KB
/
batch_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
package batch
import (
"fmt"
"sync"
"testing"
"time"
"github.com/jmoiron/sqlx"
_ "github.com/lib/pq"
"github.com/stretchr/testify/assert"
)
// Label is the row type used by the tests in this file. Its db tags name the
// id and label columns of the labels table the tests create.
type Label struct {
	ID    int    `db:"id"`
	Label string `db:"label"`
}

// RowID returns the label's ID in decimal form, or the empty string when the
// ID is zero.
func (l *Label) RowID() string {
	if l.ID == 0 {
		return ""
	}
	return fmt.Sprint(l.ID)
}
// TestBatchInsert queues labels on a committer that inserts into a freshly
// created labels table, then checks both the successful path and the error
// reported through the callback when a duplicate label is queued.
//
// It needs a Postgres server reachable with the connection string below.
func TestBatchInsert(t *testing.T) {
	db := sqlx.MustConnect("postgres", "postgres://courier:courier@localhost:5432/courier_test?sslmode=disable")
	// Deferred first so it runs last, after the committer has been stopped.
	defer db.Close()

	db.MustExec("DROP TABLE IF EXISTS labels;")
	db.MustExec("CREATE TABLE labels(id serial primary key, label text not null unique);")

	// The committer flushes while this test sleeps, so the callback presumably
	// runs on another goroutine; guard the shared error with a mutex.
	var (
		mu          sync.Mutex
		callbackErr error
	)
	callback := func(err error, value Value) {
		mu.Lock()
		defer mu.Unlock()
		callbackErr = err
	}
	lastErr := func() error {
		mu.Lock()
		defer mu.Unlock()
		return callbackErr
	}

	committer := NewCommitter("labels", db, "INSERT INTO labels(label) VALUES(:label);", time.Millisecond*250, &sync.WaitGroup{}, callback)
	committer.Start()
	defer committer.Stop()

	committer.Queue(&Label{0, "label1"})
	committer.Queue(&Label{0, "label2"})
	committer.Queue(&Label{0, "label3"})

	// Wait well past the 250ms interval handed to the committer.
	time.Sleep(time.Second)

	assert.NoError(t, lastErr())
	count := 0
	assert.NoError(t, db.Get(&count, "SELECT count(*) FROM labels;"))
	assert.Equal(t, 3, count)

	// label3 already exists, so the unique constraint must reject it; label4 is
	// still expected to be stored (row count goes from 3 to 4).
	committer.Queue(&Label{0, "label4"})
	committer.Queue(&Label{0, "label3"})

	time.Sleep(time.Second)

	// EqualError fails cleanly on a nil error instead of panicking on a nil
	// dereference when no error was reported.
	assert.EqualError(t, lastErr(), `labels: error comitting value: error during bulk insert: pq: duplicate key value violates unique constraint "labels_label_key"`)
	assert.NoError(t, db.Get(&count, "SELECT count(*) FROM labels;"))
	assert.Equal(t, 4, count)
}
// TestBatchUpdate queues updates for three pre-inserted labels, including two
// updates for id 1, and checks that no error is reported, that no rows are
// added, and that each row ends up with the expected label.
//
// It needs a Postgres server reachable with the connection string below.
func TestBatchUpdate(t *testing.T) {
	db := sqlx.MustConnect("postgres", "postgres://courier:courier@localhost:5432/courier_test?sslmode=disable")
	// Deferred first so it runs last, after the committer has been stopped.
	defer db.Close()

	db.MustExec("DROP TABLE IF EXISTS labels;")
	db.MustExec("CREATE TABLE labels(id serial primary key, label text not null unique);")
	db.MustExec("INSERT INTO labels(label) VALUES('label1'), ('label2'), ('label3');")

	// The committer flushes while this test sleeps, so the callback presumably
	// runs on another goroutine; guard the shared error with a mutex.
	var (
		mu          sync.Mutex
		callbackErr error
	)
	callback := func(err error, value Value) {
		mu.Lock()
		defer mu.Unlock()
		callbackErr = err
	}
	lastErr := func() error {
		mu.Lock()
		defer mu.Unlock()
		return callbackErr
	}

	committer := NewCommitter("labels", db, `
UPDATE
labels
SET
label = l.status
FROM
(VALUES(:id, :label))
AS
l(id, status)
WHERE
labels.id = l.id::int;
`, time.Millisecond*250, &sync.WaitGroup{}, callback)

	// Values are queued before the committer is started; id 1 is queued twice
	// and the later value ("label001") is the one expected to remain.
	committer.Queue(&Label{1, "label01"})
	committer.Queue(&Label{2, "label02"})
	committer.Queue(&Label{1, "label001"})
	committer.Queue(&Label{3, "label03"})

	committer.Start()
	defer committer.Stop()

	// Wait well past the 250ms interval handed to the committer.
	time.Sleep(time.Second)

	assert.NoError(t, lastErr())

	// Updates must not add rows.
	count := 0
	assert.NoError(t, db.Get(&count, "SELECT count(*) FROM labels;"))
	assert.Equal(t, 3, count)

	label := ""
	assert.NoError(t, db.Get(&label, "SELECT label FROM labels WHERE id = 1;"))
	assert.Equal(t, "label001", label)

	assert.NoError(t, db.Get(&label, "SELECT label FROM labels WHERE id = 2;"))
	assert.Equal(t, "label02", label)

	assert.NoError(t, db.Get(&label, "SELECT label FROM labels WHERE id = 3;"))
	assert.Equal(t, "label03", label)
}