Skip to content

Commit 9645d73

Browse files
committed
local node as participant
1 parent 2c64afe commit 9645d73

File tree

3 files changed

+167
-40
lines changed

3 files changed

+167
-40
lines changed

contrib/postgres_fdw/connection.c

Lines changed: 11 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -470,7 +470,6 @@ begin_remote_xact(ConnCacheEntry *entry)
470470
if (current_global_cid == 0)
471471
{
472472
MemoryContext oldcxt;
473-
char *resp;
474473

475474
/*
476475
* This is the first remote participant, create global
@@ -484,38 +483,20 @@ begin_remote_xact(ConnCacheEntry *entry)
484483
++two_phase_xact_count);
485484
MemoryContextSwitchTo(oldcxt);
486485

487-
488-
// res = PQexec(entry->conn, psprintf("SELECT pg_global_snaphot_create('%s')",
489-
// two_phase_xact_gid));
490-
491-
// if (PQresultStatus(res) != PGRES_TUPLES_OK)
492-
// {
493-
// pgfdw_report_error(ERROR, res, entry->conn, true, sql);
494-
// }
495-
// resp = PQgetvalue(res, 0, 0);
496-
// if (resp == NULL || (*resp) == '\0' || sscanf(resp, "%ld", &current_global_cid) != 1)
497-
// {
498-
// pgfdw_report_error(ERROR, res, entry->conn, true, sql);
499-
// }
500-
// PQclear(res);
486+
current_global_cid = DtmLocalExtend(two_phase_xact_gid);
487+
}
501488

489+
Assert(two_phase_xact_gid);
490+
/* join the new participant */
491+
res = PQexec(entry->conn,
492+
psprintf("SELECT pg_global_snaphot_join("UINT64_FORMAT", '%s')",
493+
current_global_cid, two_phase_xact_gid));
502494

503-
current_global_cid = DtmLocalExtend(two_phase_xact_gid);
495+
if (PQresultStatus(res) != PGRES_TUPLES_OK)
496+
{
497+
pgfdw_report_error(ERROR, res, entry->conn, true, sql);
504498
}
505-
// else
506-
// {
507-
Assert(two_phase_xact_gid);
508-
/* join the new participant */
509-
res = PQexec(entry->conn,
510-
psprintf("SELECT pg_global_snaphot_join("UINT64_FORMAT", '%s')",
511-
current_global_cid, two_phase_xact_gid));
512-
513-
if (PQresultStatus(res) != PGRES_TUPLES_OK)
514-
{
515-
pgfdw_report_error(ERROR, res, entry->conn, true, sql);
516-
}
517-
PQclear(res);
518-
// }
499+
PQclear(res);
519500
}
520501

