Skip to content

Commit 5f4887e

Browse files
committed
global_snapshot_xmin support in procarray
1 parent 544ad87 commit 5f4887e

File tree

5 files changed

+50
-54
lines changed

5 files changed

+50
-54
lines changed

contrib/postgres_fdw/t/001_bank_check.pl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@
6767
my $isolation_error = 0;
6868

6969

70-
$master->pgbench(-n, -c => 5, -t => 10, -f => "$TestLib::log_path/../../t/bank.pgb", 'postgres' );
70+
$master->pgbench(-n, -c => 20, -t => 30, -f => "$TestLib::log_path/../../t/bank.pgb", 'postgres' );
7171

7272
my $pgb_handle = $master->pgbench_async(-n, -c => 5, -T => $seconds, -f => "$TestLib::log_path/../../t/bank.pgb", 'postgres' );
7373

contrib/postgres_fdw/t/bank.pgb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,6 @@
22

33
BEGIN;
44
UPDATE accounts SET amount = amount - 1 WHERE id = :id;
5+
--select pg_sleep(0.5*random());
56
UPDATE accounts SET amount = amount + 1 WHERE id = (:id + 1);
67
COMMIT;

src/backend/access/transam/global_snapshot.c

Lines changed: 20 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
#include "storage/s_lock.h"
1717
#include "storage/spin.h"
1818
#include "storage/lmgr.h"
19+
#include "storage/procarray.h"
1920
#include "storage/shmem.h"
2021
#include "storage/ipc.h"
2122
#include "access/xlogdefs.h"
@@ -60,8 +61,6 @@ typedef struct
6061
{
6162
cid_t cid; /* last assigned CSN; used to provide unique
6263
* ascending CSNs */
63-
TransactionId oldest_xid; /* XID of oldest transaction visible by any
64-
* active transaction (local or global) */
6564
long time_shift; /* correction to system time */
6665
volatile slock_t lock; /* spinlock to protect access to hash table */
6766
DtmTransStatus *trans_list_head; /* L1 list of finished transactions
@@ -94,8 +93,6 @@ static bool DtmRecordCommits = 0;
9493

9594
DtmCurrentTrans dtm_tx; // XXXX: make static
9695

97-
static Snapshot DtmGetSnapshot(Snapshot snapshot);
98-
static TransactionId DtmGetOldestXmin(Relation rel, int flags);
9996
static bool DtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot);
10097
static void DtmAdjustOldestXid(void);
10198
static bool DtmDetectGlobalDeadLock(PGPROC *proc);
@@ -109,9 +106,9 @@ static void DtmDeserializeTransactionState(void* ctx);
109106
static TransactionManager DtmTM = {
110107
PgTransactionIdGetStatus,
111108
PgTransactionIdSetTreeStatus,
112-
DtmGetSnapshot,
109+
PgGetSnapshotData,
113110
PgGetNewTransactionId,
114-
DtmGetOldestXmin,
111+
PgGetOldestXmin,
115112
PgTransactionIdIsInProgress,
116113
PgGetGlobalTransactionId,
117114
DtmXidInMVCCSnapshot,
@@ -329,16 +326,16 @@ static void
329326
DtmAdjustOldestXid()
330327
{
331328
DtmTransStatus *ts,
332-
*prev = NULL;
329+
*prev = NULL;
330+
timestamp_t cutoff_time;
331+
TransactionId oldest_xid = InvalidTransactionId;
332+
int total = 0,
333+
deleted = 0;
333334

334-
timestamp_t cutoff_time = dtm_get_current_time() - DtmVacuumDelay * USEC;
335-
int total = 0, deleted = 0;
335+
cutoff_time = dtm_get_current_time() - DtmVacuumDelay * USEC;
336336

337337
SpinLockAcquire(&local->lock);
338338

339-
for (ts = local->trans_list_head; ts != NULL; ts = ts->next)
340-
total++;
341-
342339
for (ts = local->trans_list_head; ts != NULL && ts->cid < cutoff_time; prev = ts, ts = ts->next)
343340
{
344341
if (prev != NULL)
@@ -351,53 +348,24 @@ DtmAdjustOldestXid()
351348
if (prev != NULL)
352349
local->trans_list_head = prev;
353350

354-
if (ts != NULL)
355-
local->oldest_xid = ts->xid;
356-
else
357-
local->oldest_xid = InvalidTransactionId;
351+
if (local->trans_list_head)
352+
oldest_xid = local->trans_list_head->xid;
358353

359-
SpinLockRelease(&local->lock);
360-
361-
// elog(LOG, "DtmAdjustOldestXid total=%d, deleted=%d, xid=%d, prev=%p, ts=%p", total, deleted, local->oldest_xid, prev, ts);
362-
}
363-
364-
Snapshot
365-
DtmGetSnapshot(Snapshot snapshot)
366-
{
367-
snapshot = PgGetSnapshotData(snapshot);
368-
// RecentGlobalDataXmin = RecentGlobalXmin = DtmAdjustOldestXid(RecentGlobalDataXmin);
369-
SpinLockAcquire(&local->lock);
370-
371-
if (TransactionIdIsValid(local->oldest_xid) &&
372-
TransactionIdPrecedes(local->oldest_xid, RecentGlobalXmin))
373-
RecentGlobalXmin = local->oldest_xid;
374-
375-
if (TransactionIdIsValid(local->oldest_xid) &&
376-
TransactionIdPrecedes(local->oldest_xid, RecentGlobalDataXmin))
377-
RecentGlobalDataXmin = local->oldest_xid;
354+
for (ts = local->trans_list_head; ts != NULL; ts = ts->next)
355+
{
356+
if (TransactionIdPrecedes(ts->xid, oldest_xid))
357+
oldest_xid = ts->xid;
358+
total++;
359+
}
378360

379361
SpinLockRelease(&local->lock);
380-
return snapshot;
381-
}
382-
383-
TransactionId
384-
DtmGetOldestXmin(Relation rel, int flags)
385-
{
386-
TransactionId xmin = PgGetOldestXmin(rel, flags);
387362

388-
// xmin = DtmAdjustOldestXid(xmin);
363+
ProcArraySetGlobalSnapshotXmin(oldest_xid);
389364

390-
SpinLockAcquire(&local->lock);
391-
392-
if (TransactionIdIsValid(local->oldest_xid) &&
393-
TransactionIdPrecedes(local->oldest_xid, xmin))
394-
xmin = local->oldest_xid;
395-
396-
SpinLockRelease(&local->lock);
397-
398-
return xmin;
365+
// elog(LOG, "DtmAdjustOldestXid total=%d, deleted=%d, xid=%d, prev=%p, ts=%p", total, deleted, oldest_xid, prev, ts);
399366
}
400367

368+
401369
/*
402370
* Check tuple bisibility based on CSN of current transaction.
403371
* If there is no niformation about transaction with this XID, then use standard PostgreSQL visibility rules.
@@ -487,7 +455,6 @@ DtmInitialize()
487455
if (!found)
488456
{
489457
local->time_shift = 0;
490-
local->oldest_xid = FirstNormalTransactionId;
491458
local->cid = dtm_get_current_time();
492459
local->trans_list_head = NULL;
493460
local->trans_list_tail = &local->trans_list_head;

src/backend/storage/ipc/procarray.c

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,9 @@ typedef struct ProcArrayStruct
9292
/* oldest catalog xmin of any replication slot */
9393
TransactionId replication_slot_catalog_xmin;
9494

95+
/* xmin of oldest active global snapshot */
96+
TransactionId global_snapshot_xmin;
97+
9598
/* indexes into allPgXact[], has PROCARRAY_MAXPROCS entries */
9699
int pgprocnos[FLEXIBLE_ARRAY_MEMBER];
97100
} ProcArrayStruct;
@@ -246,6 +249,7 @@ CreateSharedProcArray(void)
246249
procArray->lastOverflowedXid = InvalidTransactionId;
247250
procArray->replication_slot_xmin = InvalidTransactionId;
248251
procArray->replication_slot_catalog_xmin = InvalidTransactionId;
252+
procArray->global_snapshot_xmin = InvalidTransactionId;
249253
}
250254

