Skip to content

Commit c57e0d1

Browse files
committed
New DDL replication mechanism
1 parent 6b07b6b commit c57e0d1

File tree

9 files changed

+134
-51
lines changed

9 files changed

+134
-51
lines changed

contrib/mmts/multimaster--1.0.sql

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,20 @@
11
-- complain if script is sourced in psql, rather than via CREATE EXTENSION
\echo Use "CREATE EXTENSION multimaster" to load this file. \quit

-- SQL-callable wrappers for C functions in the multimaster module.
-- All objects are created in the "mtm" schema.

-- Bound to the C symbol mtm_start_replication.
CREATE FUNCTION mtm.start_replication()
    RETURNS void
    LANGUAGE C
    AS 'MODULE_PATHNAME', 'mtm_start_replication';

-- Bound to the C symbol mtm_stop_replication.
CREATE FUNCTION mtm.stop_replication()
    RETURNS void
    LANGUAGE C
    AS 'MODULE_PATHNAME', 'mtm_stop_replication';

-- Bound to the C symbol mtm_drop_node.
-- drop_slot defaults to false when omitted.
CREATE FUNCTION mtm.drop_node(node integer, drop_slot boolean DEFAULT false)
    RETURNS void
    LANGUAGE C
    AS 'MODULE_PATHNAME', 'mtm_drop_node';

-- Bound to the C symbol mtm_get_snapshot.
CREATE FUNCTION mtm.get_snapshot()
    RETURNS bigint
    LANGUAGE C
    AS 'MODULE_PATHNAME', 'mtm_get_snapshot';

-- DDL log: one row per logged statement (issue time and statement text).
CREATE TABLE IF NOT EXISTS mtm.ddl_log (
    issued timestamptz NOT NULL,
    query  text
);

contrib/mmts/multimaster.c

Lines changed: 62 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,9 @@
4848
#include "replication/slot.h"
4949
#include "port/atomics.h"
5050
#include "tcop/utility.h"
51+
#include "nodes/makefuncs.h"
52+
#include "access/htup_details.h"
53+
#include "catalog/indexing.h"
5154

5255
#include "multimaster.h"
5356

@@ -863,7 +866,7 @@ mtm_drop_node(PG_FUNCTION_ARGS)
863866
dtm->nNodes -= 1;
864867
if (!IsTransactionBlock())
865868
{
866-
MtmBroadcastUtilityStmt(psprintf("select mtm_drop_node(%d,%s)", nodeId, dropSlot ? "true" : "false"), true);
869+
MtmBroadcastUtilityStmt(psprintf("select multimaster.drop_node(%d,%s)", nodeId, dropSlot ? "true" : "false"), true);
867870
}
868871
if (dropSlot)
869872
{
@@ -878,7 +881,7 @@ mtm_get_snapshot(PG_FUNCTION_ARGS)
878881
{
879882
PG_RETURN_INT64(dtmTx.snapshot);
880883
}
881-
884+
882885
/*
883886
* Execute statement with specified parameters and check its result
884887
*/
@@ -924,7 +927,7 @@ static void MtmBroadcastUtilityStmt(char const* sql, bool ignoreError)
924927
failedNode = i;
925928
do {
926929
PQfinish(conns[i]);
927-
} while (--i >= 0);
930+
} while (--i >= 0);
928931
elog(ERROR, "Failed to establish connection '%s' to node %d", conn_str, failedNode);
929932
}
930933
}
@@ -933,7 +936,7 @@ static void MtmBroadcastUtilityStmt(char const* sql, bool ignoreError)
933936
i += 1;
934937
}
935938
Assert(i == MtmNodes);
936-
939+
937940
for (i = 0; i < MtmNodes; i++)
938941
{
939942
if (conns[i])
@@ -970,7 +973,7 @@ static void MtmBroadcastUtilityStmt(char const* sql, bool ignoreError)
970973
failedNode = i;
971974
}
972975
}
973-
}
976+
}
974977
for (i = 0; i < MtmNodes; i++)
975978
{
976979
if (conns[i])
@@ -984,6 +987,48 @@ static void MtmBroadcastUtilityStmt(char const* sql, bool ignoreError)
984987
}
985988
}
986989

