Skip to content

Commit c7211e7

Browse files
committed
Start implementation of lock manager
1 parent 14d6ed8 commit c7211e7

File tree

9 files changed

+159
-26
lines changed

9 files changed

+159
-26
lines changed

contrib/pg_dtm/libdtm.c

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -499,3 +499,8 @@ int DtmGlobalReserve(TransactionId xid, int nXids, TransactionId *first)
499499
);
500500
return 0;
501501
}
502+
503+
bool DtmGlobalDetectDeadLock(void* data, int size)
504+
{
505+
return false;
506+
}

contrib/pg_dtm/libdtm.h

Lines changed: 39 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -8,38 +8,57 @@
88

99
#define INVALID_XID 0
1010

11-
// Sets up the host and port for DTM connection.
12-
// The defaults are "127.0.0.1" and 5431.
11+
/**
12+
* Sets up the host and port for DTM connection.
13+
* The defaults are "127.0.0.1" and 5431.
14+
*/
1315
void DtmGlobalConfig(char *host, int port, char* sock_dir);
1416

1517
void DtmInitSnapshot(Snapshot snapshot);
1618

17-
// Starts a new global transaction. Returns the
18-
// transaction id, fills the 'snapshot' and 'gxmin' on success. 'gxmin' is the
19-
// smallest xmin among all snapshots known to DTM. Returns INVALID_XID
20-
// otherwise.
19+
/**
20+
* Starts a new global transaction. Returns the
21+
* transaction id, fills the 'snapshot' and 'gxmin' on success. 'gxmin' is the
22+
* smallest xmin among all snapshots known to DTM. Returns INVALID_XID
23+
* otherwise.
24+
*/
2125
TransactionId DtmGlobalStartTransaction(Snapshot snapshot, TransactionId *gxmin);
2226

23-
// Asks the DTM for a fresh snapshot. Fills the 'snapshot' and 'gxmin' on
24-
// success. 'gxmin' is the smallest xmin among all snapshots known to DTM.
27+
/**
28+
* Asks the DTM for a fresh snapshot. Fills the 'snapshot' and 'gxmin' on
29+
* success. 'gxmin' is the smallest xmin among all snapshots known to DTM.
30+
*/
2531
void DtmGlobalGetSnapshot(TransactionId xid, Snapshot snapshot, TransactionId *gxmin);
2632

27-
// Commits transaction only once all participants have called this function,
28-
// does not change CLOG otherwise. Set 'wait' to 'true' if you want this call
29-
// to return only after the transaction is considered finished by the DTM.
30-
// Returns the status on success, or -1 otherwise.
33+
/**
34+
* Commits transaction only once all participants have called this function,
35+
* does not change CLOG otherwise. Set 'wait' to 'true' if you want this call
36+
* to return only after the transaction is considered finished by the DTM.
37+
* Returns the status on success, or -1 otherwise.
38+
*/
3139
XidStatus DtmGlobalSetTransStatus(TransactionId xid, XidStatus status, bool wait);
3240

33-
// Gets the status of the transaction identified by 'xid'. Returns the status
34-
// on success, or -1 otherwise. If 'wait' is true, then it does not return
35-
// until the transaction is finished.
41+
/**
42+
* Gets the status of the transaction identified by 'xid'. Returns the status
43+
* on success, or -1 otherwise. If 'wait' is true, then it does not return
44+
* until the transaction is finished.
45+
*/
3646
XidStatus DtmGlobalGetTransStatus(TransactionId xid, bool wait);
3747

38-
// Reserves at least 'nXids' successive xids for local transactions. The xids
39-
// reserved are not less than 'xid' in value. Returns the actual number of xids
40-
// reserved, and sets the 'first' xid accordingly. The number of xids reserved
41-
// is guaranteed to be at least nXids.
42-
// In other words, *first ≥ xid and result ≥ nXids.
48+
/**
49+
* Reserves at least 'nXids' successive xids for local transactions. The xids
50+
* reserved are not less than 'xid' in value. Returns the actual number of xids
51+
* reserved, and sets the 'first' xid accordingly. The number of xids reserved
52+
* is guaranteed to be at least nXids.
53+
* In other words, *first ≥ xid and result ≥ nXids.
54+
*/
4355
int DtmGlobalReserve(TransactionId xid, int nXids, TransactionId *first);
4456

