Skip to content

Commit 84d6f47

Browse files
committed
unify commit logic; some refactor
1 parent ce2e104 commit 84d6f47

File tree

2 files changed

+62
-201
lines changed

2 files changed

+62
-201
lines changed

src/backend/access/transam/global_snapshot.c

Lines changed: 62 additions & 180 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,8 @@ static HTAB *gtid2xid;
8989
static DtmNodeState *local;
9090
static uint64 totalSleepInterrupts;
9191
static int DtmVacuumDelay = 10; /* sec */
92-
static bool DtmRecordCommits = 0;
92+
static bool finishing_prepared;
93+
9394

9495
DtmCurrentTrans dtm_tx; // XXXX: make static
9596

@@ -103,6 +104,7 @@ static size_t DtmGetTransactionStateSize(void);
103104
static void DtmSerializeTransactionState(void* ctx);
104105
static void DtmDeserializeTransactionState(void* ctx);
105106

107+
static void DtmLocalFinish(bool is_commit);
106108

107109
static TransactionManager DtmTM = {
108110
PgTransactionIdGetStatus,
@@ -207,7 +209,8 @@ GlobalSnapshotShmemSize(void)
207209
Size size;
208210

209211
size = MAXALIGN(sizeof(DtmNodeState));
210-
size = add_size(size, (sizeof(DtmTransId) + sizeof(DtmTransStatus) + HASH_PER_ELEM_OVERHEAD * 2) * DTM_HASH_INIT_SIZE);
212+
size = add_size(size, DTM_HASH_INIT_SIZE *
213+
(sizeof(DtmTransId) + sizeof(DtmTransStatus) + HASH_PER_ELEM_OVERHEAD * 2));
211214

212215
return size;
213216
}
@@ -222,22 +225,28 @@ dtm_xact_callback(XactEvent event, void *arg)
222225
DtmLocalBegin(&dtm_tx);
223226
break;
224227

225-
case XACT_EVENT_ABORT:
226-
DtmLocalAbort(&dtm_tx);
227-
DtmLocalEnd(&dtm_tx);
228+
case XACT_EVENT_ABORT_PREPARED:
229+
// DtmLocalAbortPrepared(&dtm_tx);
230+
finishing_prepared = true;
231+
DtmAdjustOldestXid();
228232
break;
229233

230-
case XACT_EVENT_COMMIT:
231-
DtmLocalCommit(&dtm_tx);
232-
DtmLocalEnd(&dtm_tx);
234+
case XACT_EVENT_COMMIT_PREPARED:
235+
// DtmLocalCommitPrepared(&dtm_tx);
236+
finishing_prepared = true;
237+
DtmAdjustOldestXid();
233238
break;
234239

235-
case XACT_EVENT_ABORT_PREPARED:
236-
DtmLocalAbortPrepared(&dtm_tx);
240+
case XACT_EVENT_COMMIT:
241+
DtmLocalFinish(true);
242+
DtmLocalEnd(&dtm_tx);
243+
finishing_prepared = false;
237244
break;
238245

239-
case XACT_EVENT_COMMIT_PREPARED:
240-
DtmLocalCommitPrepared(&dtm_tx);
246+
case XACT_EVENT_ABORT:
247+
DtmLocalFinish(false);
248+
DtmLocalEnd(&dtm_tx);
249+
finishing_prepared = false;
241250
break;
242251

243252
case XACT_EVENT_PRE_PREPARE:
@@ -254,43 +263,6 @@ dtm_xact_callback(XactEvent event, void *arg)
254263
* ***************************************************************************
255264
*/
256265