990+
static void MtmProcessDDLCommand(char const* queryString)
991+
{
992+
RangeVar *rv;
993+
Relation rel;
994+
TupleDesc tupDesc;
995+
HeapTuple tup;
996+
Datum values[Natts_mtm_ddl_log];
997+
bool nulls[Natts_mtm_ddl_log];
998+
TimestampTz ts = GetCurrentTimestamp();
999+
1000+
rv = makeRangeVar(MULTIMASTER_SCHEMA_NAME, MULTIMASTER_DDL_TABLE, -1);
1001+
rel = heap_openrv_extended(rv, RowExclusiveLock, true);
1002+
1003+
if (rel == NULL) {
1004+
return;
1005+
}
1006+
1007+
tupDesc = RelationGetDescr(rel);
1008+
1009+
/* Form a tuple. */
1010+
memset(nulls, false, sizeof(nulls));
1011+
1012+
values[Anum_mtm_ddl_log_issued - 1] = TimestampTzGetDatum(ts);
1013+
values[Anum_mtm_ddl_log_query - 1] = CStringGetTextDatum(queryString);
1014+
1015+
tup = heap_form_tuple(tupDesc, values, nulls);
1016+
1017+
/* Insert the tuple to the catalog. */
1018+
simple_heap_insert(rel, tup);
1019+
1020+
/* Update the indexes. */
1021+
CatalogUpdateIndexes(rel, tup);
1022+
1023+
/* Cleanup. */
1024+
heap_freetuple(tup);
1025+
heap_close(rel, RowExclusiveLock);
1026+
1027+
elog(WARNING, "Replicate command: '%s'", queryString);
1028+
1029+
dtmTx.containsDML = true;
1030+
}
1031+
9871032
static void MtmProcessUtility(Node *parsetree, const char *queryString,
9881033
ProcessUtilityContext context, ParamListInfo params,
9891034
DestReceiver *dest, char *completionTag)
@@ -1011,22 +1056,18 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
10111056
skipCommand = false;
10121057
break;
10131058
}
1014-
if (skipCommand || IsTransactionBlock()) {
1015-
if (PreviousProcessUtilityHook != NULL)
1016-
{
1017-
PreviousProcessUtilityHook(parsetree, queryString, context,
1018-
params, dest, completionTag);
1019-
}
1020-
else
1021-
{
1022-
standard_ProcessUtility(parsetree, queryString, context,
1023-
params, dest, completionTag);
1024-
}
1025-
if (!skipCommand) {
1026-
dtmTx.isDistributed = false;
1027-
}
1028-
} else {
1029-
MtmBroadcastUtilityStmt(queryString, false);
1059+
if (!skipCommand && !dtmTx.isReplicated) {
1060+
MtmProcessDDLCommand(queryString);
1061+
}
1062+
if (PreviousProcessUtilityHook != NULL)
1063+
{
1064+
PreviousProcessUtilityHook(parsetree, queryString, context,
1065+
params, dest, completionTag);
1066+
}
1067+
else
1068+
{
1069+
standard_ProcessUtility(parsetree, queryString, context,
1070+
params, dest, completionTag);
10301071
}
10311072
}
10321073

contrib/mmts/multimaster.control

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
comment = 'Multimaster'
22
default_version = '1.0'
33
module_pathname = '$libdir/multimaster'
4-
relocatable = true
4+
schema = mtm
5+
relocatable = false

contrib/mmts/multimaster.h

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
#include "bgwpool.h"
66

77
#define MTM_INFO(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
8-
#define MTM_TRACE(fmt, ...)
8+
#define MTM_TRACE(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
99
#define MTM_TUPLE_TRACE(fmt, ...)
1010
/*
1111
#define MTM_TRACE(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
@@ -14,7 +14,14 @@
1414

1515
#define BIT_SET(mask, bit) ((mask) & ((int64)1 << (bit)))
1616

17-
#define MULTIMASTER_NAME "mmts"
17+
#define MULTIMASTER_NAME "mtm"
18+
#define MULTIMASTER_SCHEMA_NAME "mtm"
19+
#define MULTIMASTER_DDL_TABLE "ddl_log"
20+
21+
#define Natts_mtm_ddl_log 2
22+
#define Anum_mtm_ddl_log_issued 1
23+
#define Anum_mtm_ddl_log_query 2
24+
1825

1926
typedef uint64 csn_t; /* commit serial number */
2027
#define INVALID_CSN ((csn_t)-1)

