Skip to content

Commit c69da66

Browse files
committed
DtmAdjustSubtransactions
1 parent 2d059d1 commit c69da66

File tree

3 files changed

+49
-48
lines changed

3 files changed

+49
-48
lines changed

contrib/postgres_fdw/t/001_bank_coordinator.pl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -217,13 +217,14 @@ sub count_and_delete_rows
217217
($hash1, $_, $hash2) = split "\n", $node->safe_psql('postgres', qq[
218218
begin isolation level repeatable read;
219219
select md5(array_agg((t.*)::text)::text) from (select * from accounts order by id) as t;
220-
select pg_sleep(1);
220+
select pg_sleep(3);
221221
select md5(array_agg((t.*)::text)::text) from (select * from accounts order by id) as t;
222222
commit;
223223
]);
224224

225225
if ($hash1 ne $hash2)
226226
{
227+
diag("oops");
227228
$stability_errors++;
228229
}
229230
elsif ($hash1 eq '' or $hash2 eq '')

src/backend/access/transam/global_snapshot.c

Lines changed: 47 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ static HTAB *xid2status;
8888
static HTAB *gtid2xid;
8989
static DtmNodeState *local;
9090
static uint64 totalSleepInterrupts;
91-
static int DtmVacuumDelay = 10; /* sec */
91+
static int DtmVacuumDelay = 15; /* sec */
9292
static bool finishing_prepared;
9393

9494

@@ -99,6 +99,7 @@ static void DtmAdjustOldestXid(void);
9999
static void DtmInitGlobalXmin(TransactionId xid);
100100
static bool DtmDetectGlobalDeadLock(PGPROC *proc);
101101
static void DtmAddSubtransactions(DtmTransStatus * ts, TransactionId *subxids, int nSubxids);
102+
static void DtmAdjustSubtransactions(DtmTransStatus *ts);
102103
static char const *DtmGetName(void);
103104
static size_t DtmGetTransactionStateSize(void);
104105
static void DtmSerializeTransactionState(void* ctx);
@@ -283,6 +284,20 @@ DtmTransactionListInsertAfter(DtmTransStatus * after, DtmTransStatus * ts)
283284
}
284285
}
285286

