Skip to content

Commit 42c2152

Browse files
committed
mrg
2 parents 366c7d1 + bc92057 commit 42c2152

File tree

2 files changed

+33
-13
lines changed

2 files changed

+33
-13
lines changed

contrib/mmts/multimaster.c

Lines changed: 30 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,7 @@ void MtmLock(LWLockMode mode)
243243
#else
244244
LWLockAcquire((LWLockId)&Mtm->locks[MTM_STATE_LOCK_ID], mode);
245245
#endif
246+
Mtm->lastLockHolder = MyProcPid;
246247
}
247248

248249
void MtmUnlock(void)
@@ -252,6 +253,7 @@ void MtmUnlock(void)
252253
#else
253254
LWLockRelease((LWLockId)&Mtm->locks[MTM_STATE_LOCK_ID]);
254255
#endif
256+
Mtm->lastLockHolder = 0;
255257
}
256258

257259
void MtmLockNode(int nodeId)
@@ -550,16 +552,20 @@ MtmAdjustOldestXid(TransactionId xid)
550552

551553
static void MtmTransactionListAppend(MtmTransState* ts)
552554
{
553-
ts->next = NULL;
554-
ts->nSubxids = 0;
555-
*Mtm->transListTail = ts;
556-
Mtm->transListTail = &ts->next;
555+
if (!ts->isEnqueued) {
556+
ts->isEnqueued = true;
557+
ts->next = NULL;
558+
ts->nSubxids = 0;
559+
*Mtm->transListTail = ts;
560+
Mtm->transListTail = &ts->next;
561+
}
557562
}
558563