521502
/* A new potential participant for 2PC */
Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
use strict;
2+
use warnings;
3+
4+
use PostgresNode;
5+
use TestLib;
6+
use Test::More tests => 1;
7+
8+
# my $master = get_new_node("master");
9+
# $master->init;
10+
# $master->append_conf('postgresql.conf', qq(
11+
# max_prepared_transactions = 30
12+
# log_checkpoints = true
13+
# postgres_fdw.use_tsdtm = on
14+
# ));
15+
# $master->start;
16+
17+
my $shard1 = get_new_node("shard1");
18+
$shard1->init;
19+
$shard1->append_conf('postgresql.conf', qq(
20+
max_prepared_transactions = 30
21+
postgres_fdw.use_tsdtm = on
22+
));
23+
$shard1->start;
24+
25+
my $shard2 = get_new_node("shard2");
26+
$shard2->init;
27+
$shard2->append_conf('postgresql.conf', qq(
28+
max_prepared_transactions = 30
29+
postgres_fdw.use_tsdtm = on
30+
));
31+
$shard2->start;
32+
33+
###############################################################################
34+
# Prepare nodes
35+
###############################################################################
36+
37+
my @shards = ($shard1, $shard2);
38+
39+
foreach my $node (@shards)
40+
{
41+
$node->safe_psql('postgres', "CREATE EXTENSION postgres_fdw");
42+
$node->safe_psql('postgres', "CREATE TABLE accounts(id integer primary key, amount integer)");
43+
$node->safe_psql('postgres', "CREATE TABLE accounts_local() inherits(accounts)");
44+
$node->safe_psql('postgres', "CREATE TABLE global_transactions(tx_time timestamp)");
45+
$node->safe_psql('postgres', "CREATE TABLE local_transactions(tx_time timestamp)");
46+
47+
foreach my $neighbor (@shards)
48+
{
49+
next if ($neighbor eq $node);
50+
51+
my $port = $neighbor->port;
52+
my $host = $neighbor->host;
53+
54+
$node->safe_psql('postgres', "CREATE SERVER shard_$port FOREIGN DATA WRAPPER postgres_fdw options(dbname 'postgres', host '$host', port '$port')");
55+
$node->safe_psql('postgres', "CREATE FOREIGN TABLE accounts_fdw_$port() inherits (accounts) server shard_$port options(table_name 'accounts_local')");
56+
$node->safe_psql('postgres', "CREATE USER MAPPING for stas SERVER shard_$port options (user 'stas')");
57+
}
58+
59+
}
60+
61+
diag("\n");
62+
diag( $shard1->connstr('postgres'), "\n" );
63+
diag( $shard2->connstr('postgres'), "\n" );
64+
65+
$shard1->psql('postgres', "insert into accounts_local select 2*id-1, 0 from generate_series(1, 10010) as id;");
66+
$shard2->psql('postgres', "insert into accounts_local select 2*id, 0 from generate_series(1, 10010) as id;");
67+
68+
diag("\n");
69+
diag( $shard1->connstr('postgres'), "\n" );
70+
diag( $shard2->connstr('postgres'), "\n" );
71+
72+
#sleep(6000);
73+
74+
$shard1->pgbench(-n, -c => 20, -t => 30, -f => "$TestLib::log_path/../../t/bank.sql", 'postgres' );
75+
$shard2->pgbench(-n, -c => 20, -t => 30, -f => "$TestLib::log_path/../../t/bank.sql", 'postgres' );
76+
77+
diag("\n");
78+
diag( $shard1->connstr('postgres'), "\n" );
79+
diag( $shard2->connstr('postgres'), "\n" );
80+
# sleep(3600);
81+
82+
###############################################################################
83+
# Helpers
84+
###############################################################################
85+
86+
sub count_and_delete_rows
87+
{
88+
my ($node, $table) = @_;
89+
my ($rc, $count, $err);
90+
91+
($rc, $count, $err) = $node->psql('postgres',"select count(*) from $table",
92+
on_error_die => 1);
93+
94+
die "count_rows: $err" if ($err ne '');
95+
96+
$node->psql('postgres',"delete from $table", on_error_die => 1);
97+
98+
diag($node->name, ": completed $count transactions");
99+
100+
return $count;
101+
}
102+
103+
###############################################################################
104+
# Concurrent global transactions
105+
###############################################################################
106+
107+
my ($err, $rc);
108+
my $started;
109+
my $seconds = 30;
110+
my $selects;
111+
my $total = '0';
112+
my $oldtotal = '0';
113+
my $isolation_errors = 0;
114+
115+
116+
my ($pgb_handle1, $pgb_handle2);
117+
118+
$pgb_handle1 = $shard1->pgbench_async(-n, -c => 5, -T => $seconds, -f => "$TestLib::log_path/../../t/bank.sql", 'postgres' );
119+
$pgb_handle2 = $shard2->pgbench_async(-n, -c => 5, -T => $seconds, -f => "$TestLib::log_path/../../t/bank.sql", 'postgres' );
120+
121+
$started = time();
122+
$selects = 0;
123+
my $i = 1;
124+
while (time() - $started < $seconds)
125+
{
126+
my $shard = $shard1;
127+
foreach my $shard (@shards)
128+
{
129+
$total = $shard->safe_psql('postgres', "select sum(amount) from accounts");
130+
if ( ($total ne $oldtotal) and ($total ne '') )
131+
{
132+
$isolation_errors++;
133+
$oldtotal = $total;
134+
diag("$i: Isolation error. Total = $total");
135+
}
136+
if ($total ne '') { $selects++; }
137+
$i++;
138+
}
139+
}
140+
141+
$shard1->pgbench_await($pgb_handle1);
142+
$shard2->pgbench_await($pgb_handle2);
143+
144+
# sanity check
145+
diag("completed $selects selects");
146+
die "no actual transactions happend" unless ( $selects > 0 &&
147+
count_and_delete_rows($shard1, 'global_transactions') > 0 &&
148+
count_and_delete_rows($shard2, 'global_transactions') > 0);
149+
150+
is($isolation_errors, 0, 'isolation between concurrent global transaction');
151+