287+
static void
288+
DtmAdjustSubtransactions(DtmTransStatus *ts)
289+
{
290+
int i;
291+
int nSubxids = ts->nSubxids;
292+
DtmTransStatus* sts = ts;
293+
294+
for (i = 0; i < nSubxids; i++) {
295+
sts = sts->next;
296+
sts->status = ts->status;
297+
Assert(sts->cid == ts->cid);
298+
}
299+
}
300+
286301
/*
287302
* There can be different oldest XIDs at different cluster node.
288303
* Seince we do not have centralized aribiter, we have to rely in DtmVacuumDelay.
@@ -521,20 +536,23 @@ DtmLocalAccess(DtmCurrentTrans * x, GlobalTransactionId gtid, cid_t global_cid)
521536
void
522537
DtmLocalBeginPrepare(GlobalTransactionId gtid)
523538
{
539+
// TransactionId xid = TwoPhaseGetTransactionId(gtid);
540+
524541
SpinLockAcquire(&local->lock);
525542
{
526543
DtmTransStatus *ts;
527544
DtmTransId *id;
545+
bool found;
528546

529547
id = (DtmTransId *) hash_search(gtid2xid, gtid, HASH_FIND, NULL);
530548
Assert(id != NULL);
531549
Assert(TransactionIdIsValid(id->xid));
532-
ts = (DtmTransStatus *) hash_search(xid2status, &id->xid, HASH_ENTER, NULL);
550+
ts = (DtmTransStatus *) hash_search(xid2status, &id->xid, HASH_ENTER, &found);
533551
ts->status = TRANSACTION_STATUS_UNKNOWN;
534552
ts->cid = dtm_get_cid();
535-
ts->nSubxids = id->nSubxids;
536-
DtmTransactionListAppend(ts);
537-
DtmAddSubtransactions(ts, id->subxids, id->nSubxids);
553+
if (!found)
554+
ts->nSubxids = 0;
555+
DtmAdjustSubtransactions(ts);
538556
}
539557
SpinLockRelease(&local->lock);
540558
}
@@ -575,11 +593,7 @@ DtmLocalEndPrepare(GlobalTransactionId gtid, cid_t cid)
575593
ts = (DtmTransStatus *) hash_search(xid2status, &id->xid, HASH_FIND, NULL);
576594
Assert(ts != NULL);
577595
ts->cid = cid;
578-
for (i = 0; i < ts->nSubxids; i++)
579-
{
580-
ts = ts->next;
581-
ts->cid = cid;
582-
}
596+
DtmAdjustSubtransactions(ts);
583597
dtm_sync(cid);
584598

585599
DTM_TRACE((stderr, "Prepare transaction %u(%s) with CSN %lu\n", id->xid, gtid, cid));
@@ -625,39 +639,14 @@ DtmLocalFinish(bool is_commit)
625639

626640
ts = (DtmTransStatus *) hash_search(xid2status, &xid, HASH_ENTER, &found);
627641
ts->status = is_commit ? TRANSACTION_STATUS_COMMITTED : TRANSACTION_STATUS_ABORTED;
628-
if (found)
629-
{
630642

631-
if (is_commit) // XXX: why only for commit?
632-
{
633-
int i;
634-
DtmTransStatus *sts = ts;
635-
636-
for (i = 0; i < ts->nSubxids; i++)
637-
{
638-
sts = sts->next;
639-
Assert(sts->cid == ts->cid);
640-
sts->status = TRANSACTION_STATUS_COMMITTED;
641-
}
642-
}
643-
}
644-
else
643+
if (!found)
645644
{
646-
TransactionId *subxids;
647-
648-
Assert(!found);
649645
ts->cid = dtm_get_cid();
646+
ts->nSubxids = 0;
650647
DtmTransactionListAppend(ts);
651-
if (is_commit) // XXX: why?
652-
{
653-
ts->nSubxids = xactGetCommittedChildren(&subxids);
654-
DtmAddSubtransactions(ts, subxids, ts->nSubxids);
655-
}
656-
else
657-
{
658-
ts->nSubxids = 0;
659-
}
660648
}
649+
DtmAdjustSubtransactions(ts);
661650
}
662651
SpinLockRelease(&local->lock);
663652

@@ -722,24 +711,35 @@ DtmLocalSavePreparedState(DtmCurrentTrans * x)
722711

723712
if (x->gtid[0])
724713
{
714+
TransactionId *subxids;
715+
TransactionId xid = GetCurrentTransactionId();
716+
int nSubxids = xactGetCommittedChildren(&subxids);
717+
725718
SpinLockAcquire(&local->lock);
726719
{
727720
DtmTransId *id = (DtmTransId *) hash_search(gtid2xid, x->gtid, HASH_FIND, NULL);
728721

729722
if (id != NULL)
730723
{
731-
TransactionId *subxids;
732-
int nSubxids = xactGetCommittedChildren(&subxids);
733-
734724
id->xid = GetCurrentTransactionId();
735-
if (nSubxids != 0)
736-
{
737-
id->subxids = (TransactionId *) malloc(nSubxids * sizeof(TransactionId));
738-
id->nSubxids = nSubxids;
739-
memcpy(id->subxids, subxids, nSubxids * sizeof(TransactionId));
740-
}
725+
741726
}
742727
}
728+
// SpinLockRelease(&local->lock);
729+
730+
731+
732+
// SpinLockAcquire(&local->lock);
733+
{
734+
DtmTransStatus *ts;
735+
736+
ts = (DtmTransStatus *) hash_search(xid2status, &xid, HASH_ENTER, NULL);
737+
ts->status = TRANSACTION_STATUS_IN_PROGRESS;
738+
ts->cid = dtm_get_cid();
739+
ts->nSubxids = nSubxids;
740+
DtmTransactionListAppend(ts);
741+
DtmAddSubtransactions(ts, subxids, nSubxids);
742+
}
743743
SpinLockRelease(&local->lock);
744744
}
745745
}

0 commit comments

Comments
 (0)