251255
allProcs = ProcGlobal->allProcs;
@@ -1333,6 +1337,7 @@ PgGetOldestXmin(Relation rel, int flags)
13331337

13341338
volatile TransactionId replication_slot_xmin = InvalidTransactionId;
13351339
volatile TransactionId replication_slot_catalog_xmin = InvalidTransactionId;
1340+
volatile TransactionId global_snapshot_xmin = InvalidTransactionId;
13361341

13371342
/*
13381343
* If we're not computing a relation specific limit, or if a shared
@@ -1394,6 +1399,7 @@ PgGetOldestXmin(Relation rel, int flags)
13941399
/* fetch into volatile var while ProcArrayLock is held */
13951400
replication_slot_xmin = procArray->replication_slot_xmin;
13961401
replication_slot_catalog_xmin = procArray->replication_slot_catalog_xmin;
1402+
global_snapshot_xmin = procArray->global_snapshot_xmin;
13971403

13981404
if (RecoveryInProgress())
13991405
{
@@ -1435,6 +1441,10 @@ PgGetOldestXmin(Relation rel, int flags)
14351441
result = FirstNormalTransactionId;
14361442
}
14371443

1444+
if (TransactionIdIsValid(global_snapshot_xmin) &&
1445+
NormalTransactionIdPrecedes(global_snapshot_xmin, result))
1446+
result = global_snapshot_xmin;
1447+
14381448
/*
14391449
* Check whether there are replication slots requiring an older xmin.
14401450
*/
@@ -1536,6 +1546,7 @@ PgGetSnapshotData(Snapshot snapshot)
15361546
bool suboverflowed = false;
15371547
volatile TransactionId replication_slot_xmin = InvalidTransactionId;
15381548
volatile TransactionId replication_slot_catalog_xmin = InvalidTransactionId;
1549+
volatile TransactionId global_snapshot_xmin = InvalidTransactionId;
15391550