src/backend/access/transam/global_snapshot.c

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -392,7 +392,7 @@ DtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
392392
SpinLockRelease(&local->lock);
393393
return true;
394394
}
395-
if (ts->status == TRANSACTION_STATUS_IN_PROGRESS)
395+
if (ts->status == TRANSACTION_STATUS_UNKNOWN)
396396
{
397397
DTM_TRACE((stderr, "%d: wait for in-doubt transaction %u in snapshot %lu\n", getpid(), xid, dtm_tx.snapshot));
398398
SpinLockRelease(&local->lock);
@@ -431,22 +431,17 @@ DtmInitialize()
431431

432432
info.keysize = sizeof(TransactionId);
433433
info.entrysize = sizeof(DtmTransStatus);
434-
info.hash = dtm_xid_hash_fn;
435-
info.match = dtm_xid_match_fn;
436434
xid2status = ShmemInitHash("xid2status",
437435
DTM_HASH_INIT_SIZE, DTM_HASH_INIT_SIZE,
438436
&info,
439-
HASH_ELEM | HASH_FUNCTION | HASH_COMPARE);
437+
HASH_ELEM | HASH_BLOBS);
440438

441439
info.keysize = MAX_GTID_SIZE;
442440
info.entrysize = sizeof(DtmTransId);
443-
info.hash = dtm_gtid_hash_fn;
444-
info.match = dtm_gtid_match_fn;
445-
info.keycopy = dtm_gtid_keycopy_fn;
446441
gtid2xid = ShmemInitHash("gtid2xid",
447442
DTM_HASH_INIT_SIZE, DTM_HASH_INIT_SIZE,
448443
&info,
449-
HASH_ELEM | HASH_FUNCTION | HASH_COMPARE | HASH_KEYCOPY);
444+
HASH_ELEM);
450445

451446
TM = &DtmTM;
452447

@@ -526,7 +521,7 @@ DtmLocalAccess(DtmCurrentTrans * x, GlobalTransactionId gtid, cid_t global_cid)
526521
{
527522
DtmTransId *id = (DtmTransId *) hash_search(gtid2xid, gtid, HASH_ENTER, NULL);
528523

529-
id->xid = x->xid;
524+
id->xid = GetCurrentTransactionId();
530525
id->nSubxids = 0;
531526
id->subxids = 0;
532527
}
@@ -559,7 +554,7 @@ DtmLocalBeginPrepare(GlobalTransactionId gtid)
559554
Assert(id != NULL);
560555
Assert(TransactionIdIsValid(id->xid));
561556
ts = (DtmTransStatus *) hash_search(xid2status, &id->xid, HASH_ENTER, NULL);
562-
ts->status = TRANSACTION_STATUS_IN_PROGRESS;
557+
ts->status = TRANSACTION_STATUS_UNKNOWN;
563558
ts->cid = dtm_get_cid();
564559
ts->nSubxids = id->nSubxids;
565560
DtmTransactionListAppend(ts);

0 commit comments

Comments
 (0)