Skip to content

Commit de5e166

Browse files
committed
logical messages for UtilityStmts; GUC context
1 parent 96e2407 commit de5e166

File tree

5 files changed

+115
-81
lines changed

5 files changed

+115
-81
lines changed

contrib/mmts/multimaster.c

Lines changed: 52 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
#include "replication/walsender.h"
5050
#include "replication/walsender_private.h"
5151
#include "replication/slot.h"
52+
#include "replication/message.h"
5253
#include "port/atomics.h"
5354
#include "tcop/utility.h"
5455
#include "nodes/makefuncs.h"
@@ -235,8 +236,8 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
235236
ProcessUtilityContext context, ParamListInfo params,
236237
DestReceiver *dest, char *completionTag);
237238

238-
// static StringInfo MtmGUCBuffer;
239-
// static bool MtmGUCBufferAllocated = false;
239+
static StringInfo MtmGUCBuffer;
240+
static bool MtmGUCBufferAllocated = false;
240241

241242
/*
242243
* -------------------------------------------
@@ -2979,53 +2980,55 @@ static void MtmBroadcastUtilityStmt(char const* sql, bool ignoreError)
29792980
}
29802981
}
29812982

2982-
static bool MtmProcessDDLCommand(char const* queryString)
2983-
{
2984-
RangeVar *rv;
2985-
Relation rel;
2986-
TupleDesc tupDesc;
2987-
HeapTuple tup;
2988-
Datum values[Natts_mtm_ddl_log];
2989-
bool nulls[Natts_mtm_ddl_log];
2990-
TimestampTz ts = GetCurrentTimestamp();
2991-
2992-
rv = makeRangeVar("public", MULTIMASTER_DDL_TABLE, -1);
2993-
rel = heap_openrv_extended(rv, RowExclusiveLock, true);
2983+
static void MtmGUCBufferAppend(const char *gucQueryString){
29942984

2995-
if (rel == NULL) {
2996-
if (!MtmIsBroadcast()) {
2997-
MtmBroadcastUtilityStmt(queryString, false);
2998-
return true;
2999-
}
3000-
return false;
2985+
if (!MtmGUCBufferAllocated)
2986+
{
2987+
MemoryContext oldcontext;
2988+
oldcontext = MemoryContextSwitchTo(TopMemoryContext);
2989+
MtmGUCBuffer = makeStringInfo();
2990+
MemoryContextSwitchTo(oldcontext);
2991+
MtmGUCBufferAllocated = true;
2992+
appendStringInfoString(MtmGUCBuffer, "RESET SESSION AUTHORIZATION; reset all;");
30012993
}
3002-
3003-
tupDesc = RelationGetDescr(rel);
30042994

3005-
/* Form a tuple. */
3006-
memset(nulls, false, sizeof(nulls));
3007-
3008-
values[Anum_mtm_ddl_log_issued - 1] = TimestampTzGetDatum(ts);
3009-
values[Anum_mtm_ddl_log_query - 1] = CStringGetTextDatum(queryString);
2995+
appendStringInfoString(MtmGUCBuffer, gucQueryString);
2996+
/* sometimes there is no ';' char at the end. */
2997+
// appendStringInfoString(MtmGUCBuffer, ";");
2998+
}
30102999

3011-
tup = heap_form_tuple(tupDesc, values, nulls);
3000+
static char * MtmGUCBufferGet(void){
3001+
if (!MtmGUCBufferAllocated)
3002+
MtmGUCBufferAppend("");
3003+
return MtmGUCBuffer->data;
3004+
}
30123005

