Skip to content

Commit 2c64afe

Browse files
committed
always add local node as participant
1 parent 14e6f4e commit 2c64afe

File tree

3 files changed

+40
-37
lines changed

3 files changed

+40
-37
lines changed

contrib/postgres_fdw/connection.c

Lines changed: 31 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -485,35 +485,37 @@ begin_remote_xact(ConnCacheEntry *entry)
485485
MemoryContextSwitchTo(oldcxt);
486486

487487

488-
res = PQexec(entry->conn, psprintf("SELECT pg_global_snaphot_create('%s')",
489-
two_phase_xact_gid));
488+
// res = PQexec(entry->conn, psprintf("SELECT pg_global_snaphot_create('%s')",
489+
// two_phase_xact_gid));
490+
491+
// if (PQresultStatus(res) != PGRES_TUPLES_OK)
492+
// {
493+
// pgfdw_report_error(ERROR, res, entry->conn, true, sql);
494+
// }
495+
// resp = PQgetvalue(res, 0, 0);
496+
// if (resp == NULL || (*resp) == '\0' || sscanf(resp, "%ld", &current_global_cid) != 1)
497+
// {
498+
// pgfdw_report_error(ERROR, res, entry->conn, true, sql);
499+
// }
500+
// PQclear(res);
501+
502+
503+
current_global_cid = DtmLocalExtend(two_phase_xact_gid);
504+
}
505+
// else
506+
// {
507+
Assert(two_phase_xact_gid);
508+
/* join the new participant */
509+
res = PQexec(entry->conn,
510+
psprintf("SELECT pg_global_snaphot_join("UINT64_FORMAT", '%s')",
511+
current_global_cid, two_phase_xact_gid));
490512

491513
if (PQresultStatus(res) != PGRES_TUPLES_OK)
492514
{
493515
pgfdw_report_error(ERROR, res, entry->conn, true, sql);
494516
}
495-
resp = PQgetvalue(res, 0, 0);
496-
if (resp == NULL || (*resp) == '\0' || sscanf(resp, "%ld", &current_global_cid) != 1)
497-
{
498-
pgfdw_report_error(ERROR, res, entry->conn, true, sql);
499-
}
500517
PQclear(res);
501-
502-
503-
// current_global_cid = DtmLocalExtend(two_phase_xact_gid);
504-
}
505-
506-
Assert(two_phase_xact_gid);
507-
/* join the new participant */
508-
res = PQexec(entry->conn,
509-
psprintf("SELECT pg_global_snaphot_join("UINT64_FORMAT", '%s')",
510-
current_global_cid, two_phase_xact_gid));
511-
512-
if (PQresultStatus(res) != PGRES_TUPLES_OK)
513-
{
514-
pgfdw_report_error(ERROR, res, entry->conn, true, sql);
515-
}
516-
PQclear(res);
518+
// }
517519
}
518520

