Skip to content

Commit 1bc810d

Browse files
committed
Fix start of pglogical_receiver
1 parent 410b5a2 commit 1bc810d

File tree

3 files changed

+37
-23
lines changed

3 files changed

+37
-23
lines changed

contrib/mmts/multimaster.c

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -278,13 +278,13 @@ bool MtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
278278
{
279279
if (ts->csn > dtmTx.snapshot) {
280280
MTM_TUPLE_TRACE("%d: tuple with xid=%d(csn=%ld) is invisibile in snapshot %ld\n",
281-
getpid(), xid, ts->csn, dtmTx.snapshot);
281+
MyProcPid, xid, ts->csn, dtmTx.snapshot);
282282
MtmUnlock();
283283
return true;
284284
}
285285
if (ts->status == TRANSACTION_STATUS_UNKNOWN)
286286
{
287-
MTM_TRACE("%d: wait for in-doubt transaction %u in snapshot %lu\n", getpid(), xid, dtmTx.snapshot);
287+
MTM_TRACE("%d: wait for in-doubt transaction %u in snapshot %lu\n", MyProcPid, xid, dtmTx.snapshot);
288288
MtmUnlock();
289289
#if TRACE_SLEEP_TIME
290290
{
@@ -316,14 +316,14 @@ bool MtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
316316
{
317317
bool invisible = ts->status != TRANSACTION_STATUS_COMMITTED;
318318
MTM_TUPLE_TRACE("%d: tuple with xid=%d(csn= %ld) is %s in snapshot %ld\n",
319-
getpid(), xid, ts->csn, invisible ? "rollbacked" : "committed", dtmTx.snapshot);
319+
MyProcPid, xid, ts->csn, invisible ? "rollbacked" : "committed", dtmTx.snapshot);
320320
MtmUnlock();
321321
return invisible;
322322
}
323323
}
324324
else
325325
{
326-
MTM_TUPLE_TRACE("%d: visibility check is skept for transaction %u in snapshot %lu\n", getpid(), xid, dtmTx.snapshot);
326+
MTM_TUPLE_TRACE("%d: visibility check is skept for transaction %u in snapshot %lu\n", MyProcPid, xid, dtmTx.snapshot);
327327
break;
328328
}
329329
}
@@ -493,7 +493,7 @@ MtmXactCallback(XactEvent event, void *arg)
493493
static bool
494494
MtmIsUserTransaction()
495495
{
496-
return IsNormalProcessingMode() && dtm->status == MTM_ONLINE && MtmDoReplication && !am_walsender && !IsBackgroundWorker && !IsAutoVacuumWorkerProcess();
496+
return IsNormalProcessingMode() && MtmDoReplication && !am_walsender && !IsBackgroundWorker && !IsAutoVacuumWorkerProcess();
497497
}
498498

499499
static void
@@ -504,6 +504,10 @@ MtmBeginTransaction(MtmCurrentTrans* x)
504504
x->xid = GetCurrentTransactionIdIfAny();
505505
x->isReplicated = false;
506506
x->isDistributed = MtmIsUserTransaction();
507+
if (x->isDistributed && dtm->status != MTM_ONLINE) {
508+
MtmUnlock();
509+
elog(ERROR, "Multimaster node is offline");
510+
}
507511
x->containsDML = false;
508512
x->isPrepared = false;
509513
x->snapshot = MtmAssignCSN();
@@ -614,7 +618,7 @@ static void MtmPrecommitTransaction(MtmCurrentTrans* x)
614618

615619
MtmUnlock();
616620

617-
MTM_TRACE("%d: MtmPrepareTransaction prepare commit of %d CSN=%ld\n", getpid(), x->xid, ts->csn);
621+
MTM_TRACE("%d: MtmPrepareTransaction prepare commit of %d CSN=%ld\n", MyProcPid, x->xid, ts->csn);
618622
}
619623

620624
static void
@@ -728,14 +732,14 @@ MtmCommitTransaction(TransactionId xid, int nsubxids, TransactionId *subxids)
728732
ts = hash_search(xid2state, &xid, HASH_FIND, NULL);
729733
Assert(ts != NULL); /* should be created by MtmPrepareTransaction */
730734

731-
MTM_TRACE("%d: MtmCommitTransaction begin commit of %d CSN=%ld\n", getpid(), xid, ts->csn);
735+
MTM_TRACE("%d: MtmCommitTransaction begin commit of %d CSN=%ld\n", MyProcPid, xid, ts->csn);
732736
MtmAddSubtransactions(ts, subxids, nsubxids);
733737