57+
/**
58+
* Detect global deadlock. This funcions send serialized local resource graph to arbiter which appends them to global graph.
59+
* Once loop is detected in global resoruce graph, arbiter returns true. Otherwise false is returned.
60+
* Abiter should replace local part of resource graph if new graph is recevied from this cluster node (not backend).
61+
*/
62+
bool DtmGlobalDetectDeadLock(void* graph, int size);
63+
4564
#endif

contrib/pg_dtm/pg_dtm.c

Lines changed: 63 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,13 @@ typedef struct
5454
size_t nReservedXids; /* number of XIDs reserved for local transactions */
5555
} DtmState;
5656

57+
typedef struct
58+
{
59+
char* data;
60+
int size;
61+
int used;
62+
} ByteBuffer;
63+
5764

5865
#define DTM_SHMEM_SIZE (1024*1024)
5966
#define DTM_HASH_SIZE 1003
@@ -72,13 +79,21 @@ static TransactionId DtmGetNextXid(void);
7279
static TransactionId DtmGetNewTransactionId(bool isSubXact);
7380
static TransactionId DtmGetOldestXmin(Relation rel, bool ignoreVacuum);
7481
static TransactionId DtmGetGlobalTransactionId(void);
82+
static bool DtmDetectGlobalDeadLock(void);
83+
84+
static void DtmSerializeLock(PROCLOCK* lock, void* arg);
7585

7686
static bool TransactionIdIsInSnapshot(TransactionId xid, Snapshot snapshot);
7787
static bool TransactionIdIsInDoubt(TransactionId xid);
7888

7989
static void DtmShmemStartup(void);
8090
static void DtmBackgroundWorker(Datum arg);
8191

92+
static void ByteBufferAlloc(ByteBuffer* buf);
93+
static void ByteBufferAppend(ByteBuffer* buf, void* data, int len);
94+
static void ByteBufferFree(ByteBuffer* buf);
95+
96+
8297
static shmem_startup_hook_type prev_shmem_startup_hook;
8398
static HTAB* xid_in_doubt;
8499
static DtmState* dtm;
@@ -99,7 +114,8 @@ static TransactionManager DtmTM = {
99114
DtmGetOldestXmin,
100115
PgTransactionIdIsInProgress,
101116
DtmGetGlobalTransactionId,
102-
PgXidInMVCCSnapshot
117+
PgXidInMVCCSnapshot,
118+
DtmDetectGlobalDeadLock
103119
};
104120

105121
static char *DtmHost;
@@ -942,3 +958,49 @@ void DtmBackgroundWorker(Datum arg)
942958
ShubInitialize(&shub, &params);
943959
ShubLoop(&shub);
944960
}
961+
962+
static void ByteBufferAlloc(ByteBuffer* buf)
963+
{
964+
buf->size = 1024;
965+
buf->data = palloc(buf->size);
966+
buf->used = 0;
967+
}
968+
969+
static void ByteBufferAppend(ByteBuffer* buf, void* data, int len)
970+
{
971+
if (buf->used + len > buf->size) {
972+
buf->size = buf->used + len > buf->size*2 ? buf->used + len : buf->size*2;
973+
buf->data = (char*)repalloc(buf->data, buf->size);
974+
}
975+
memcpy(&buf->data[buf->used], data, len);
976+
buf->used += len;
977+
}
978+
979+
static void ByteBufferFree(ByteBuffer* buf)
980+
{
981+
pfree(buf->data);
982+
}
983+
984+
#define APPEND(buf, x) ByteBufferAppend(buf, &x, sizeof(x))
985+
986+
static void DtmSerializeLock(PROCLOCK* proclock, void* arg)
987+
{
988+
ByteBuffer* buf = (ByteBuffer*)arg;
989+
LOCK* lock = proclock->tag.myLock;
990+
if (lock != NULL) {
991+
APPEND(buf, proclock->tag.myProc->lxid);
992+
APPEND(buf, proclock->holdMask);
993+
APPEND(buf, lock->tag.locktag_lockmethodid);
994+
}
995+
}
996+
997+
bool DtmDetectGlobalDeadLock(void)
998+
{
999+
bool hasDeadlock;
1000+
ByteBuffer buf;
1001+
ByteBufferAlloc(&buf);
1002+
EnumerateLocks(DtmSerializeLock, &buf);
1003+
hasDeadlock = DtmGlobalDetectDeadLock(buf.data, buf.used);
1004+
ByteBufferFree(&buf);
1005+
return hasDeadlock;
1006+
}