15401551
Assert(snapshot != NULL);
15411552

@@ -1724,6 +1735,7 @@ PgGetSnapshotData(Snapshot snapshot)
17241735
/* fetch into volatile var while ProcArrayLock is held */
17251736
replication_slot_xmin = procArray->replication_slot_xmin;
17261737
replication_slot_catalog_xmin = procArray->replication_slot_catalog_xmin;
1738+
global_snapshot_xmin = procArray->global_snapshot_xmin;
17271739

17281740
if (!TransactionIdIsValid(MyPgXact->xmin))
17291741
MyPgXact->xmin = TransactionXmin = xmin;
@@ -1743,6 +1755,10 @@ PgGetSnapshotData(Snapshot snapshot)
17431755
if (!TransactionIdIsNormal(RecentGlobalXmin))
17441756
RecentGlobalXmin = FirstNormalTransactionId;
17451757

1758+
if (TransactionIdIsValid(global_snapshot_xmin) &&
1759+
TransactionIdPrecedes(global_snapshot_xmin, RecentGlobalXmin))
1760+
RecentGlobalXmin = global_snapshot_xmin;
1761+
17461762
/* Check whether there's a replication slot requiring an older xmin. */
17471763
if (TransactionIdIsValid(replication_slot_xmin) &&
17481764
NormalTransactionIdPrecedes(replication_slot_xmin, RecentGlobalXmin))
@@ -3015,6 +3031,16 @@ ProcArrayGetReplicationSlotXmin(TransactionId *xmin,
30153031
LWLockRelease(ProcArrayLock);
30163032
}
30173033

3034+
/*
3035+
* ProcArraySetGlobalSnapshotXmin
3036+
*/
3037+
void
3038+
ProcArraySetGlobalSnapshotXmin(TransactionId xmin)
3039+
{
3040+
LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
3041+
procArray->global_snapshot_xmin = xmin;
3042+
LWLockRelease(ProcArrayLock);
3043+
}
30183044

30193045
#define XidCacheRemove(i) \
30203046
do { \

src/include/storage/procarray.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,4 +124,6 @@ extern void ProcArraySetReplicationSlotXmin(TransactionId xmin,
124124
extern void ProcArrayGetReplicationSlotXmin(TransactionId *xmin,
125125
TransactionId *catalog_xmin);
126126

127+
extern void ProcArraySetGlobalSnapshotXmin(TransactionId xmin);
128+
127129
#endif /* PROCARRAY_H */

0 commit comments

Comments
 (0)