contrib/mmts/pglogical_apply.c

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -472,13 +472,14 @@ process_remote_commit(StringInfo s)
472472
static void
473473
process_remote_insert(StringInfo s, Relation rel)
474474
{
475-
EState *estate;
476-
TupleData new_tuple;
475+
EState *estate;
476+
TupleData new_tuple;
477477
TupleTableSlot *newslot;
478478
TupleTableSlot *oldslot;
479479
ResultRelInfo *relinfo;
480-
ScanKey *index_keys;
481-
int i;
480+
ScanKey *index_keys;
481+
char* relname = RelationGetRelationName(rel);
482+
int i;
482483

483484
estate = create_rel_estate(rel);
484485
newslot = ExecInitExtraTupleSlot(estate);
@@ -560,6 +561,18 @@ process_remote_insert(StringInfo s, Relation rel)
560561
FreeExecutorState(estate);
561562

562563
CommandCounterIncrement();
564+
565+
if (strcmp(relname, MULTIMASTER_DDL_TABLE) == 0) {
566+
char* ddl = TextDatumGetCString(new_tuple.values[Anum_mtm_ddl_log_query-1]);
567+
int rc;
568+
SPI_connect();
569+
rc = SPI_execute(ddl, false, 0);
570+
SPI_finish();
571+
if (rc != SPI_OK_UTILITY) {
572+
elog(ERROR, "Failed to execute utility statement %s", ddl);
573+
}
574+
}
575+
563576
}
564577

565578
static void

contrib/mmts/tests/dtmbench.cpp

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -180,28 +180,20 @@ void* writer(void* arg)
180180
void initializeDatabase()
181181
{
182182
connection conn(cfg.connections[0]);
183-
printf("creating extension\n");
184-
{
185-
nontransaction txn(conn);
186-
exec(txn, "drop extension if exists multimaster");
187-
exec(txn, "create extension multimaster");
188-
}
189-
printf("extension created\n");
190-
191-
printf("creating table t\n");
183+
time_t start = getCurrentTime();
184+
printf("Creating database schema...\n");
192185
{
193186
nontransaction txn(conn);
194187
exec(txn, "drop table if exists t");
195188
exec(txn, "create table t(u int primary key, v int)");
196189
}
197-
printf("table t created\n");
198-
printf("inserting stuff into t\n");
190+
printf("Populating data...\n");
199191
{
200192
work txn(conn);
201193
exec(txn, "insert into t (select generate_series(0,%d), %d)", cfg.nAccounts-1, 0);
202194
txn.commit();
203195
}
204-
printf("stuff inserted\n");
196+
printf("Initialization completed in %f seconds\n", (start - getCurrentTime())/100000.0);
205197
}
206198

207199
int main (int argc, char* argv[])

