Skip to content

Commit aaec649

Browse files
committed
Add cursors using to transfers test.
1 parent a69772f commit aaec649

File tree

3 files changed

+78
-19
lines changed

3 files changed

+78
-19
lines changed

contrib/pg_dtm/pg_dtm--1.0.sql

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,3 +16,7 @@ LANGUAGE C;
1616
CREATE FUNCTION dtm_get_current_snapshot_xmax() RETURNS integer
1717
AS 'MODULE_PATHNAME','dtm_get_current_snapshot_xmax'
1818
LANGUAGE C;
19+
20+
CREATE FUNCTION dtm_get_current_snapshot_xcnt() RETURNS integer
21+
AS 'MODULE_PATHNAME','dtm_get_current_snapshot_xcnt'
22+
LANGUAGE C;

contrib/pg_dtm/pg_dtm.c

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -775,6 +775,7 @@ PG_FUNCTION_INFO_V1(dtm_begin_transaction);
775775
PG_FUNCTION_INFO_V1(dtm_join_transaction);
776776
PG_FUNCTION_INFO_V1(dtm_get_current_snapshot_xmax);
777777
PG_FUNCTION_INFO_V1(dtm_get_current_snapshot_xmin);
778+
PG_FUNCTION_INFO_V1(dtm_get_current_snapshot_xcnt);
778779

779780
Datum
780781
dtm_get_current_snapshot_xmin(PG_FUNCTION_ARGS)
@@ -788,6 +789,12 @@ dtm_get_current_snapshot_xmax(PG_FUNCTION_ARGS)
788789
PG_RETURN_INT32(CurrentTransactionSnapshot->xmax);
789790
}
790791

792+
Datum
793+
dtm_get_current_snapshot_xcnt(PG_FUNCTION_ARGS)
794+
{
795+
PG_RETURN_INT32(CurrentTransactionSnapshot->xcnt);
796+
}
797+
791798
Datum
792799
dtm_begin_transaction(PG_FUNCTION_ARGS)
793800
{

contrib/pg_dtm/tests/transfers.go

Lines changed: 67 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@ const (
1515
N_ACCOUNTS = 100000
1616
//ISOLATION_LEVEL = "repeatable read"
1717
ISOLATION_LEVEL = "read committed"
18+
GLOBAL_UPDATES = true
19+
LOCAL_UPDATES = false
20+
CURSORS = false
1821
)
1922

2023

@@ -134,28 +137,71 @@ func transfer(id int, cCommits chan int, cAborts chan int, wg *sync.WaitGroup) {
134137
amount := 1
135138
account1 := rand.Intn(N_ACCOUNTS)
136139
account2 := rand.Intn(N_ACCOUNTS)
140+
srci := rand.Intn(2)
141+
dsti := rand.Intn(2)
142+
if (srci > dsti) {
143+
srci, dsti = dsti, srci
144+
}
137145

138-
src := conn[0]
139-
dst := conn[1]
146+
src := conn[srci]
147+
dst := conn[dsti]
140148

141-
xid = execQuery(src, "select dtm_begin_transaction()")
142-
exec(dst, "select dtm_join_transaction($1)", xid)
149+
if src == dst {
150+
// local update
151+
if !LOCAL_UPDATES {
152+
// which we do not want
153+
continue
154+
}
143155

144-
// start transaction
145-
exec(src, "begin transaction isolation level " + ISOLATION_LEVEL)
146-
exec(dst, "begin transaction isolation level " + ISOLATION_LEVEL)
156+
exec(src, "begin transaction isolation level " + ISOLATION_LEVEL)
157+
ok1 := execUpdate(src, "update t set v = v - $1 where u=$2", amount, account1)
158+
ok2 := execUpdate(src, "update t set v = v + $1 where u=$2", amount, account2)
159+
if !ok1 || !ok2 {
160+
exec(src, "rollback")
161+
nAborts += 1
162+
} else {
163+
exec(src, "commit")
164+
nCommits += 1
165+
myCommits += 1
166+
}
167+
} else {
168+
// global update
169+
if !GLOBAL_UPDATES {
170+
// which we do not want
171+
continue
172+
}
147173

148-
ok1 := execUpdate(src, "update t set v = v - $1 where u=$2", amount, account1)
149-
ok2 := execUpdate(dst, "update t set v = v + $1 where u=$2", amount, account2)
174+
xid = execQuery(src, "select dtm_begin_transaction()")
175+
exec(dst, "select dtm_join_transaction($1)", xid)
150176

151-
if !ok1 || !ok2 {
152-
exec(src, "rollback")
153-
exec(dst, "rollback")
154-
nAborts += 1
155-
} else {
156-
commit(src, dst)
157-
nCommits += 1
158-
myCommits += 1
177+
// start transaction
178+
exec(src, "begin transaction isolation level " + ISOLATION_LEVEL)
179+
exec(dst, "begin transaction isolation level " + ISOLATION_LEVEL)
180+
181+
ok := true
182+
if (CURSORS) {
183+
exec(src, "declare cur0 cursor for select * from t where u=$1 for update", account1)
184+
exec(dst, "declare cur0 cursor for select * from t where u=$1 for update", account2)
185+
186+
ok = execUpdate(src, "fetch from cur0") && ok
187+
ok = execUpdate(dst, "fetch from cur0") && ok
188+
189+
ok = execUpdate(src, "update t set v = v - $1 where current of cur0", amount) && ok
190+
ok = execUpdate(dst, "update t set v = v + $1 where current of cur0", amount) && ok
191+
} else {
192+
ok = execUpdate(src, "update t set v = v - $1 where u=$2", amount, account1) && ok
193+
ok = execUpdate(dst, "update t set v = v + $1 where u=$2", amount, account2) && ok
194+
}
195+
196+
if ok {
197+
commit(src, dst)
198+
nCommits += 1
199+
myCommits += 1
200+
} else {
201+
exec(src, "rollback")
202+
exec(dst, "rollback")
203+
nAborts += 1
204+
}
159205
}
160206

161207
if time.Since(start).Seconds() > 1 {
@@ -197,11 +243,13 @@ func inspect(wg *sync.WaitGroup) {
197243
if (sum != prevSum) {
198244
xmin1 := execQuery(conn1, "select dtm_get_current_snapshot_xmin()")
199245
xmax1 := execQuery(conn1, "select dtm_get_current_snapshot_xmax()")
246+
xcnt1 := execQuery(conn1, "select dtm_get_current_snapshot_xcnt()")
200247
xmin2 := execQuery(conn2, "select dtm_get_current_snapshot_xmin()")
201248
xmax2 := execQuery(conn2, "select dtm_get_current_snapshot_xmax()")
249+
xcnt2 := execQuery(conn2, "select dtm_get_current_snapshot_xcnt()")
202250
fmt.Printf(
203-
"Total=%d xid=%d snap1=[%d, %d) snap2=[%d, %d)\n",
204-
sum, xid, xmin1, xmax1, xmin2, xmax2,
251+
"Total=%d xid=%d snap1=[%d, %d){%d} snap2=[%d, %d){%d}\n",
252+
sum, xid, xmin1, xmax1, xcnt1, xmin2, xmax2, xcnt2,
205253
)
206254
prevSum = sum
207255
}

0 commit comments

Comments
 (0)