559564
static void MtmTransactionListInsertAfter(MtmTransState* after, MtmTransState* ts)
560565
{
561566
ts->next = after->next;
562567
after->next = ts;
568+
ts->isEnqueued = true;
563569
if (Mtm->transListTail == &after->next) {
564570
Mtm->transListTail = &ts->next;
565571
}
@@ -700,6 +706,9 @@ MtmCreateTransState(MtmCurrentTrans* x)
700706
ts->status = TRANSACTION_STATUS_IN_PROGRESS;
701707
ts->snapshot = x->snapshot;
702708
ts->isLocal = true;
709+
if (!found) {
710+
ts->isEnqueued = false;
711+
}
703712
if (TransactionIdIsValid(x->gtid.xid)) {
704713
Assert(x->gtid.node != MtmNodeId);
705714
ts->gtid = x->gtid;
@@ -833,6 +842,9 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
833842
Assert(x->gid[0]);
834843
tm->state = ts;
835844
ts->votingCompleted = true;
845+
if (!found) {
846+
ts->isEnqueued = false;
847+
}
836848
if (Mtm->status != MTM_RECOVERY) {
837849
MtmSendNotificationMessage(ts, MSG_READY); /* send notification to coordinator */
838850
if (!MtmUseDtm) {
@@ -945,8 +957,12 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
945957
*/
946958
MTM_LOG1("%d: send ABORT notification abort transaction %d to coordinator %d", MyProcPid, x->gtid.xid, x->gtid.node);
947959
if (ts == NULL) {
960+
bool found;
948961
Assert(TransactionIdIsValid(x->xid));
949-
ts = hash_search(MtmXid2State, &x->xid, HASH_ENTER, NULL);
962+
ts = hash_search(MtmXid2State, &x->xid, HASH_ENTER, &found);
963+
if (!found) {
964+
ts->isEnqueued = false;
965+
}
950966
ts->status = TRANSACTION_STATUS_ABORTED;
951967
ts->isLocal = true;
952968
ts->snapshot = x->snapshot;
@@ -1364,7 +1380,7 @@ MtmBuildConnectivityMatrix(nodemask_t* matrix, bool nowait)
13641380
*/
13651381
bool MtmRefreshClusterStatus(bool nowait)
13661382
{
1367-
nodemask_t mask, clique, disabled, enabled;
1383+
nodemask_t mask, clique, disabled;
13681384
nodemask_t matrix[MAX_NODES];
13691385
MtmTransState *ts;
13701386
int clique_size;
@@ -1391,28 +1407,29 @@ bool MtmRefreshClusterStatus(bool nowait)
13911407
MTM_LOG1("Find clique %lx, disabledNodeMask %lx", (long) clique, (long) Mtm->disabledNodeMask);
13921408
MtmLock(LW_EXCLUSIVE);
13931409
disabled = ~clique & (((nodemask_t)1 << Mtm->nAllNodes)-1) & ~Mtm->disabledNodeMask; /* new disabled nodes mask */
1394-
enabled = clique & Mtm->disabledNodeMask; /* new enabled nodes mask */
13951410

13961411
for (i = 0, mask = disabled; mask != 0; i++, mask >>= 1) {
13971412
if (mask & 1) {
13981413
MtmDisableNode(i+1);
13991414
}
1400-
}
1401-
1415+
}
1416+
#if 0 /* Do not enable nodes here: they will be enabled after completion of recovery */
1417+
enabled = clique & Mtm->disabledNodeMask; /* new enabled nodes mask */
14021418
for (i = 0, mask = enabled; mask != 0; i++, mask >>= 1) {
14031419
if (mask & 1) {
14041420
MtmEnableNode(i+1);
14051421
}
14061422
}
1407-
if (disabled|enabled) {
1423+
#endif
1424+
if (disabled) {
14081425
MtmCheckQuorum();
14091426
}
14101427
/* Interrupt voting for active transactions and abort them */
14111428
for (ts = Mtm->transListHead; ts != NULL; ts = ts->next) {
14121429
MTM_LOG3("Active transaction gid='%s', coordinator=%d, xid=%d, status=%d, gtid.xid=%d",
14131430
ts->gid, ts->gtid.node, ts->xid, ts->status, ts->gtid.xid);
14141431
if (MtmIsCoordinator(ts)) {
1415-
if (!ts->votingCompleted && (disabled|enabled) != 0 && ts->status != TRANSACTION_STATUS_ABORTED) {
1432+
if (!ts->votingCompleted && disabled != 0 && ts->status != TRANSACTION_STATUS_ABORTED) {
14161433
MtmAbortTransaction(ts);
14171434
MtmWakeUpBackend(ts);
14181435
}
@@ -2222,6 +2239,7 @@ void MtmDropNode(int nodeId, bool dropSlot)
22222239
{
22232240
if (nodeId <= 0 || nodeId > Mtm->nLiveNodes)
22242241
{
2242+
MtmUnlock();
22252243
elog(ERROR, "NodeID %d is out of range [1,%d]", nodeId, Mtm->nLiveNodes);
22262244
}
22272245
MtmDisableNode(nodeId);
@@ -2287,6 +2305,7 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
22872305
MtmEnableNode(MtmReplicationNodeId);
22882306
MtmCheckQuorum();
22892307
} else {
2308+
MtmUnlock();
22902309
elog(ERROR, "Disabled node %d tries to reconnect without recovery", MtmReplicationNodeId);
22912310
}
22922311
} else {

contrib/mmts/multimaster.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,8 @@ typedef struct MtmTransState
163163
struct MtmTransState* next; /* Next element in L1 list of all finished transactions present in xid2state hash */
164164
bool votingCompleted; /* 2PC voting is completed */
165165
bool isLocal; /* Transaction is either replicated or doesn't contain DML statements, so it should be ignored by pglogical replication */
166-
TransactionId xids[1]; /* [Mtm->nAllNodes]: transaction ID at replicas */
166+
bool isEnqueued; /* Transaction is inserted in queue */
167+
TransactionId xids[1]; /* [Mtm->nAllNodes]: transaction ID at replicas */
167168
} MtmTransState;
168169

169170
typedef struct
@@ -180,7 +181,7 @@ typedef struct
180181
nodemask_t walSenderLockerMask; /* Mask of WAL-senders IDs locking the cluster */
181182
nodemask_t nodeLockerMask; /* Mask of node IDs which WAL-senders are locking the cluster */
182183
nodemask_t reconnectMask; /* Mask of nodes connection to which has to be reestablished by sender */
183-
184+
int lastLockHolder; /* PID of the process that last acquired the lock via MtmLock(); reset to 0 by MtmUnlock() */
184185
bool localTablesHashLoaded; /* Whether data from local_tables table is loaded in shared memory hash table */
185186
int inject2PCError; /* Simulate error during 2PC commit at this node */
186187
int nLiveNodes; /* Number of active nodes */

0 commit comments

Comments
 (0)