519521
/* A new potential participant for 2PC */
@@ -1045,22 +1047,22 @@ finalize_dtm(void)
10451047
{
10461048
char *gid = two_phase_xact_gid; // != NULL? two_phase_xact_gid : "";
10471049
cid_t maxCSN = 0;
1048-
// cid_t localCSN = 0;
1050+
cid_t localCSN = 0;
10491051

10501052
Assert(gid);
10511053

1052-
// DtmLocalBeginPrepare(gid);
1054+
DtmLocalBeginPrepare(gid);
10531055
BroadcastFunc(psprintf("SELECT pg_global_snaphot_begin_prepare('%s')",
10541056
gid));
10551057

10561058
/* Collect CSNs and choose max */
1057-
// localCSN = DtmLocalPrepare(gid, 0);
1059+
localCSN = DtmLocalPrepare(gid, 0);
10581060
BroadcastStmt(psprintf("SELECT pg_global_snaphot_prepare('%s', 0)",
10591061
gid), PGRES_TUPLES_OK, DtmMaxCSN, &maxCSN);
1060-
// if (localCSN > maxCSN)
1061-
// maxCSN = localCSN;
1062+
if (localCSN > maxCSN)
1063+
maxCSN = localCSN;
10621064

1063-
// DtmLocalEndPrepare(gid, maxCSN);
1065+
DtmLocalEndPrepare(gid, maxCSN);
10641066
BroadcastFunc(psprintf("SELECT pg_global_snaphot_end_prepare('%s',"UINT64_FORMAT")",
10651067
gid, maxCSN));
10661068

src/backend/access/transam/global_snapshot.c

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ static HTAB *xid2status;
8888
static HTAB *gtid2xid;
8989
static DtmNodeState *local;
9090
static uint64 totalSleepInterrupts;
91-
static int DtmVacuumDelay = 2; /* sec */
91+
static int DtmVacuumDelay = 10; /* sec */
9292
static bool DtmRecordCommits = 0;
9393

9494
DtmCurrentTrans dtm_tx; // XXXX: make static
@@ -489,19 +489,20 @@ DtmLocalBegin(DtmCurrentTrans * x)
489489
* Returns snapshot of current transaction.
490490
*/
491491
cid_t
492-
DtmLocalExtend(DtmCurrentTrans * x, GlobalTransactionId gtid)
492+
DtmLocalExtend(GlobalTransactionId gtid)
493493
{
494-
// DtmCurrentTrans *x = &dtm_tx;
494+
DtmCurrentTrans *x = &dtm_tx;
495495

496496
if (gtid != NULL)
497497
{
498498
SpinLockAcquire(&local->lock);
499499
{
500500
DtmTransId *id = (DtmTransId *) hash_search(gtid2xid, gtid, HASH_ENTER, NULL);
501501

502-
id->xid = x->xid;
502+
id->xid = GetCurrentTransactionId();
503503
id->nSubxids = 0;
504504
id->subxids = 0;
505+
x->xid = id->xid;
505506
}
506507
strncpy(x->gtid, gtid, MAX_GTID_SIZE);
507508
SpinLockRelease(&local->lock);
@@ -681,7 +682,7 @@ DtmLocalCommit(DtmCurrentTrans * x)
681682

682683
ts = (DtmTransStatus *) hash_search(xid2status, &x->xid, HASH_ENTER, &found);
683684
ts->status = TRANSACTION_STATUS_COMMITTED;
684-
if (x->is_prepared)
685+
if (found)
685686
{
686687
int i;
687688
DtmTransStatus *sts = ts;
@@ -755,7 +756,7 @@ DtmLocalAbort(DtmCurrentTrans * x)
755756

756757
Assert(TransactionIdIsValid(x->xid));
757758
ts = (DtmTransStatus *) hash_search(xid2status, &x->xid, HASH_ENTER, &found);
758-
if (x->is_prepared)
759+
if (found)
759760
{
760761
Assert(found);
761762
Assert(x->is_global);
@@ -902,7 +903,7 @@ Datum
902903
pg_global_snaphot_create(PG_FUNCTION_ARGS)
903904
{
904905
GlobalTransactionId gtid = text_to_cstring(PG_GETARG_TEXT_PP(0));
905-
cid_t cid = DtmLocalExtend(&dtm_tx, gtid);
906+
cid_t cid = DtmLocalExtend(gtid);
906907

907908
DTM_TRACE((stderr, "Backend %d extends transaction %u(%s) to global with cid=%lu\n", getpid(), dtm_tx.xid, gtid, cid));
908909
PG_RETURN_INT64(cid);

src/include/access/global_snapshot.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ void DtmInitialize(void);
3030
void DtmLocalBegin(DtmCurrentTrans * x);
3131

3232
/* Extend local transaction to global by assigning upper bound CSN which is returned to coordinator */
33-
cid_t DtmLocalExtend(DtmCurrentTrans * x, GlobalTransactionId gtid);
33+
extern cid_t DtmLocalExtend(GlobalTransactionId gtid);
3434

3535
/* Function called at first access to any datanode except first one involved in distributed transaction */
3636
cid_t DtmLocalAccess(DtmCurrentTrans * x, GlobalTransactionId gtid, cid_t snapshot);

0 commit comments

Comments
 (0)