Skip to content

Commit 57002d1

Browse files
committed
cleanup DtmLocalFinish and get rid of x->gtid
1 parent 4e2fed8 commit 57002d1

File tree

4 files changed

+106
-105
lines changed

4 files changed

+106
-105
lines changed

contrib/postgres_fdw/connection.c

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -483,14 +483,14 @@ begin_remote_xact(ConnCacheEntry *entry)
483483
++two_phase_xact_count);
484484
MemoryContextSwitchTo(oldcxt);
485485

486-
current_global_cid = DtmLocalExtend(two_phase_xact_gid);
486+
current_global_cid = DtmLocalExtend();
487487
}
488488

489489
Assert(two_phase_xact_gid);
490490
/* join the new participant */
491491
res = PQexec(entry->conn,
492-
psprintf("SELECT pg_global_snaphot_join("UINT64_FORMAT", '%s')",
493-
current_global_cid, two_phase_xact_gid));
492+
psprintf("SELECT pg_global_snaphot_join("UINT64_FORMAT")",
493+
current_global_cid));
494494

495495
if (PQresultStatus(res) != PGRES_TUPLES_OK)
496496
{

src/backend/access/transam/global_snapshot.c

Lines changed: 99 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -295,6 +295,29 @@ DtmAdjustSubtransactions(DtmTransStatus *ts)
295295
}
296296
}
297297

298+
/*
299+
* Add subtransactions to finished transactions list.
300+
* Copy CSN and status of parent transaction.
301+
*/
302+
static void
303+
DtmAddSubtransactions(DtmTransStatus * ts, TransactionId *subxids, int nSubxids)
304+
{
305+
int i;
306+
307+
for (i = 0; i < nSubxids; i++)
308+
{
309+
bool found;
310+
DtmTransStatus *sts;
311+
312+
Assert(TransactionIdIsValid(subxids[i]));
313+
sts = (DtmTransStatus *) hash_search(xid2status, &subxids[i], HASH_ENTER, &found);
314+
Assert(!found);
315+
sts->cid = ts->cid;
316+
sts->nSubxids = 0;
317+
DtmTransactionListInsertAfter(ts, sts);
318+
}
319+
}
320+
298321
/*
299322
* There can be different oldest XIDs at different cluster node.
300323
* Seince we do not have centralized aribiter, we have to rely in DtmVacuumDelay.
@@ -465,27 +488,19 @@ DtmLocalBegin(DtmCurrentTrans * x)
465488
* Returns snapshot of current transaction.
466489
*/
467490
cid_t
468-
DtmLocalExtend(GlobalTransactionId gtid)
491+
DtmLocalExtend()
469492
{
470-
DtmCurrentTrans *x = &dtm_tx;
471-
472-
if (gtid != NULL)
473-
{
474-
strncpy(x->gtid, gtid, MAX_GTID_SIZE);
475-
}
476493
DtmInitGlobalXmin(TransactionXmin);
477-
478494
dtm_tx.is_global = true;
479-
480-
return x->snapshot;
495+
return dtm_tx.snapshot;
481496
}
482497

483498
/*
484499
* This function is executed on all nodes joining distributed transaction.
485500
* global_cid is snapshot taken from node initiated this transaction
486501
*/
487502
cid_t
488-
DtmLocalAccess(DtmCurrentTrans * x, GlobalTransactionId gtid, cid_t global_cid)
503+
DtmLocalAccess(cid_t global_cid)
489504
{
490505
cid_t local_cid;
491506

@@ -494,9 +509,8 @@ DtmLocalAccess(DtmCurrentTrans * x, GlobalTransactionId gtid, cid_t global_cid)
494509
SpinLockAcquire(&local->lock);
495510
{
496511
local_cid = dtm_sync(global_cid);
497-
x->snapshot = global_cid;
512+
dtm_tx.snapshot = global_cid;
498513
}
499-
strncpy(x->gtid, gtid, MAX_GTID_SIZE);
500514
SpinLockRelease(&local->lock);
501515

502516
dtm_tx.is_global = true;
@@ -510,6 +524,37 @@ DtmLocalAccess(DtmCurrentTrans * x, GlobalTransactionId gtid, cid_t global_cid)
510524
return global_cid;
511525
}
512526