3013-
/* Insert the tuple to the catalog. */
3014-
simple_heap_insert(rel, tup);
3006+
static bool MtmProcessDDLCommand(char const* queryString)
3007+
{
3008+
char *queryWithContext;
3009+
char *gucContext;
30153010

3016-
/* Update the indexes. */
3017-
CatalogUpdateIndexes(rel, tup);
3011+
/* Append global GUC to utility stmt. */
3012+
gucContext = MtmGUCBufferGet();
3013+
if (gucContext)
3014+
{
3015+
queryWithContext = palloc(strlen(gucContext) + strlen(queryString) + 1);
3016+
strcpy(queryWithContext, gucContext);
3017+
strcat(queryWithContext, queryString);
3018+
}
3019+
else
3020+
{
3021+
queryWithContext = (char *) queryString;
3022+
}
30183023

3019-
/* Cleanup. */
3020-
heap_freetuple(tup);
3021-
heap_close(rel, RowExclusiveLock);
3024+
MTM_LOG1("Sending utility: %s", queryWithContext);
3025+
LogLogicalMessage("MTM:GUC", queryWithContext, strlen(queryWithContext), true);
30223026

30233027
MtmTx.containsDML = true;
30243028
return false;
30253029
}
30263030

30273031

3028-
30293032
/*
30303033
* Genenerate global transaction identifier for two-pahse commit.
30313034
* It should be unique for all nodes
@@ -3129,43 +3132,28 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
31293132
DiscardStmt *stmt = (DiscardStmt *) parsetree;
31303133
skipCommand = stmt->target == DISCARD_TEMP;
31313134

3132-
// skipCommand = true;
3133-
3134-
// if (MtmGUCBufferAllocated)
3135-
// {
3136-
// // XXX: move allocation somewhere to backend startup and check
3137-
// // where buffer is empty in send routines.
3138-
// MtmGUCBufferAllocated = false;
3139-
// pfree(MtmGUCBuffer);
3140-
// }
3141-
3135+
if (!IsTransactionBlock())
3136+
{
3137+
skipCommand = true;
3138+
MtmGUCBufferAppend(queryString);
3139+
}
31423140
}
31433141
break;
31443142
case T_VariableSetStmt:
31453143
{
31463144
VariableSetStmt *stmt = (VariableSetStmt *) parsetree;
31473145

3148-
skipCommand = true;
3146+
// skipCommand = true;
31493147

31503148
/* Prevent SET TRANSACTION from replication */
31513149
if (stmt->kind == VAR_SET_MULTI)
3152-
// break;
31533150
skipCommand = true;
31543151

3155-
// if (!MtmGUCBufferAllocated)
3156-
// {
3157-
// MemoryContext oldcontext;
3158-
3159-
// oldcontext = MemoryContextSwitchTo(TopMemoryContext);
3160-
// MtmGUCBuffer = makeStringInfo();
3161-
// MemoryContextSwitchTo(oldcontext);
3162-
// MtmGUCBufferAllocated = true;
3163-
// }
3164-
3165-
// appendStringInfoString(MtmGUCBuffer, queryString);
3166-
3167-
// sometimes there is no ';' char at the end.
3168-
// appendStringInfoString(MtmGUCBuffer, ";");
3152+
if (!IsTransactionBlock())
3153+
{
3154+
skipCommand = true;
3155+
MtmGUCBufferAppend(queryString);
3156+
}
31693157
}
31703158
break;
31713159
case T_CreateTableAsStmt:
@@ -3191,7 +3179,8 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
31913179

31923180
viewParse = parse_analyze((Node *) copyObject(stmt->query),
31933181
queryString, NULL, 0);
3194-
skipCommand = isQueryUsingTempRelation(viewParse);
3182+
skipCommand = isQueryUsingTempRelation(viewParse) ||
3183+
stmt->view->relpersistence == RELPERSISTENCE_TEMP;
31953184
// ||
31963185
// (stmt->relation->schemaname && strcmp(stmt->relation->schemaname, "pg_temp") == 0);
31973186
}

contrib/mmts/pglogical_apply.c

Lines changed: 31 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ static void UserTableUpdateOpenIndexes(EState *estate, TupleTableSlot *slot);
7070
static void UserTableUpdateIndexes(EState *estate, TupleTableSlot *slot);
7171