src/backend/access/transam/xtm.c

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,11 @@ TransactionId PgGetGlobalTransactionId(void)
2323
return InvalidTransactionId;
2424
}
2525

26+
bool PgDetectGlobalDeadLock()
27+
{
28+
return false;
29+
}
30+
2631
TransactionManager PgTM = {
2732
PgTransactionIdGetStatus,
2833
PgTransactionIdSetTreeStatus,
@@ -31,7 +36,8 @@ TransactionManager PgTM = {
3136
PgGetOldestXmin,
3237
PgTransactionIdIsInProgress,
3338
PgGetGlobalTransactionId,
34-
PgXidInMVCCSnapshot
39+
PgXidInMVCCSnapshot,
40+
PgDetectGlobalDeadLock
3541
};
3642

3743
TransactionManager* TM = &PgTM;

src/backend/storage/lmgr/deadlock.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
#include "pgstat.h"
3131
#include "storage/lmgr.h"
3232
#include "storage/proc.h"
33+
#include "access/xtm.h"
3334
#include "utils/memutils.h"
3435

3536

@@ -268,7 +269,7 @@ DeadLockCheck(PGPROC *proc)
268269
else if (blocking_autovacuum_proc != NULL)
269270
return DS_BLOCKED_BY_AUTOVACUUM;
270271
else
271-
return DS_NO_DEADLOCK;
272+
return TM->DetectGlobalDeadLock() ? DS_DISTRIBUTED_DEADLOCK : DS_NO_DEADLOCK;
272273
}
273274

274275
/*

src/backend/storage/lmgr/lock.c

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3559,6 +3559,20 @@ GetLockmodeName(LOCKMETHODID lockmethodid, LOCKMODE mode)
35593559
return LockMethods[lockmethodid]->lockModeNames[mode];
35603560
}
35613561

3562+
void
3563+
EnumerateLocks(LockIterator iterator, void* arg)
3564+
{
3565+
PROCLOCK *proclock;
3566+
HASH_SEQ_STATUS status;
3567+
3568+
hash_seq_init(&status, LockMethodProcLockHash);
3569+
3570+
while ((proclock = (PROCLOCK *) hash_seq_search(&status)) != NULL)
3571+
{
3572+
iterator(proclock, arg);
3573+
}
3574+
}
3575+
35623576
#ifdef LOCK_DEBUG
35633577
/*
35643578
* Dump all locks in the given proc's myProcLocks lists.

src/backend/storage/lmgr/proc.c

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1306,6 +1306,22 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable)
13061306
"Processes holding the lock: %s. Wait queue: %s.",
13071307
lockHoldersNum, lock_holders_sbuf.data, lock_waiters_sbuf.data))));
13081308
}
1309+
else if (deadlock_state == DS_DISTRIBUTED_DEADLOCK)
1310+
{
1311+
/*
1312+
* This message is a bit redundant with the error that will be
1313+
* reported subsequently, but in some cases the error report
1314+
* might not make it to the log (eg, if it's caught by an
1315+
* exception handler), and we want to ensure all long-wait
1316+
* events get logged.
1317+
*/
1318+
ereport(LOG,
1319+
(errmsg("process %d detected distributed deadlock while waiting for %s on %s after %ld.%03d ms",
1320+
MyProcPid, modename, buf.data, msecs, usecs),
1321+
(errdetail_log_plural("Process holding the lock: %s. Wait queue: %s.",
1322+
"Processes holding the lock: %s. Wait queue: %s.",
1323+
lockHoldersNum, lock_holders_sbuf.data, lock_waiters_sbuf.data))));
1324+
}
13091325