734738
MtmVoteForTransaction(ts);
735739

736740
MtmUnlock();
737741

738-
MTM_TRACE("%d: MtmCommitTransaction %d status=%d\n", getpid(), xid, ts->status);
742+
MTM_TRACE("%d: MtmCommitTransaction %d status=%d\n", MyProcPid, xid, ts->status);
739743

740744
return ts->status != TRANSACTION_STATUS_ABORTED;
741745
}
@@ -768,7 +772,7 @@ MtmFinishTransaction(TransactionId xid, int nsubxids, TransactionId *subxids, Xi
768772
MtmSendNotificationMessage(ts);
769773
}
770774
MtmUnlock();
771-
MTM_TRACE("%d: MtmFinishTransaction %d CSN=%ld, status=%d\n", getpid(), xid, ts->csn, status);
775+
MTM_TRACE("%d: MtmFinishTransaction %d CSN=%ld, status=%d\n", MyProcPid, xid, ts->csn, status);
772776
}
773777
}
774778

@@ -777,7 +781,7 @@ MtmFinishTransaction(TransactionId xid, int nsubxids, TransactionId *subxids, Xi
777781
static void
778782
MtmSetTransactionStatus(TransactionId xid, int nsubxids, TransactionId *subxids, XidStatus status, XLogRecPtr lsn)
779783
{
780-
MTM_TRACE("%d: MtmSetTransactionStatus %u(%u) = %u, isDistributed=%d\n", getpid(), xid, dtmTx.xid, status, dtmTx.isDistributed);
784+
MTM_TRACE("%d: MtmSetTransactionStatus %u(%u) = %u, isDistributed=%d\n", MyProcPid, xid, dtmTx.xid, status, dtmTx.isDistributed);
781785
if (xid == dtmTx.xid && dtmTx.isDistributed && !dtmTx.isPrepared)
782786
{
783787
if (status == TRANSACTION_STATUS_ABORTED || !dtmTx.containsDML || dtm->status == MTM_RECOVERY)
@@ -1100,7 +1104,7 @@ void MtmReceiverStarted(int nodeId)
11001104
{
11011105
SpinLockAcquire(&dtm->spinlock);
11021106
if (!BIT_CHECK(dtm->pglogicalNodeMask, nodeId-1)) {
1103-
dtm->pglogicalNodeMask |= (int64)1 << (nodeId-1);
1107+
dtm->pglogicalNodeMask |= (nodemask_t)1 << (nodeId-1);
11041108
if (++dtm->nReceivers == dtm->nNodes-1) {
11051109
elog(WARNING, "All receivers are started, switch to normal mode");
11061110
Assert(dtm->status == MTM_CONNECTED);
@@ -1478,14 +1482,14 @@ MtmVoteForTransaction(MtmTransState* ts)
14781482
MtmSendNotificationMessage(ts);
14791483
}
14801484
}
1481-
MTM_TRACE("%d: Node %d waiting latch...\n", getpid(), MtmNodeId);
1485+
MTM_TRACE("%d: Node %d waiting latch...\n", MyProcPid, MtmNodeId);
14821486
while (!ts->done) {
14831487
MtmUnlock();
14841488
WaitLatch(&MyProc->procLatch, WL_LATCH_SET, -1);
14851489
ResetLatch(&MyProc->procLatch);
14861490
MtmLock(LW_SHARED);
14871491
}
1488-
MTM_TRACE("%d: Node %d receives response...\n", getpid(), MtmNodeId);
1492+
MTM_TRACE("%d: Node %d receives response...\n", MyProcPid, MtmNodeId);
14891493
}
14901494

14911495
HTAB* MtmCreateHash(void)

contrib/mmts/pglogical_receiver.c

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
#include <unistd.h>
1818
#include "postgres.h"
1919
#include "fmgr.h"
20+
#include "miscadmin.h"
2021
#include "libpq-fe.h"
2122
#include "pqexpbuffer.h"
2223
#include "access/xact.h"
@@ -38,7 +39,8 @@
3839
/* Allow load of this module in shared libs */
3940

4041
typedef struct ReceiverArgs {
41-
int receiver_node;
42+
int local_node;
43+
int remote_node;
4244
char* receiver_conn_string;
4345
char receiver_slot[16];
4446
} ReceiverArgs;
@@ -55,7 +57,7 @@ static bool receiver_sync_mode = false;
5557

5658
/* Worker name */
5759
static char *worker_name = "multimaster";
58-
char worker_proc[16];
60+
char worker_proc[BGW_MAXLEN];
5961

6062
/* Lastly written positions */
6163
static XLogRecPtr output_written_lsn = InvalidXLogRecPtr;
@@ -216,7 +218,7 @@ pglogical_receiver_main(Datum main_arg)
216218
pqsignal(SIGHUP, receiver_raw_sighup);
217219
pqsignal(SIGTERM, receiver_raw_sigterm);
218220

219-
sprintf(worker_proc, "mtm_recv_%d", getpid());
221+
sprintf(worker_proc, "mtm_pglogical_receiver_%d_%d", args->local_node, args->remote_node);
220222

221223
/* We're now ready to receive signals */
222224
BackgroundWorkerUnblockSignals();
@@ -229,7 +231,7 @@ pglogical_receiver_main(Datum main_arg)
229231
* Druing recovery we need to open only one replication slot from which node should receive all transactions.
230232
* Slots at other nodes should be removed
231233
*/
232-
mode = MtmReceiverSlotMode(args->receiver_node);
234+
mode = MtmReceiverSlotMode(args->remote_node);
233235

234236
/* Establish connection to remote server */
235237
conn = PQconnectdb(args->receiver_conn_string);
@@ -266,11 +268,18 @@ pglogical_receiver_main(Datum main_arg)
266268
PQclear(res);
267269
resetPQExpBuffer(query);
268270
}
271+
269272
/* Start logical replication at specified position */
273+
StartTransactionCommand();
270274
originId = replorigin_by_name(args->receiver_slot, true);
271275
if (originId != InvalidRepOriginId) {
272276
originStartPos = replorigin_get_progress(originId, false);
277+
elog(WARNING, "Restart logical receiver at position %lx from node %d", originStartPos, args->remote_node);
278+
} else {
279+
elog(WARNING, "Start logical receiver from node %d", args->remote_node);
273280
}
281+
CommitTransactionCommand();
282+
274283
appendPQExpBuffer(query, "START_REPLICATION SLOT \"%s\" LOGICAL %u/%u (\"startup_params_format\" '1', \"max_proto_version\" '%d', \"min_proto_version\" '%d')",
275284
args->receiver_slot,
276285
(uint32) (originStartPos >> 32),
@@ -282,14 +291,14 @@ pglogical_receiver_main(Datum main_arg)
282291
if (PQresultStatus(res) != PGRES_COPY_BOTH)
283292
{
284293
PQclear(res);
285-
ereport(LOG, (errmsg("%s: Could not start logical replication",
286-
worker_proc)));
294+
ereport(WARNING, (errmsg("%s: Could not start logical replication",
295+
worker_proc)));
287296
proc_exit(1);
288297
}
289298
PQclear(res);
290299
resetPQExpBuffer(query);
291300

292-
MtmReceiverStarted(args->receiver_node);
301+
MtmReceiverStarted(args->remote_node);
293302
ByteBufferAlloc(&buf);
294303
ds = MtmGetState();
295304

@@ -576,10 +585,11 @@ int MtmStartReceivers(char* conns, int node_id)
576585
}
577586
ctx->receiver_conn_string = psprintf("replication=database %.*s", (int)(p - conn_str), conn_str);
578587
sprintf(ctx->receiver_slot, "mtm_slot_%d", node_id);
579-
ctx->receiver_node = node_id;
588+
ctx->local_node = node_id;
589+
ctx->remote_node = i;
580590

581591
/* Worker parameter and registration */
582-
snprintf(worker.bgw_name, BGW_MAXLEN, "mtm_worker_%d_%d", node_id, i);
592+
snprintf(worker.bgw_name, BGW_MAXLEN, "mtm_pglogical_receiver_%d_%d", node_id, i);
583593

584594
worker.bgw_main_arg = (Datum)ctx;
585595
RegisterBackgroundWorker(&worker);

contrib/mmts/tests/dtmbench.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ void initializeDatabase()
184184
printf("Creating database schema...\n");
185185
{
186186
nontransaction txn(conn);
187-
exec(txn, "drop extension if exists multimsater");
187+
exec(txn, "drop extension if exists multimaster");
188188
exec(txn, "create extension multimaster");
189189
exec(txn, "drop table if exists t");
190190
exec(txn, "create table t(u int primary key, v int)");

0 commit comments

Comments
 (0)