257-
static uint32
258-
dtm_xid_hash_fn(const void *key, Size keysize)
259-
{
260-
return (uint32) *(TransactionId *) key;
261-
}
262-
263-
static int
264-
dtm_xid_match_fn(const void *key1, const void *key2, Size keysize)
265-
{
266-
return *(TransactionId *) key1 - *(TransactionId *) key2;
267-
}
268-
269-
static uint32
270-
dtm_gtid_hash_fn(const void *key, Size keysize)
271-
{
272-
GlobalTransactionId id = (GlobalTransactionId) key;
273-
uint32 h = 0;
274-
275-
while (*id != 0)
276-
{
277-
h = h * 31 + *id++;
278-
}
279-
return h;
280-
}
281-
282-
static void *
283-
dtm_gtid_keycopy_fn(void *dest, const void *src, Size keysize)
284-
{
285-
return strcpy((char *) dest, (GlobalTransactionId) src);
286-
}
287-
288-
static int
289-
dtm_gtid_match_fn(const void *key1, const void *key2, Size keysize)
290-
{
291-
return strcmp((GlobalTransactionId) key1, (GlobalTransactionId) key2);
292-
}
293-
294266
static char const *
295267
DtmGetName(void)
296268
{
@@ -480,17 +452,11 @@ DtmInitialize()
480452
void
481453
DtmLocalBegin(DtmCurrentTrans * x)
482454
{
483-
if (!TransactionIdIsValid(x->xid))
484-
{
485455
SpinLockAcquire(&local->lock);
486-
// x->xid = GetCurrentTransactionIdIfAny();
487456
x->cid = INVALID_CID;
488-
x->is_global = false;
489-
x->is_prepared = false;
490457
x->snapshot = dtm_get_cid();
491458
SpinLockRelease(&local->lock);
492459
DTM_TRACE((stderr, "DtmLocalBegin: transaction %u uses local snapshot %lu\n", x->xid, x->snapshot));
493-
}
494460
}
495461

496462
/*
@@ -516,7 +482,6 @@ DtmLocalExtend(GlobalTransactionId gtid)
516482
strncpy(x->gtid, gtid, MAX_GTID_SIZE);
517483
SpinLockRelease(&local->lock);
518484
}
519-
x->is_global = true;
520485
DtmInitGlobalXmin(TransactionXmin);
521486
return x->snapshot;
522487
}
@@ -543,7 +508,6 @@ DtmLocalAccess(DtmCurrentTrans * x, GlobalTransactionId gtid, cid_t global_cid)
543508
}
544509
local_cid = dtm_sync(global_cid);
545510
x->snapshot = global_cid;
546-
x->is_global = true;
547511
}
548512
strncpy(x->gtid, gtid, MAX_GTID_SIZE);
549513
SpinLockRelease(&local->lock);
@@ -627,85 +591,59 @@ DtmLocalEndPrepare(GlobalTransactionId gtid, cid_t cid)
627591
}
628592
SpinLockRelease(&local->lock);
629593

630-
/*
631-
* Record commit in pg_committed_xact table to be make it possible to
632-
* perform recovery in case of crash of some of cluster nodes
633-
*/
634-
if (DtmRecordCommits)
635-
{
636-
char stmt[MAX_GTID_SIZE + 64];
637-
int rc;
638-
639-
sprintf(stmt, "insert into pg_committed_xacts values ('%s')", gtid);
640-
SPI_connect();
641-
rc = SPI_execute(stmt, true, 0);
642-
SPI_finish();
643-
if (rc != SPI_OK_INSERT)
644-
{
645-
elog(ERROR, "Failed to insert GTID %s in table pg_committed_xacts", gtid);
646-
}
647-
}
648594
}
649595

650596
/*
651-
* Mark tranasction as prepared
597+
* Set transaction status to committed
652598
*/
653599
void
654-
DtmLocalCommitPrepared(DtmCurrentTrans * x)
600+
DtmLocalFinish(bool is_commit)
655601
{
656-
if (!x->gtid[0])
657-
return;
658-
659-
Assert(x->gtid != NULL);
602+
DtmCurrentTrans * x = &dtm_tx;
603+
TransactionId xid = GetCurrentTransactionIdIfAny();
660604

661-
SpinLockAcquire(&local->lock);
605+
if (x->gtid[0] && finishing_prepared)
662606
{
663-
DtmTransId *id = (DtmTransId *) hash_search(gtid2xid, x->gtid, HASH_REMOVE, NULL);
607+
// Assert(!TransactionIdIsValid(xid));
664608

665-
Assert(id != NULL);
609+
SpinLockAcquire(&local->lock);
610+
{
611+
DtmTransId *id = (DtmTransId *) hash_search(gtid2xid, x->gtid, HASH_REMOVE, NULL);
666612

667-
x->is_global = true;
668-
x->is_prepared = true;
669-
x->xid = id->xid;
670-
free(id->subxids);
613+
Assert(id != NULL);
614+
Assert(TransactionIdIsValid(id->xid));
671615

672-
DTM_TRACE((stderr, "Global transaction %u(%s) is precommitted\n", x->xid, gtid));
616+
xid = id->xid;
617+
free(id->subxids);
618+
}
619+
SpinLockRelease(&local->lock);
620+
}
621+
else if (!TransactionIdIsValid(xid))
622+
{
623+
return;
673624
}
674-
SpinLockRelease(&local->lock);
675-
676-
DtmAdjustOldestXid();
677-
// elog(LOG, "DtmLocalCommitPrepared %d", x->xid);
678-
}
679-
680-
/*
681-
* Set transaction status to committed
682-
*/
683-
void
684-
DtmLocalCommit(DtmCurrentTrans * x)
685-
{
686-
// if (!x->is_global)
687-
// return;
688625

689626
SpinLockAcquire(&local->lock);
690-
if (TransactionIdIsValid(x->xid))
691627
{
692628
bool found;
693629
DtmTransStatus *ts;
694630

695-
ts = (DtmTransStatus *) hash_search(xid2status, &x->xid, HASH_ENTER, &found);
696-
ts->status = TRANSACTION_STATUS_COMMITTED;
631+
ts = (DtmTransStatus *) hash_search(xid2status, &xid, HASH_ENTER, &found);
632+
ts->status = is_commit ? TRANSACTION_STATUS_COMMITTED : TRANSACTION_STATUS_ABORTED;
697633
if (found)
698634
{
699-
int i;
700-
DtmTransStatus *sts = ts;
701635

702-
Assert(found);
703-
Assert(x->is_global);
704-
for (i = 0; i < ts->nSubxids; i++)
636+
if (is_commit) // XXX: why only for commit?
705637
{
706-
sts = sts->next;
707-
Assert(sts->cid == ts->cid);
708-
sts->status = TRANSACTION_STATUS_COMMITTED;
638+
int i;
639+
DtmTransStatus *sts = ts;
640+
641+
for (i = 0; i < ts->nSubxids; i++)
642+
{
643+
sts = sts->next;
644+
Assert(sts->cid == ts->cid);
645+
sts->status = TRANSACTION_STATUS_COMMITTED;
646+
}
709647
}
710648
}
711649
else
@@ -715,86 +653,31 @@ DtmLocalCommit(DtmCurrentTrans * x)
715653
Assert(!found);
716654
ts->cid = dtm_get_cid();
717655
DtmTransactionListAppend(ts);
718-
ts->nSubxids = xactGetCommittedChildren(&subxids);
719-
DtmAddSubtransactions(ts, subxids, ts->nSubxids);
656+
if (is_commit) // XXX: why?
657+
{
658+
ts->nSubxids = xactGetCommittedChildren(&subxids);
659+
DtmAddSubtransactions(ts, subxids, ts->nSubxids);
660+
}
661+
else
662+
{
663+
ts->nSubxids = 0;
664+
}
720665
}
721666
x->cid = ts->cid;
722667
DTM_TRACE((stderr, "Local transaction %u is committed at %lu\n", x->xid, x->cid));
723668
}
724669
SpinLockRelease(&local->lock);
725670