contrib/mmts/tests/perf.results

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,3 +117,24 @@ Bench started at Пн. февр. 15 17:26:11 MSK 2016
117117
astro5:{tps:96460.088384, transactions:1000000, selects:2000000, updates:0, aborts:0, abort_percent: 0, readers:0, writers:200, update_percent:0, accounts:500000, iterations:5000, hosts:3}
118118
Bench finished at Пн. февр. 15 17:26:22 MSK 2016
119119
Bench started at Пн. февр. 15 17:26:41 MSK 2016
120+
Bench started at Пн. февр. 15 17:58:14 MSK 2016
121+
astro5:{tps:93430.358394, transactions:1250000, selects:2500000, updates:0, aborts:0, abort_percent: 0, readers:0, writers:250, update_percent:0, accounts:500000, iterations:5000, hosts:3}
122+
Bench finished at Пн. февр. 15 17:58:28 MSK 2016
123+
Bench started at Пн. февр. 15 17:59:11 MSK 2016
124+
astro5:{tps:81893.902409, transactions:1500000, selects:3000000, updates:0, aborts:0, abort_percent: 0, readers:0, writers:300, update_percent:0, accounts:500000, iterations:5000, hosts:3}
125+
Bench finished at Пн. февр. 15 17:59:29 MSK 2016
126+
Bench started at Пн. февр. 15 17:59:59 MSK 2016
127+
astro5:{tps:105707.597142, transactions:1000000, selects:2000000, updates:0, aborts:0, abort_percent: 0, readers:0, writers:200, update_percent:0, accounts:500000, iterations:5000, hosts:3}
128+
Bench finished at Пн. февр. 15 18:00:09 MSK 2016
129+
Bench started at Пн. февр. 15 18:00:54 MSK 2016
130+
astro5:{tps:92668.464039, transactions:1250000, selects:2500000, updates:0, aborts:0, abort_percent: 0, readers:0, writers:250, update_percent:0, accounts:500000, iterations:5000, hosts:3}
131+
Bench finished at Пн. февр. 15 18:01:08 MSK 2016
132+
Bench started at Пн. февр. 15 18:06:22 MSK 2016
133+
astro5:{tps:121069.442298, transactions:125000000, selects:250000000, updates:0, aborts:0, abort_percent: 0, readers:0, writers:250, update_percent:0, accounts:500000, iterations:500000, hosts:3}
134+
Bench finished at Пн. февр. 15 18:23:35 MSK 2016
135+
Bench started at Пн. февр. 15 18:24:11 MSK 2016
136+
astro5:{tps:122202.228254, transactions:100000000, selects:200000000, updates:0, aborts:0, abort_percent: 0, readers:0, writers:200, update_percent:0, accounts:500000, iterations:500000, hosts:3}
137+
Bench finished at Пн. февр. 15 18:37:50 MSK 2016
138+
Bench started at Пн. февр. 15 18:44:02 MSK 2016
139+
astro5:{tps:121774.204222, transactions:100000000, selects:200000000, updates:0, aborts:0, abort_percent: 0, readers:0, writers:200, update_percent:0, accounts:500000, iterations:500000, hosts:3}
140+
Bench finished at Пн. февр. 15 18:57:44 MSK 2016

contrib/mmts/tests/perf.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@
4646
- name: run transfers
4747
shell: >
4848
~/pg_cluster/install/bin/dtmbench {{connections}}
49-
-w {{ nconns }} -r 0 -n 5000 -a 500000 -p {{ up }} |
49+
-w {{ nconns }} -r 0 -n 500000 -a 500000 -p {{ up }} |
5050
tee -a perf.results |
5151
sed "s/^/`hostname`:/"
5252
register: transfers_result

contrib/mmts/tests/reinit-mm.sh

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,15 @@ sep=""
1010
for ((i=1;i<=n_nodes;i++))
1111
do
1212
port=$((5431+i))
13-
conn_str="$conn_str${sep}dbname=postgres host=127.0.0.1 port=$port sslmode=disable"
13+
conn_str="$conn_str${sep}dbname=postgres host=localhost port=$port sslmode=disable"
1414
sep=","
1515
initdb node$i
1616
done
1717

18-
echo Start DTM
19-
~/postgres_cluster/contrib/arbiter/bin/arbiter -r 0.0.0.0:5431 -i 0 -d dtm 2> dtm.log &
20-
sleep 2
18+
#echo Start DTM
19+
#~/postgres_cluster/contrib/arbiter/bin/arbiter -r 0.0.0.0:5431 -i 0 -d dtm 2> dtm.log &
20+
#sleep 2
21+
echo "Starting nodes..."
2122

2223
echo Start nodes
2324
for ((i=1;i<=n_nodes;i++))
@@ -31,7 +32,13 @@ do
3132
done
3233

3334
sleep 5
34-
echo Initialize database schema
35-
psql postgres -f init.sql
35+
echo "Create multimaster extension..."
36+
37+
for ((i=1;i<=n_nodes;i++))
38+
do
39+
port=$((5431+i))
40+
psql postgres -p $port -c "create extension multimaster"
41+
done
42+
3643

3744
echo Done

0 commit comments

Comments
 (0)