Skip to content

Commit 4e2fed8

Browse files
committed
unify csn and status
1 parent 19422a9 commit 4e2fed8

File tree

2 files changed

+72
-69
lines changed

2 files changed

+72
-69
lines changed

src/backend/access/transam/global_snapshot.c

Lines changed: 63 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@ typedef uint64 timestamp_t;
4949
typedef struct DtmTransStatus
5050
{
5151
TransactionId xid;
52-
XidStatus status;
5352
int nSubxids;
5453
cid_t cid; /* CSN */
5554
struct DtmTransStatus *next;/* pointer to next element in finished
@@ -292,8 +291,7 @@ DtmAdjustSubtransactions(DtmTransStatus *ts)
292291

293292
for (i = 0; i < nSubxids; i++) {
294293
sts = sts->next;
295-
sts->status = ts->status;
296-
Assert(sts->cid == ts->cid);
294+
sts->cid = ts->cid;
297295
}
298296
}
299297

@@ -380,14 +378,14 @@ DtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
380378

381379
if (ts != NULL)
382380
{
383-
if (ts->cid > dtm_tx.snapshot)
381+
if (GlobalCSNIsNormal(ts->cid) && ts->cid > dtm_tx.snapshot)
384382
{
385383
DTM_TRACE((stderr, "%d: tuple with xid=%d(csn=%lld) is invisibile in snapshot %lld\n",
386384
getpid(), xid, ts->cid, dtm_tx.snapshot));
387385
SpinLockRelease(&local->lock);
388386
return true;
389387
}
390-
if (ts->status == TRANSACTION_STATUS_UNKNOWN)
388+
if (ts->cid == InDoubtGlobalCSN)
391389
{
392390
DTM_TRACE((stderr, "%d: wait for in-doubt transaction %u in snapshot %lu\n", getpid(), xid, dtm_tx.snapshot));
393391
SpinLockRelease(&local->lock);
@@ -400,7 +398,10 @@ DtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
400398
}
401399
else
402400
{
403-
bool invisible = ts->status == TRANSACTION_STATUS_ABORTED;
401+
bool invisible = ts->cid == AbortedGlobalCSN;
402+
403+
if (!invisible)
404+
Assert(GlobalCSNIsNormal(ts->cid));
404405

405406
DTM_TRACE((stderr, "%d: tuple with xid=%d(csn= %lld) is %s in snapshot %lld\n",
406407
getpid(), xid, ts->cid, invisible ? "rollbacked" : "committed", dtm_tx.snapshot));
@@ -473,6 +474,9 @@ DtmLocalExtend(GlobalTransactionId gtid)
473474
strncpy(x->gtid, gtid, MAX_GTID_SIZE);
474475
}
475476
DtmInitGlobalXmin(TransactionXmin);
477+
478+
dtm_tx.is_global = true;
479+
476480
return x->snapshot;
477481
}
478482

@@ -495,6 +499,8 @@ DtmLocalAccess(DtmCurrentTrans * x, GlobalTransactionId gtid, cid_t global_cid)
495499
strncpy(x->gtid, gtid, MAX_GTID_SIZE);
496500
SpinLockRelease(&local->lock);
497501

502+
dtm_tx.is_global = true;
503+
498504
if (global_cid < local_cid - DtmVacuumDelay * USEC)
499505
{
500506
elog(ERROR, "Too old snapshot: requested %ld, current %ld", global_cid, local_cid);
@@ -511,27 +517,34 @@ DtmLocalAccess(DtmCurrentTrans * x, GlobalTransactionId gtid, cid_t global_cid)
511517
void
512518
DtmLocalBeginPrepare(GlobalTransactionId gtid)
513519
{
514-
TransactionId xid = TwoPhaseGetTransactionId(gtid);
515-
516-
if (!TransactionIdIsValid(xid))
517-
{
518-
// XXX: check that it is global tx with the same xid, XactTopTransactionId?
519-
xid = GetCurrentTransactionId();
520-
}
520+
TransactionId xid = GetCurrentTransactionIdIfAny();
521521

522-
SpinLockAcquire(&local->lock);
522+
if (TransactionIdIsValid(xid)) // XXX: decide based on empty gtid?
523523
{
524+
// inside global 1pc tx
525+
TransactionId *subxids;
526+
int nSubxids = xactGetCommittedChildren(&subxids);
524527
DtmTransStatus *ts;
525528
bool found;
526529

527-
ts = (DtmTransStatus *) hash_search(xid2status, &xid, HASH_ENTER, &found);
528-
ts->status = TRANSACTION_STATUS_UNKNOWN;
529-
ts->cid = dtm_get_cid();
530-
if (!found)
531-
ts->nSubxids = 0;
532-
DtmAdjustSubtransactions(ts);
530+
Assert(dtm_tx.is_global); // XXX: change to error
531+
532+
SpinLockAcquire(&local->lock);
533+
{
534+
ts = (DtmTransStatus *) hash_search(xid2status, &xid, HASH_ENTER, &found);
535+
Assert(!found);
536+
ts->cid = InDoubtGlobalCSN;
537+
ts->nSubxids = nSubxids;
538+
DtmTransactionListAppend(ts);
539+
DtmAddSubtransactions(ts, subxids, nSubxids);
540+
}
541+
SpinLockRelease(&local->lock);
533542
}
534-
SpinLockRelease(&local->lock);
543+
else
544+
{
545+
// inside after-prepare fx
546+
}
547+
535548
}
536549

537550
/*
@@ -559,31 +572,24 @@ DtmLocalPrepare(GlobalTransactionId gtid, cid_t global_cid)
559572
void
560573
DtmLocalEndPrepare(GlobalTransactionId gtid, cid_t cid)
561574
{
562-
TransactionId xid = TwoPhaseGetTransactionId(gtid);
575+
TransactionId xid = GetCurrentTransactionIdIfAny();
563576

564-
if (!TransactionIdIsValid(xid))
577+
if (TransactionIdIsValid(xid))
565578
{
566-
// XXX: check that it is global tx with the same xid, XactTopTransactionId?
567-
xid = GetCurrentTransactionId();
579+
Assert(dtm_tx.is_global);
568580
}
569-
570-
dtm_tx.xid = xid;
571-
572-
SpinLockAcquire(&local->lock);
581+
else
573582
{
574-
DtmTransStatus *ts;
575-
DtmTransId *id;
576-
int i;
583+
// inside after-prepare fx
584+
xid = TwoPhaseGetTransactionId(gtid);
585+
// Assert(TransactionIdIsValid(xid));
586+
if (!TransactionIdIsValid(xid))
587+
return; // global ro tx
588+
}
577589

578-
ts = (DtmTransStatus *) hash_search(xid2status, &xid, HASH_FIND, NULL);
579-
Assert(ts != NULL);
580-
ts->cid = cid;
581-
DtmAdjustSubtransactions(ts);
582-
dtm_sync(cid);
583590

584-
DTM_TRACE((stderr, "Prepare transaction %u(%s) with CSN %lu\n", id->xid, gtid, cid));
585-
}
586-
SpinLockRelease(&local->lock);
591+
dtm_tx.xid = xid;
592+
dtm_tx.csn = cid;
587593

588594
}
589595

@@ -598,7 +604,7 @@ DtmLocalFinish(bool is_commit)
598604

599605
if (x->gtid[0] && finishing_prepared)
600606
{
601-
xid = x->xid;
607+
xid = dtm_tx.xid;
602608
}
603609
else if (!TransactionIdIsValid(xid))
604610
{
@@ -611,19 +617,25 @@ DtmLocalFinish(bool is_commit)
611617
DtmTransStatus *ts;
612618

613619
ts = (DtmTransStatus *) hash_search(xid2status, &xid, HASH_ENTER, &found);
614-
ts->status = is_commit ? TRANSACTION_STATUS_COMMITTED : TRANSACTION_STATUS_ABORTED;
615620

616-
if (!found)
621+
if (found)
622+
{
623+
Assert(GlobalCSNIsNormal(dtm_tx.csn));
624+
ts->cid = is_commit ? dtm_tx.csn : AbortedGlobalCSN;
625+
626+
dtm_tx.xid = InvalidTransactionId;
627+
dtm_tx.csn = InvalidGlobalCSN;
628+
}
629+
else
617630
{
618-
ts->cid = dtm_get_cid();
619-
ts->nSubxids = 0;
631+
Assert(!GlobalCSNIsNormal(dtm_tx.csn));
632+
ts->cid = is_commit ? dtm_get_cid() : AbortedGlobalCSN;
620633
DtmTransactionListAppend(ts);
621634
}
622635
DtmAdjustSubtransactions(ts);
623636
}
624637
SpinLockRelease(&local->lock);
625638

626-
// DtmAdjustOldestXid();
627639
}
628640

629641
/*
@@ -657,32 +669,14 @@ DtmDeserializeTransactionState(void* ctx)
657669
}
658670

659671

660-
cid_t
661-
DtmGetCsn(TransactionId xid)
662-
{
663-
cid_t csn = 0;
664-
665-
SpinLockAcquire(&local->lock);
666-
{
667-
DtmTransStatus *ts = (DtmTransStatus *) hash_search(xid2status, &xid, HASH_FIND, NULL);
668-
669-
if (ts != NULL)
670-
{
671-
csn = ts->cid;
672-
}
673-
}
674-
SpinLockRelease(&local->lock);
675-
return csn;
676-
}
677-
678672
/*
679673
* Save state of parepared transaction
680674
*/
681675
void
682676
DtmLocalSavePreparedState(DtmCurrentTrans * x)
683677
{
684678

685-
if (x->gtid[0])
679+
if (dtm_tx.is_global)
686680
{
687681
TransactionId *subxids;
688682
TransactionId xid = GetCurrentTransactionId();
@@ -691,10 +685,11 @@ DtmLocalSavePreparedState(DtmCurrentTrans * x)
691685
SpinLockAcquire(&local->lock);
692686
{
693687
DtmTransStatus *ts;
688+
bool found;
694689

695-
ts = (DtmTransStatus *) hash_search(xid2status, &xid, HASH_ENTER, NULL);
696-
ts->status = TRANSACTION_STATUS_IN_PROGRESS;
697-
ts->cid = dtm_get_cid();
690+
ts = (DtmTransStatus *) hash_search(xid2status, &xid, HASH_ENTER, &found);
691+
Assert(!found);
692+
ts->cid = InDoubtGlobalCSN;
698693
ts->nSubxids = nSubxids;
699694
DtmTransactionListAppend(ts);
700695
DtmAddSubtransactions(ts, subxids, nSubxids);
@@ -720,7 +715,6 @@ DtmAddSubtransactions(DtmTransStatus * ts, TransactionId *subxids, int nSubxids)
720715
Assert(TransactionIdIsValid(subxids[i]));
721716
sts = (DtmTransStatus *) hash_search(xid2status, &subxids[i], HASH_ENTER, &found);
722717
Assert(!found);
723-
sts->status = ts->status;
724718
sts->cid = ts->cid;
725719
sts->nSubxids = 0;
726720
DtmTransactionListInsertAfter(ts, sts);

src/include/access/global_snapshot.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,15 @@
55

66
typedef int nodeid_t;
77
typedef uint64 cid_t;
8+
typedef cid_t GlobalCSN;
89

910
typedef struct
1011
{
1112
cid_t snapshot;
13+
GlobalCSN csn;
1214
char gtid[MAX_GTID_SIZE];
1315
TransactionId xid;
16+
bool is_global;
1417
} DtmCurrentTrans;
1518

1619
typedef char const *GlobalTransactionId;
@@ -19,6 +22,12 @@ Size GlobalSnapshotShmemSize(void);
1922

2023
extern DtmCurrentTrans dtm_tx;
2124

25+
#define InvalidGlobalCSN 0x0
26+
#define AbortedGlobalCSN 0x2
27+
#define InDoubtGlobalCSN 0x3
28+
#define FirstNormalGlobalCSN 0x4
29+
30+
#define GlobalCSNIsNormal(csn) ((csn) >= FirstNormalGlobalCSN)
2231

2332
/* Initialize DTM extension */
2433
void DtmInitialize(void);

0 commit comments

Comments
 (0)