
Commit 2eeb0f3

timestamp tests cleanup
1 parent: c95b0f4

11 files changed: 496 additions & 1839 deletions

Lines changed: 7 additions & 7 deletions

@@ -1,17 +1,17 @@
 ---
 - hosts: nodes[1]
-  roles:
-  - role: postgrespro
-    deploy_dtm: true
+#  roles:
+#  - role: postgrespro
+#    deploy_dtm: true

 - hosts: nodes
   roles:
   - role: postgrespro
     pg_port: 15432
     deploy_postgres: true
     pg_dtm_enable: true
-    pg_dtm_enable: false
-#    pg_config_role:
-#      - line: "dtm.buffer_size = 65536"
-    pg_dtm_host: "{{ groups['nodes'][0] }}"
+    pg_config_role:
+      - line: "dtm.vacuum_delay = 1"
+#    pg_dtm_host: "{{ groups['nodes'][0] }}"
+

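The layout above no longer deploys the DTM coordinator play; instead it pushes dtm.vacuum_delay = 1 into every node's configuration through pg_config_role. To confirm after a deploy that the setting actually took effect on each node, a small check over the same pgx driver the perf harness uses could look like the sketch below. This is a minimal sketch, not part of the commit: the checkVacuumDelay helper, the URIs, and the assumption that the DTM extension exposes the GUC through SHOW are all illustrative.

package main

import (
    "fmt"

    "github.com/jackc/pgx"
)

// checkVacuumDelay connects to every node and reads dtm.vacuum_delay
// back. Hypothetical helper: assumes the extension registers the GUC
// so that SHOW can see it, and that postgres:// style URIs are used.
func checkVacuumDelay(connstrs []string) {
    for _, cs := range connstrs {
        conf, err := pgx.ParseURI(cs)
        if err != nil {
            panic(err)
        }
        conn, err := pgx.Connect(conf)
        if err != nil {
            panic(err)
        }
        var val string
        if err := conn.QueryRow("show dtm.vacuum_delay").Scan(&val); err != nil {
            conn.Close()
            panic(err)
        }
        fmt.Printf("%s: dtm.vacuum_delay = %s\n", cs, val)
        conn.Close()
    }
}

func main() {
    // hosts and pg_port follow the farm inventory and layout above,
    // but the exact URIs are assumptions
    checkVacuumDelay([]string{
        "postgres://cluster@158.250.29.6:15432/postgres",
        "postgres://cluster@158.250.29.8:15432/postgres",
    })
}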
contrib/pg_dtm/tests/deploy_layouts/roles/postgrespro/tasks/postgres.yml

Lines changed: 8 additions & 8 deletions

@@ -7,18 +7,18 @@
 - name: remove dtm.so
   shell: rm -f {{pg_dst}}/lib/pg_dtm.so

-- name: build dtm extension
-  shell: make clean && make && make install
-  args:
-    chdir: "{{pg_src}}/contrib/pg_dtm"
-    creates: "{{pg_dst}}/lib/pg_dtm.so"
-
-# - name: build ts-dtm extension
+# - name: build dtm extension
 #   shell: make clean && make && make install
 #   args:
-#     chdir: "{{pg_src}}/contrib/pg_tsdtm"
+#     chdir: "{{pg_src}}/contrib/pg_dtm"
 #     creates: "{{pg_dst}}/lib/pg_dtm.so"

+- name: build ts-dtm extension
+  shell: make clean && make && make install
+  args:
+    chdir: "{{pg_src}}/contrib/pg_tsdtm"
+    creates: "{{pg_dst}}/lib/pg_dtm.so"
+
 - stat: path={{pg_datadir}}/postmaster.pid
   register: pg_pidfile

contrib/pg_dtm/tests/deploy_layouts/single.yml

Lines changed: 2 additions & 2 deletions

@@ -2,8 +2,8 @@
 - hosts: nodes[-1]
   roles:

-  - role: postgrespro
-    deploy_dtm: true
+#  - role: postgrespro
+#    deploy_dtm: true

   - role: postgrespro
     deploy_postgres: true
contrib/pg_dtm/tests/farms/sai

Lines changed: 1 addition & 1 deletion

@@ -7,7 +7,7 @@
 158.250.29.6 ansible_ssh_user=cluster offset=4001
 158.250.29.8 ansible_ssh_user=cluster offset=2001
 158.250.29.9 ansible_ssh_user=cluster offset=1001
-158.250.29.10 ansible_ssh_user=cluster offset=1
+#158.250.29.10 ansible_ssh_user=cluster offset=1

 [master]
 158.250.29.10 ansible_ssh_user=cluster offset=1
contrib/pg_tsdtm/tests/perf/perf.go

Lines changed: 293 additions & 0 deletions

@@ -0,0 +1,293 @@
+package main
+
+import (
+    "flag"
+    "fmt"
+    "os"
+    "sync"
+    "time"
+
+    "github.com/jackc/pgx"
+)
+
+type ConnStrings []string
+
+var backend interface {
+    prepare(connstrs []string)
+    writer(id int, cCommits chan int, cAborts chan int, wg *sync.WaitGroup)
+    reader(wg *sync.WaitGroup, cFetches chan int, inconsistency *bool)
+}
+
+var cfg struct {
+    ConnStrs    ConnStrings
+    Backend     string
+    Verbose     bool
+    UseDtm      bool
+    Init        bool
+    Parallel    bool
+    Isolation   string
+    AccountsNum int
+    ReadersNum  int
+    IterNum     int
+
+    Writers struct {
+        Num     int
+        StartId int
+    }
+}
+
+// String is the first method of the flag.Value interface.
+func (c *ConnStrings) String() string {
+    if len(*c) > 0 {
+        return (*c)[0]
+    }
+    return ""
+}
+
+// Set is the second method of the flag.Value interface.
+func (c *ConnStrings) Set(value string) error {
+    *c = append(*c, value)
+    return nil
+}
+
+func append_with_comma(s *string, x string) {
+    if len(*s) > 0 {
+        *s = *s + ", " + x
+    } else {
+        *s = x
+    }
+}
+
+func dump_cfg() {
+    fmt.Printf("Connections: %d\n", len(cfg.ConnStrs))
+    for _, cs := range cfg.ConnStrs {
+        fmt.Printf("    %s\n", cs)
+    }
+    fmt.Printf("Isolation: %s\n", cfg.Isolation)
+    fmt.Printf(
+        "Accounts: %d × $%d\n",
+        cfg.AccountsNum, 0,
+    )
+    fmt.Printf("Readers: %d\n", cfg.ReadersNum)
+
+    fmt.Printf(
+        "Writers: %d × %d updates\n",
+        cfg.Writers.Num, cfg.IterNum,
+    )
+}
+
+func init() {
+    flag.StringVar(&cfg.Backend, "b", "transfers",
+        "Backend to use. Defaults to 'transfers'")
+    flag.Var(&cfg.ConnStrs, "C",
+        "Connection string (repeat for multiple connections)")
+    flag.BoolVar(&cfg.Init, "i", false,
+        "Init database")
+    flag.BoolVar(&cfg.UseDtm, "g", false,
+        "Use DTM to keep global consistency")
+    flag.IntVar(&cfg.AccountsNum, "a", 100000,
+        "The number of bank accounts")
+    flag.IntVar(&cfg.Writers.StartId, "s", 0,
+        "StartID. Script will update rows starting from this value")
+    flag.IntVar(&cfg.IterNum, "n", 10000,
+        "The number of updates each writer (reader in case of the Readers backend) performs")
+    flag.IntVar(&cfg.ReadersNum, "r", 1,
+        "The number of readers")
+    flag.IntVar(&cfg.Writers.Num, "w", 8,
+        "The number of writers")
+    flag.BoolVar(&cfg.Verbose, "v", false,
+        "Show progress and other stuff for mortals")
+    flag.BoolVar(&cfg.Parallel, "p", false,
+        "Use parallel execs")
+    repread := flag.Bool("l", false,
+        "Use 'repeatable read' isolation level instead of 'read committed'")
+    flag.Parse()
+
+    if len(cfg.ConnStrs) == 0 {
+        flag.PrintDefaults()
+        os.Exit(1)
+    }
+
+    if cfg.AccountsNum < 2 {
+        fmt.Println(
+            "There should be at least 2 accounts (to avoid deadlocks)",
+        )
+        os.Exit(1)
+    }
+
+    if *repread {
+        cfg.Isolation = "repeatable read"
+    } else {
+        cfg.Isolation = "read committed"
+    }
+
+    dump_cfg()
+}
+
+func main() {
+    if len(cfg.ConnStrs) < 2 {
+        fmt.Println("ERROR: This test needs at least two connections")
+        os.Exit(1)
+    }
+
+    // switch cfg.Backend {
+    // case "transfers":
+    //     backend = new(Transfers)
+    // case "fdw":
+    //     backend = new(TransfersFDW)
+    // case "readers":
+    //     backend = new(Readers)
+    // case "pgshard":
+    //     backend = new(TransfersPgShard)
+    // default:
+    //     fmt.Printf("No backend named: '%s'\n", cfg.Backend)
+    //     return
+    // }
+    backend = new(TransfersTS)
+
+    start := time.Now()
+
+    if cfg.Init {
+        backend.prepare(cfg.ConnStrs)
+        fmt.Printf("database prepared in %0.2f seconds\n", time.Since(start).Seconds())
+        return
+    }
+
+    var writerWg sync.WaitGroup
+    var readerWg sync.WaitGroup
+
+    cCommits := make(chan int)
+    cFetches := make(chan int)
+    cAborts := make(chan int)
+
+    go progress(cfg.Writers.Num*cfg.IterNum, cCommits, cAborts)
+
+    start = time.Now()
+    writerWg.Add(cfg.Writers.Num)
+    for i := 0; i < cfg.Writers.Num; i++ {
+        go backend.writer(i, cCommits, cAborts, &writerWg)
+    }
+    running = true
+
+    inconsistency := false
+    readerWg.Add(cfg.ReadersNum)
+    for i := 0; i < cfg.ReadersNum; i++ {
+        go backend.reader(&readerWg, cFetches, &inconsistency)
+    }
+
+    writerWg.Wait()
+    running = false
+    readerWg.Wait()
+
+    fmt.Printf("writers finished in %0.2f seconds\n",
+        time.Since(start).Seconds())
+    fmt.Printf("TPS = %0.2f\n",
+        float64(cfg.Writers.Num*cfg.IterNum)/time.Since(start).Seconds())
+
+    if inconsistency {
+        fmt.Printf("INCONSISTENCY DETECTED\n")
+    }
+    fmt.Printf("done.\n")
+}
+
+// running tells readers whether writers are still active.
+var running = false
+
+func asyncCommit(conn *pgx.Conn, wg *sync.WaitGroup) {
+    exec(conn, "commit")
+    wg.Done()
+}
+
+// commit issues "commit" on all given connections concurrently and
+// waits for every one of them to finish.
+func commit(conns ...*pgx.Conn) {
+    var wg sync.WaitGroup
+    wg.Add(len(conns))
+    for _, conn := range conns {
+        go asyncCommit(conn, &wg)
+    }
+    wg.Wait()
+}
+
+// parallel_exec runs requests[i] on conns[i] -- concurrently when -p is
+// given, sequentially otherwise -- and reports whether every statement
+// succeeded.
+func parallel_exec(conns []*pgx.Conn, requests []string) bool {
+    var wg sync.WaitGroup
+    state := true
+    wg.Add(len(conns))
+    for i := range conns {
+        if cfg.Parallel {
+            go func(j int) {
+                _, err := conns[j].Exec(requests[j])
+                if err != nil {
+                    state = false
+                }
+                wg.Done()
+            }(i)
+        } else {
+            _, err := conns[i].Exec(requests[i])
+            if err != nil {
+                state = false
+            }
+            wg.Done()
+        }
+    }
+    wg.Wait()
+    return state
+}
+
+// progress aggregates per-writer commit/abort counts and, in verbose
+// mode, prints a status line roughly once a second.
+func progress(total int, cCommits chan int, cAborts chan int) {
+    commits := 0
+    aborts := 0
+    start := time.Now()
+    for newcommits := range cCommits {
+        newaborts := <-cAborts
+        commits += newcommits
+        aborts += newaborts
+        if time.Since(start).Seconds() > 1 {
+            if cfg.Verbose {
+                fmt.Printf(
+                    "progress %0.2f%%: %d commits, %d aborts\n",
+                    float32(commits)*100.0/float32(total), commits, aborts,
+                )
+            }
+            start = time.Now()
+        }
+    }
+}
+
+func exec(conn *pgx.Conn, stmt string, arguments ...interface{}) {
+    // fmt.Println(stmt)
+    _, err := conn.Exec(stmt, arguments...)
+    checkErr(err)
+}
+
+func execUpdate(conn *pgx.Conn, stmt string, arguments ...interface{}) bool {
+    // fmt.Println(stmt)
+    _, err := conn.Exec(stmt, arguments...)
+    // if err != nil {
+    //     fmt.Println(err)
+    // }
+    return err == nil
+}
+
+func execQuery(conn *pgx.Conn, stmt string, arguments ...interface{}) int32 {
+    var result int32
+    err := conn.QueryRow(stmt, arguments...).Scan(&result)
+    checkErr(err)
+    return result
+}
+
+func execQuery64(conn *pgx.Conn, stmt string, arguments ...interface{}) int64 {
+    var result int64
+    err := conn.QueryRow(stmt, arguments...).Scan(&result)
+    checkErr(err)
+    return result
+}
+
+func checkErr(err error) {
+    if err != nil {
+        panic(err)
+    }
+}
+
+// vim: expandtab ts=4 sts=4 sw=4
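perf.go only wires the harness together: repeated -C flags accumulate connection strings through the ConnStrings flag.Value implementation, init() validates the configuration, and main() fans writer and reader goroutines out over whichever backend is selected (backend = new(TransfersTS) here). The concrete TransfersTS backend lives in a sibling file of the package and is not shown on this page, so the skeleton below is only a sketch of the shape the backend interface demands; the NoopBackend name, the table DDL, and the transaction bodies are invented for illustration.

package main

import (
    "sync"

    "github.com/jackc/pgx"
)

// NoopBackend is a hypothetical stand-in for a real backend such as
// TransfersTS. It relies on cfg, running, exec and checkErr from
// perf.go (same package).
type NoopBackend struct{}

// prepare would create and populate the test tables on every node.
func (b *NoopBackend) prepare(connstrs []string) {
    for _, cs := range connstrs {
        conf, err := pgx.ParseURI(cs) // assumes postgres:// style URIs
        checkErr(err)
        conn, err := pgx.Connect(conf)
        checkErr(err)
        exec(conn, "create table if not exists t(u int primary key, v int)")
        conn.Close()
    }
}

// writer would run cfg.IterNum transactions, reporting commit and abort
// counts in pairs, which is how progress() consumes the two channels.
func (b *NoopBackend) writer(id int, cCommits chan int, cAborts chan int, wg *sync.WaitGroup) {
    for i := 0; i < cfg.IterNum; i++ {
        // a real backend would transfer money between two accounts here
        cCommits <- 1
        cAborts <- 0
    }
    wg.Done()
}

// reader would repeatedly fetch a global invariant (e.g. the total
// balance) while writers are active and flag *inconsistency if the
// value ever changes between fetches.
func (b *NoopBackend) reader(wg *sync.WaitGroup, cFetches chan int, inconsistency *bool) {
    for running {
        // query the invariant on all nodes and compare snapshots here
    }
    wg.Done()
}

With a backend in place the harness is built together with the rest of the package and would be invoked along the lines of perf -C <uri1> -C <uri2> -i to prepare the database, then again without -i to run the writers and readers and report TPS.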