726-
DtmAdjustOldestXid();
671+
// DtmAdjustOldestXid();
727672
// elog(LOG, "DtmLocalCommit %d", x->xid);
728673
}
729674

730-
/*
731-
* Mark tranasction as prepared
732-
*/
733-
void
734-
DtmLocalAbortPrepared(DtmCurrentTrans * x)
735-
{
736-
if (!x->gtid[0])
737-
return;
738-
739-
Assert(x->gtid != NULL);
740-
741-
SpinLockAcquire(&local->lock);
742-
{
743-
DtmTransId *id = (DtmTransId *) hash_search(gtid2xid, x->gtid, HASH_REMOVE, NULL);
744-
745-
Assert(id != NULL);
746-
x->is_global = true;
747-
x->is_prepared = true;
748-
x->xid = id->xid;
749-
free(id->subxids);
750-
DTM_TRACE((stderr, "Global transaction %u(%s) is preaborted\n", x->xid, gtid));
751-
}
752-
SpinLockRelease(&local->lock);
753-
}
754-
755-
/*
756-
* Set transaction status to aborted
757-
*/
758-
void
759-
DtmLocalAbort(DtmCurrentTrans * x)
760-
{
761-
if (!TransactionIdIsValid(x->xid))
762-
return;
763-
764-
SpinLockAcquire(&local->lock);
765-
{
766-
bool found;
767-
DtmTransStatus *ts;
768-
769-
Assert(TransactionIdIsValid(x->xid));
770-
ts = (DtmTransStatus *) hash_search(xid2status, &x->xid, HASH_ENTER, &found);
771-
if (found)
772-
{
773-
Assert(found);
774-
Assert(x->is_global);
775-
}
776-
else
777-
{
778-
Assert(!found);
779-
ts->cid = dtm_get_cid();
780-
ts->nSubxids = 0;
781-
DtmTransactionListAppend(ts);
782-
}
783-
x->cid = ts->cid;
784-
ts->status = TRANSACTION_STATUS_ABORTED;
785-
DTM_TRACE((stderr, "Local transaction %u is aborted at %lu\n", x->xid, x->cid));
786-
}
787-
SpinLockRelease(&local->lock);
788-
}
789-
790675
/*
791676
* Cleanup dtm_tx structure
792677
*/
793678
void
794679
DtmLocalEnd(DtmCurrentTrans * x)
795680
{
796-
x->is_global = false;
797-
x->is_prepared = false;
798681
x->xid = InvalidTransactionId;
799682
x->cid = INVALID_CID;
800683
}
@@ -854,7 +737,6 @@ DtmGetCsn(TransactionId xid)
854737
void
855738
DtmLocalSavePreparedState(DtmCurrentTrans * x)
856739
{
857-
// x->is_prepared = true;
858740

859741
if (x->gtid[0])
860742
{

0 commit comments

Comments
 (0)