13101326
if (myWaitStatus == STATUS_WAITING)
13111327
ereport(LOG,
@@ -1330,7 +1346,7 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable)
13301346
* future-proofing, print a message if it looks like someone
13311347
* else kicked us off the lock.
13321348
*/
1333-
if (deadlock_state != DS_HARD_DEADLOCK)
1349+
if (deadlock_state != DS_HARD_DEADLOCK && deadlock_state != DS_DISTRIBUTED_DEADLOCK)
13341350
ereport(LOG,
13351351
(errmsg("process %d failed to acquire %s on %s after %ld.%03d ms",
13361352
MyProcPid, modename, buf.data, msecs, usecs),
@@ -1547,7 +1563,7 @@ CheckDeadLock(void)
15471563
/* Run the deadlock check, and set deadlock_state for use by ProcSleep */
15481564
deadlock_state = DeadLockCheck(MyProc);
15491565

1550-
if (deadlock_state == DS_HARD_DEADLOCK)
1566+
if (deadlock_state == DS_HARD_DEADLOCK || deadlock_state == DS_DISTRIBUTED_DEADLOCK)
15511567
{
15521568
/*
15531569
* Oops. We have a deadlock.

src/include/access/xtm.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,9 @@ typedef struct
4040

4141
/* Is the given XID still-in-progress according to the snapshot (encapsulation of XidInMVCCSnapshot in tqual.c) */
4242
bool (*IsInSnapshot)(TransactionId xid, Snapshot snapshot);
43+
44+
/* Detect distributed deadlock */
45+
bool (*DetectGlobalDeadLock)(void);
4346
} TransactionManager;
4447

4548
/* Get pointer to transaction manager: actually returns content of TM variable */
@@ -65,4 +68,6 @@ extern TransactionId PgGetGlobalTransactionId(void);
6568

6669
extern TransactionId PgGetNewTransactionId(bool isSubXact);
6770

71+
extern bool PgDetectGlobalDeadLock(void);
72+
6873
#endif

src/include/storage/lock.h

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -453,8 +453,9 @@ typedef enum
453453
DS_NO_DEADLOCK, /* no deadlock detected */
454454
DS_SOFT_DEADLOCK, /* deadlock avoided by queue rearrangement */
455455
DS_HARD_DEADLOCK, /* deadlock, no way out but ERROR */
456-
DS_BLOCKED_BY_AUTOVACUUM /* no deadlock; queue blocked by autovacuum
456+
DS_BLOCKED_BY_AUTOVACUUM, /* no deadlock; queue blocked by autovacuum
457457
* worker */
458+
DS_DISTRIBUTED_DEADLOCK /* distributed deadlock detected by DTM */
458459
} DeadLockState;
459460

460461

@@ -536,6 +537,10 @@ extern void DumpLocks(PGPROC *proc);
536537
extern void DumpAllLocks(void);
537538
#endif
538539

540+
typedef void(*LockIterator)(PROCLOCK* lock, void* arg);
541+
542+
extern void EnumerateLocks(LockIterator iterator, void* arg);
543+
539544
/* Lock a VXID (used to wait for a transaction to finish) */
540545
extern void VirtualXactLockTableInsert(VirtualTransactionId vxid);
541546
extern void VirtualXactLockTableCleanup(void);

0 commit comments

Comments
 (0)