527+
528+
/*
529+
* Save state of parepared transaction
530+
*/
531+
void
532+
DtmLocalSavePreparedState(DtmCurrentTrans * x)
533+
{
534+
535+
if (dtm_tx.is_global)
536+
{
537+
TransactionId *subxids;
538+
TransactionId xid = GetCurrentTransactionId();
539+
int nSubxids = xactGetCommittedChildren(&subxids);
540+
541+
SpinLockAcquire(&local->lock);
542+
{
543+
DtmTransStatus *ts;
544+
bool found;
545+
546+
ts = (DtmTransStatus *) hash_search(xid2status, &xid, HASH_ENTER, &found);
547+
Assert(!found);
548+
ts->cid = InDoubtGlobalCSN;
549+
ts->nSubxids = nSubxids;
550+
DtmTransactionListAppend(ts);
551+
DtmAddSubtransactions(ts, subxids, nSubxids);
552+
}
553+
SpinLockRelease(&local->lock);
554+
}
555+
}
556+
557+
513558
/*
514559
* Set transaction status to in-doubt. Now all transactions accessing tuples updated by this transaction have to
515560
* wait until it is either committed either aborted
@@ -587,10 +632,8 @@ DtmLocalEndPrepare(GlobalTransactionId gtid, cid_t cid)
587632
return; // global ro tx
588633
}
589634

590-
591635
dtm_tx.xid = xid;
592636
dtm_tx.csn = cid;
593-
594637
}
595638

596639
/*
@@ -599,43 +642,57 @@ DtmLocalEndPrepare(GlobalTransactionId gtid, cid_t cid)
599642
void
600643
DtmLocalFinish(bool is_commit)
601644
{
602-
DtmCurrentTrans * x = &dtm_tx;
603645
TransactionId xid = GetCurrentTransactionIdIfAny();
646+
bool found;
647+
DtmTransStatus *ts;
604648

605-
if (x->gtid[0] && finishing_prepared)
649+
// We can't check just TransactionIdIsValid(dtm_tx.xid) because
650+
// then we catch commit of `select pg_global_snaphot_end_prepare(...)`
651+
if (TransactionIdIsValid(dtm_tx.xid) &&
652+
(finishing_prepared || // commit prepared of global
653+
TransactionIdIsValid(xid))) // ordinary commit of global
606654
{
655+
// Commit of global prepared tx
656+
607657
xid = dtm_tx.xid;
658+
Assert(GlobalCSNIsNormal(dtm_tx.csn));
659+
660+
SpinLockAcquire(&local->lock);
661+
ts = (DtmTransStatus *) hash_search(xid2status, &xid, HASH_FIND, &found);
662+
Assert(found);
663+
ts->cid = is_commit ? dtm_tx.csn : AbortedGlobalCSN;
664+
DtmAdjustSubtransactions(ts); // !
665+
SpinLockRelease(&local->lock);
666+
667+
dtm_tx.xid = InvalidTransactionId;
668+
dtm_tx.csn = InvalidGlobalCSN;
669+
dtm_tx.is_global = false;
608670
}
609-
else if (!TransactionIdIsValid(xid))
671+
else if (TransactionIdIsValid(xid))
610672
{
611-
return;
612-
}
673+
// Commit of local tx
613674

614-
SpinLockAcquire(&local->lock);
615-
{
616-
bool found;
617-
DtmTransStatus *ts;
675+
TransactionId *subxids;
676+
int nSubxids = xactGetCommittedChildren(&subxids);
618677

619-
ts = (DtmTransStatus *) hash_search(xid2status, &xid, HASH_ENTER, &found);
678+
Assert(!GlobalCSNIsNormal(dtm_tx.csn));
679+
Assert(!TransactionIdIsValid(dtm_tx.xid));
620680

621-
if (found)
681+
if (dtm_tx.is_global)
622682
{
623-
Assert(GlobalCSNIsNormal(dtm_tx.csn));
624-
ts->cid = is_commit ? dtm_tx.csn : AbortedGlobalCSN;
625-
626-
dtm_tx.xid = InvalidTransactionId;
627-
dtm_tx.csn = InvalidGlobalCSN;
628-
}
629-
else
630-
{
631-
Assert(!GlobalCSNIsNormal(dtm_tx.csn));
632-
ts->cid = is_commit ? dtm_get_cid() : AbortedGlobalCSN;
633-
DtmTransactionListAppend(ts);
683+
Assert(!is_commit);
684+
dtm_tx.is_global = false;
634685
}
635-
DtmAdjustSubtransactions(ts);
636-
}
637-
SpinLockRelease(&local->lock);
638686

687+
SpinLockAcquire(&local->lock);
688+
ts = (DtmTransStatus *) hash_search(xid2status, &xid, HASH_ENTER, &found);
689+
Assert(!found);
690+
ts->cid = is_commit ? dtm_get_cid() : AbortedGlobalCSN;
691+
ts->nSubxids = nSubxids;
692+
DtmTransactionListAppend(ts);
693+
DtmAddSubtransactions(ts, subxids, nSubxids);
694+
SpinLockRelease(&local->lock);
695+
}
639696
}
640697

641698
/*
@@ -669,59 +726,6 @@ DtmDeserializeTransactionState(void* ctx)
669726
}
670727

671728

672-
/*
673-
* Save state of parepared transaction
674-
*/
675-
void
676-
DtmLocalSavePreparedState(DtmCurrentTrans * x)
677-
{
678-
679-
if (dtm_tx.is_global)
680-
{
681-
TransactionId *subxids;
682-
TransactionId xid = GetCurrentTransactionId();
683-
int nSubxids = xactGetCommittedChildren(&subxids);
684-
685-
SpinLockAcquire(&local->lock);
686-
{
687-
DtmTransStatus *ts;
688-
bool found;
689-
690-
ts = (DtmTransStatus *) hash_search(xid2status, &xid, HASH_ENTER, &found);
691-
Assert(!found);
692-
ts->cid = InDoubtGlobalCSN;
693-
ts->nSubxids = nSubxids;
694-
DtmTransactionListAppend(ts);
695-
DtmAddSubtransactions(ts, subxids, nSubxids);
696-
}
697-
SpinLockRelease(&local->lock);
698-
}
699-
}
700-
701-
/*
702-
* Add subtransactions to finished transactions list.
703-
* Copy CSN and status of parent transaction.
704-
*/
705-
static void
706-
DtmAddSubtransactions(DtmTransStatus * ts, TransactionId *subxids, int nSubxids)
707-
{
708-
int i;
709-
710-
for (i = 0; i < nSubxids; i++)
711-
{
712-
bool found;
713-
DtmTransStatus *sts;
714-
715-
Assert(TransactionIdIsValid(subxids[i]));
716-
sts = (DtmTransStatus *) hash_search(xid2status, &subxids[i], HASH_ENTER, &found);
717-
Assert(!found);
718-
sts->cid = ts->cid;
719-
sts->nSubxids = 0;
720-
DtmTransactionListInsertAfter(ts, sts);
721-
}
722-
}
723-
724-
725729
/*
726730
*
727731
* SQL functions for global snapshot mamagement.
@@ -731,8 +735,7 @@ DtmAddSubtransactions(DtmTransStatus * ts, TransactionId *subxids, int nSubxids)
731735
Datum
732736
pg_global_snaphot_create(PG_FUNCTION_ARGS)
733737
{
734-
GlobalTransactionId gtid = text_to_cstring(PG_GETARG_TEXT_PP(0));
735-
cid_t cid = DtmLocalExtend(gtid);
738+
cid_t cid = DtmLocalExtend();
736739

737740
DTM_TRACE((stderr, "Backend %d extends transaction %u(%s) to global with cid=%lu\n", getpid(), dtm_tx.xid, gtid, cid));
738741
PG_RETURN_INT64(cid);
@@ -742,10 +745,9 @@ Datum
742745
pg_global_snaphot_join(PG_FUNCTION_ARGS)
743746
{
744747
cid_t cid = PG_GETARG_INT64(0);
745-
GlobalTransactionId gtid = text_to_cstring(PG_GETARG_TEXT_PP(1));
746748

747749
DTM_TRACE((stderr, "Backend %d joins transaction %u(%s) with cid=%lu\n", getpid(), dtm_tx.xid, gtid, cid));
748-
cid = DtmLocalAccess(&dtm_tx, gtid, cid);
750+
cid = DtmLocalAccess(cid);
749751
PG_RETURN_INT64(cid);
750752
}
751753

src/include/access/global_snapshot.h

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ typedef struct
1111
{
1212
cid_t snapshot;
1313
GlobalCSN csn;
14-
char gtid[MAX_GTID_SIZE];
1514
TransactionId xid;
1615
bool is_global;
1716
} DtmCurrentTrans;
@@ -36,10 +35,10 @@ void DtmInitialize(void);
3635
void DtmLocalBegin(DtmCurrentTrans * x);
3736

3837
/* Extend local transaction to global by assigning upper bound CSN which is returned to coordinator */
39-
extern cid_t DtmLocalExtend(GlobalTransactionId gtid);
38+
extern cid_t DtmLocalExtend(void);
4039