7272
static void process_remote_begin(StringInfo s);
73+
static void process_remote_message(StringInfo s);
7374
static void process_remote_commit(StringInfo s);
7475
static void process_remote_insert(StringInfo s, Relation rel);
7576
static void process_remote_update(StringInfo s, Relation rel);
@@ -338,7 +339,31 @@ process_remote_begin(StringInfo s)
338339
StartTransactionCommand();
339340
MtmJoinTransaction(&gtid, snapshot);
340341

341-
MTM_LOG3("REMOTE begin node=%d xid=%d snapshot=%ld", gtid.node, gtid.xid, snapshot);
342+
MTM_LOG1("REMOTE begin node=%d xid=%d snapshot=%ld", gtid.node, gtid.xid, snapshot);
343+
}
344+
345+
static void
346+
process_remote_message(StringInfo s)
347+
{
348+
const char *stmt;
349+
int rc;
350+
351+
stmt = pq_getmsgstring(s);
352+
MTM_LOG1("utility: %s", stmt);
353+
MTM_LOG3("%d: Execute utility statement %s", MyProcPid, stmt);
354+
355+
SPI_connect();
356+
rc = SPI_execute(stmt, false, 0);
357+
SPI_finish();
358+
if (rc < 0)
359+
elog(ERROR, "Failed to execute utility statement %s", stmt);
360+
361+
//XXX: create messages for tables localization too.
362+
// if (strcmp(relname, MULTIMASTER_LOCAL_TABLES_TABLE) == 0) {
363+
// char* schema = TextDatumGetCString(new_tuple.values[Anum_mtm_local_tables_rel_schema-1]);
364+
// char* name = TextDatumGetCString(new_tuple.values[Anum_mtm_local_tables_rel_name-1]);
365+
// MtmMakeTableLocal(schema, name);
366+
// }
342367
}
343368

344369
static void
@@ -610,7 +635,6 @@ process_remote_insert(StringInfo s, Relation rel)
610635
TupleTableSlot *oldslot;
611636
ResultRelInfo *relinfo;
612637
ScanKey *index_keys;
613-
char* relname = RelationGetRelationName(rel);
614638
int i;
615639

616640
estate = create_rel_estate(rel);
@@ -693,22 +717,6 @@ process_remote_insert(StringInfo s, Relation rel)
693717
FreeExecutorState(estate);
694718

695719
CommandCounterIncrement();
696-
697-
if (strcmp(relname, MULTIMASTER_DDL_TABLE) == 0) {
698-
char* ddl = TextDatumGetCString(new_tuple.values[Anum_mtm_ddl_log_query-1]);
699-
int rc;
700-
SPI_connect();
701-
MTM_LOG3("%d: Execute utility statement %s", MyProcPid, ddl);
702-
rc = SPI_execute(ddl, false, 0);
703-
SPI_finish();
704-
if (rc < 0)
705-
elog(ERROR, "Failed to execute utility statement %s", ddl);
706-
} else if (strcmp(relname, MULTIMASTER_LOCAL_TABLES_TABLE) == 0) {
707-
char* schema = TextDatumGetCString(new_tuple.values[Anum_mtm_local_tables_rel_schema-1]);
708-
char* name = TextDatumGetCString(new_tuple.values[Anum_mtm_local_tables_rel_name-1]);
709-
MtmMakeTableLocal(schema, name);
710-
}
711-
712720
}
713721

714722
static void
@@ -987,6 +995,11 @@ void MtmExecutor(int id, void* work, size_t size)
987995
s.len = save_len;
988996
continue;
989997
}
998+
case 'G':
999+
{
1000+
process_remote_message(&s);
1001+
continue;
1002+
}
9901003
default:
9911004
elog(ERROR, "unknown action of type %c", action);
9921005
}

contrib/mmts/pglogical_output.c

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333

3434
#include "replication/output_plugin.h"
3535
#include "replication/logical.h"
36+
#include "replication/message.h"
3637
#include "replication/origin.h"
3738

3839
#include "utils/builtins.h"
@@ -64,6 +65,11 @@ static void pg_decode_change(LogicalDecodingContext *ctx,
6465
static bool pg_decode_origin_filter(LogicalDecodingContext *ctx,
6566
RepOriginId origin_id);
6667

68+
static void pg_decode_message(LogicalDecodingContext *ctx,
69+
ReorderBufferTXN *txn, XLogRecPtr message_lsn,
70+
bool transactional, const char *prefix,
71+
Size sz, const char *message);
72+
6773
static void send_startup_message(LogicalDecodingContext *ctx,
6874
PGLogicalOutputData *data, bool last_message);
6975

@@ -81,6 +87,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
8187
cb->commit_cb = pg_decode_commit_txn;
8288
cb->filter_by_origin_cb = pg_decode_origin_filter;
8389
cb->shutdown_cb = pg_decode_shutdown;
90+
cb->message_cb = pg_decode_message;
8491
}
8592

8693
static bool
@@ -499,6 +506,18 @@ pg_decode_origin_filter(LogicalDecodingContext *ctx,
499506
return false;
500507
}
501508

509+
static void
510+
pg_decode_message(LogicalDecodingContext *ctx,
511+
ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
512+
const char *prefix, Size sz, const char *message)
513+
{
514+
PGLogicalOutputData* data = (PGLogicalOutputData*)ctx->output_plugin_private;
515+
516+
OutputPluginPrepareWrite(ctx, true);
517+
data->api->write_message(ctx->out, prefix, sz, message);
518+
OutputPluginWrite(ctx, true);
519+
}
520+
502521
static void
503522
send_startup_message(LogicalDecodingContext *ctx,
504523
PGLogicalOutputData *data, bool last_message)

contrib/mmts/pglogical_proto.c

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,15 @@ pglogical_write_begin(StringInfo out, PGLogicalOutputData *data,
119119
}
120120
}
121121

122+
static void
123+
pglogical_write_message(StringInfo out,
124+
const char *prefix, Size sz, const char *message)
125+
{
126+
pq_sendbyte(out, 'G');
127+
pq_sendbytes(out, message, sz);
128+
pq_sendbyte(out, '\0');
129+
}
130+
122131
/*
123132
* Write COMMIT to the output stream.
124133
*/
@@ -429,6 +438,7 @@ pglogical_init_api(PGLogicalProtoType typ)
429438
MTM_LOG1("%d: PRGLOGICAL init API for slot %s node %d", MyProcPid, MyReplicationSlot->data.name.data, MtmReplicationNodeId);
430439
res->write_rel = pglogical_write_rel;
431440
res->write_begin = pglogical_write_begin;
441+
res->write_message = pglogical_write_message;
432442
res->write_commit = pglogical_write_commit;
433443
res->write_insert = pglogical_write_insert;
434444
res->write_update = pglogical_write_update;

contrib/mmts/pglogical_proto.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ typedef void (*pglogical_write_rel_fn)(StringInfo out, struct PGLogicalOutputDat
2121

2222
typedef void (*pglogical_write_begin_fn)(StringInfo out, struct PGLogicalOutputData *data,
2323
ReorderBufferTXN *txn);
24+
typedef void (*pglogical_write_message_fn)(StringInfo out,
25+
const char *prefix, Size sz, const char *message);
2426
typedef void (*pglogical_write_commit_fn)(StringInfo out, struct PGLogicalOutputData *data,
2527
ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
2628

@@ -43,6 +45,7 @@ typedef struct PGLogicalProtoAPI
4345
{
4446
pglogical_write_rel_fn write_rel;
4547
pglogical_write_begin_fn write_begin;
48+
pglogical_write_message_fn write_message;
4649
pglogical_write_commit_fn write_commit;
4750
pglogical_write_origin_fn write_origin;
4851
pglogical_write_insert_fn write_insert;

0 commit comments

Comments
 (0)