4140
/* Function called at first access to any datanode except first one involved in distributed transaction */
42-
cid_t DtmLocalAccess(DtmCurrentTrans * x, GlobalTransactionId gtid, cid_t snapshot);
41+
cid_t DtmLocalAccess(cid_t snapshot);
4342

4443
/* Mark transaction as in-doubt */
4544
void DtmLocalBeginPrepare(GlobalTransactionId gtid);

src/include/catalog/pg_proc.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5534,9 +5534,9 @@ DATA(insert OID = 5028 ( satisfies_hash_partition PGNSP PGUID 12 1 0 2276 0 f f
55345534
DESCR("hash partition CHECK constraint");
55355535

55365536
/* global snapshot management functions */
5537-
DATA(insert OID = 3434 ( pg_global_snaphot_create PGNSP PGUID 12 1 0 0 0 f f f f t f v u 1 0 20 "25" _null_ _null_ _null_ _null_ _null_ pg_global_snaphot_create _null_ _null_ _null_ ));
5537+
DATA(insert OID = 3434 ( pg_global_snaphot_create PGNSP PGUID 12 1 0 0 0 f f f f t f v u 0 0 20 "" _null_ _null_ _null_ _null_ _null_ pg_global_snaphot_create _null_ _null_ _null_ ));
55385538
DESCR("create global transaction snapshot");
5539-
DATA(insert OID = 3435 ( pg_global_snaphot_join PGNSP PGUID 12 1 0 0 0 f f f f t f v u 2 0 20 "20 25" _null_ _null_ _null_ _null_ _null_ pg_global_snaphot_join _null_ _null_ _null_ ));
5539+
DATA(insert OID = 3435 ( pg_global_snaphot_join PGNSP PGUID 12 1 0 0 0 f f f f t f v u 1 0 20 "20" _null_ _null_ _null_ _null_ _null_ pg_global_snaphot_join _null_ _null_ _null_ ));
55405540
DESCR("set given global snapshot for current transaction");
55415541
DATA(insert OID = 3436 ( pg_global_snaphot_begin_prepare PGNSP PGUID 12 1 0 0 0 f f f f t f v u 1 0 2278 "25" _null_ _null_ _null_ _null_ _null_ pg_global_snaphot_begin_prepare _null_ _null_ _null_ ));
55425542
DESCR("start prepare of global transaction");

0 commit comments

Comments
 (0)