Skip to content

Commit 704c65f

Browse files
committed
Merge branch 'master' of github.com:postgrespro/postgres_cluster
2 parents 9b91909 + 08994d4 commit 704c65f

File tree

7 files changed

+116
-33
lines changed

7 files changed

+116
-33
lines changed

README

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
PostgreSQL Database Management System
1+
PostgreSQL Database Management System
22
=====================================
33

44
This directory contains the source code distribution of the PostgreSQL

contrib/mmts/bgwpool.c

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,6 @@ typedef struct
1616
int id;
1717
} BgwPoolExecutorCtx;
1818

19-
size_t n_snapshots;
20-
size_t n_active;
21-
2219
static void BgwPoolMainLoop(Datum arg)
2320
{
2421
BgwPoolExecutorCtx* ctx = (BgwPoolExecutorCtx*)arg;
@@ -36,7 +33,8 @@ static void BgwPoolMainLoop(Datum arg)
3633
size = *(int*)&pool->queue[pool->head];
3734
Assert(size < pool->size);
3835
work = malloc(size);
39-
pool->active -= 1;
36+
pool->pending -= 1;
37+
pool->active += 1;
4038
if (pool->head + size + 4 > pool->size) {
4139
memcpy(work, pool->queue, size);
4240
pool->head = INTALIGN(size);
@@ -54,6 +52,9 @@ static void BgwPoolMainLoop(Datum arg)
5452
SpinLockRelease(&pool->lock);
5553
pool->executor(id, work, size);
5654
free(work);
55+
SpinLockAcquire(&pool->lock);
56+
pool->active -= 1;
57+
SpinLockRelease(&pool->lock);
5758
}
5859
}
5960

@@ -71,6 +72,7 @@ void BgwPoolInit(BgwPool* pool, BgwPoolExecutor executor, char const* dbname, si
7172
pool->tail = 0;
7273
pool->size = queueSize;
7374
pool->active = 0;
75+
pool->pending = 0;
7476
strcpy(pool->dbname, dbname);
7577
}
7678

@@ -126,9 +128,7 @@ void BgwPoolExecute(BgwPool* pool, void* work, size_t size)
126128
PGSemaphoreLock(&pool->overflow);
127129
SpinLockAcquire(&pool->lock);
128130
} else {
129-
pool->active += 1;
130-
n_snapshots += 1;
131-
n_active += pool->active;
131+
pool->pending += 1;
132132
*(int*)&pool->queue[pool->tail] = size;
133133
if (pool->size - pool->tail >= size + 4) {
134134
memcpy(&pool->queue[pool->tail+4], work, size);

contrib/mmts/bgwpool.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ typedef struct
2020
size_t tail;
2121
size_t size;
2222
size_t active;
23+
size_t pending;
2324
bool producerBlocked;
2425
char dbname[MAX_DBNAME_LEN];
2526
char* queue;

contrib/mmts/multimaster--1.0.sql

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,18 +24,22 @@ AS 'MODULE_PATHNAME','mtm_get_snapshot'
2424
LANGUAGE C;
2525

2626

27-
CREATE TYPE mtm.node_state AS (id integer, disabled bool, disconnected bool, catchUp bool, slotLag bigint, avgTransDelay bigint, lastStatusChange timestamp, connStr text);
27+
CREATE TYPE mtm.node_state AS ("id" integer, "disabled" bool, "disconnected" bool, "catchUp" bool, "slotLag" bigint, "avgTransDelay" bigint, "lastStatusChange" timestamp, "connStr" text);
2828

2929
CREATE FUNCTION mtm.get_nodes_state() RETURNS SETOF mtm.node_state
3030
AS 'MODULE_PATHNAME','mtm_get_nodes_state'
3131
LANGUAGE C;
3232

33-
CREATE TYPE mtm.cluster_state AS (status text, disabledNodeMask bigint, disconnectedNodeMask bigint, catchUpNodeMask bigint, nNodes integer, nActiveQueries integer, queueSize bigint, transCount bigint, timeShift bigint, recoverySlot integer);
33+
CREATE TYPE mtm.cluster_state AS ("status" text, "disabledNodeMask" bigint, "disconnectedNodeMask" bigint, "catchUpNodeMask" bigint, "nNodes" integer, "nActiveQueries" integer, "nPendingQueries" integer, "queueSize" bigint, "transCount" bigint, "timeShift" bigint, "recoverySlot" integer);
3434

3535
CREATE FUNCTION mtm.get_cluster_state() RETURNS mtm.cluster_state
3636
AS 'MODULE_PATHNAME','mtm_get_cluster_state'
3737
LANGUAGE C;
3838

39+
CREATE FUNCTION mtm.get_cluster_info() RETURNS SETOF mtm.cluster_state
40+
AS 'MODULE_PATHNAME','mtm_get_cluster_info'
41+
LANGUAGE C;
42+
3943
CREATE FUNCTION mtm.make_table_local(relation regclass) RETURNS void
4044
AS 'MODULE_PATHNAME','mtm_make_table_local'
4145
LANGUAGE C;

contrib/mmts/multimaster.c

Lines changed: 93 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ PG_FUNCTION_INFO_V1(mtm_recover_node);
108108
PG_FUNCTION_INFO_V1(mtm_get_snapshot);
109109
PG_FUNCTION_INFO_V1(mtm_get_nodes_state);
110110
PG_FUNCTION_INFO_V1(mtm_get_cluster_state);
111+
PG_FUNCTION_INFO_V1(mtm_get_cluster_info);
111112
PG_FUNCTION_INFO_V1(mtm_make_table_local);
112113
PG_FUNCTION_INFO_V1(mtm_dump_lock_graph);
113114

@@ -166,7 +167,8 @@ char const* const MtmNodeStatusMnem[] =
166167
"Connected",
167168
"Online",
168169
"Recovery",
169-
"InMinor"
170+
"InMinor",
171+
"OutOfService"
170172
};
171173

172174
bool MtmDoReplication;
@@ -1014,6 +1016,26 @@ void MtmAbortTransaction(MtmTransState* ts)
10141016
* -------------------------------------------
10151017
*/
10161018

1019+
void MtmHandleApplyError(void)
1020+
{
1021+
ErrorData *edata = CopyErrorData();
1022+
switch (edata->sqlerrcode) {
1023+
case ERRCODE_DISK_FULL:
1024+
case ERRCODE_INSUFFICIENT_RESOURCES:
1025+
case ERRCODE_IO_ERROR:
1026+
case ERRCODE_DATA_CORRUPTED:
1027+
case ERRCODE_INDEX_CORRUPTED:
1028+
case ERRCODE_SYSTEM_ERROR:
1029+
case ERRCODE_INTERNAL_ERROR:
1030+
case ERRCODE_OUT_OF_MEMORY:
1031+
elog(WARNING, "Node is excluded from cluster because of non-recoverable error %d", edata->sqlerrcode);
1032+
MtmSwitchClusterMode(MTM_OUT_OF_SERVICE);
1033+
kill(PostmasterPid, SIGQUIT);
1034+
break;
1035+
}
1036+
}
1037+
1038+
10171039
void MtmRecoveryCompleted(void)
10181040
{
10191041
MTM_LOG1("Recovery of node %d is completed", MtmNodeId);
@@ -1609,7 +1631,7 @@ _PG_init(void)
16091631
"Minamal amount of time (milliseconds) to wait 2PC confirmation from all nodes",
16101632
"Timeout for 2PC is calculated as MAX(prepare_time*2pc_prepare_ratio/100,2pc_min_timeout)",
16111633
&Mtm2PCMinTimeout,
1612-
10000,
1634+
100000, /* 100 seconds */
16131635
0,
16141636
INT_MAX,
16151637
PGC_BACKEND,
@@ -1624,7 +1646,7 @@ _PG_init(void)
16241646
"Percent of prepare time for maximal time of second phase of two-pahse commit",
16251647
"Timeout for 2PC is calculated as MAX(prepare_time*2pc_prepare_ratio/100,2pc_min_timeout)",
16261648
&Mtm2PCPrepareRatio,
1627-
100,
1649+
1000, /* 10 times */
16281650
0,
16291651
INT_MAX,
16301652
PGC_BACKEND,
@@ -2178,10 +2200,9 @@ mtm_get_snapshot(PG_FUNCTION_ARGS)
21782200
typedef struct
21792201
{
21802202
int nodeId;
2181-
char* connStrPtr;
21822203
TupleDesc desc;
2183-
Datum values[8];
2184-
bool nulls[8];
2204+
Datum values[Natts_mtm_nodes_state];
2205+
bool nulls[Natts_mtm_nodes_state];
21852206
} MtmGetNodeStateCtx;
21862207

21872208
Datum
@@ -2190,7 +2211,6 @@ mtm_get_nodes_state(PG_FUNCTION_ARGS)
21902211
FuncCallContext* funcctx;
21912212
MtmGetNodeStateCtx* usrfctx;
21922213
MemoryContext oldcontext;
2193-
char* p;
21942214
int64 lag;
21952215
bool is_first_call = SRF_IS_FIRSTCALL();
21962216

@@ -2200,7 +2220,6 @@ mtm_get_nodes_state(PG_FUNCTION_ARGS)
22002220
usrfctx = (MtmGetNodeStateCtx*)palloc(sizeof(MtmGetNodeStateCtx));
22012221
get_call_result_type(fcinfo, NULL, &usrfctx->desc);
22022222
usrfctx->nodeId = 1;
2203-
usrfctx->connStrPtr = pstrdup(MtmConnStrs);
22042223
memset(usrfctx->nulls, false, sizeof(usrfctx->nulls));
22052224
funcctx->user_fctx = usrfctx;
22062225
MemoryContextSwitchTo(oldcontext);
@@ -2219,23 +2238,19 @@ mtm_get_nodes_state(PG_FUNCTION_ARGS)
22192238
usrfctx->nulls[4] = lag < 0;
22202239
usrfctx->values[5] = Int64GetDatum(Mtm->transCount ? Mtm->nodes[usrfctx->nodeId-1].transDelay/Mtm->transCount : 0);
22212240
usrfctx->values[6] = TimestampTzGetDatum(time_t_to_timestamptz(Mtm->nodes[usrfctx->nodeId-1].lastStatusChangeTime));
2222-
p = strchr(usrfctx->connStrPtr, ',');
2223-
if (p != NULL) {
2224-
*p++ = '\0';
2225-
}
2226-
usrfctx->values[7] = CStringGetTextDatum(usrfctx->connStrPtr);
2227-
usrfctx->connStrPtr = p;
2241+
usrfctx->values[7] = CStringGetTextDatum(Mtm->nodes[usrfctx->nodeId-1].con.connStr);
22282242
usrfctx->nodeId += 1;
22292243

22302244
SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(heap_form_tuple(usrfctx->desc, usrfctx->values, usrfctx->nulls)));
22312245
}
22322246

2247+
22332248
Datum
22342249
mtm_get_cluster_state(PG_FUNCTION_ARGS)
22352250
{
22362251
TupleDesc desc;
2237-
Datum values[10];
2238-
bool nulls[10] = {false};
2252+
Datum values[Natts_mtm_cluster_state];
2253+
bool nulls[Natts_mtm_cluster_state] = {false};
22392254
get_call_result_type(fcinfo, NULL, &desc);
22402255

22412256
values[0] = CStringGetTextDatum(MtmNodeStatusMnem[Mtm->status]);
@@ -2244,16 +2259,73 @@ mtm_get_cluster_state(PG_FUNCTION_ARGS)
22442259
values[3] = Int64GetDatum(Mtm->nodeLockerMask);
22452260
values[4] = Int32GetDatum(Mtm->nNodes);
22462261
values[5] = Int32GetDatum((int)Mtm->pool.active);
2247-
values[6] = Int64GetDatum(BgwPoolGetQueueSize(&Mtm->pool));
2248-
values[7] = Int64GetDatum(Mtm->transCount);
2249-
values[8] = Int64GetDatum(Mtm->timeShift);
2250-
values[9] = Int32GetDatum(Mtm->recoverySlot);
2251-
nulls[9] = Mtm->recoverySlot == 0;
2262+
values[6] = Int32GetDatum((int)Mtm->pool.pending);
2263+
values[7] = Int64GetDatum(BgwPoolGetQueueSize(&Mtm->pool));
2264+
values[8] = Int64GetDatum(Mtm->transCount);
2265+
values[9] = Int64GetDatum(Mtm->timeShift);
2266+
values[10] = Int32GetDatum(Mtm->recoverySlot);
22522267

22532268
PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(desc, values, nulls)));
22542269
}
22552270

22562271

2272+
/* Cross-call state of the mtm_get_cluster_info() set-returning function. */
typedef struct
2273+
{
2274+
int nodeId; /* 1-based id of the next node to query; starts at 1 and is incremented after each returned row */
2275+
} MtmGetClusterInfoCtx;
2276+
2277+
2278+
Datum
2279+
mtm_get_cluster_info(PG_FUNCTION_ARGS)
2280+
{
2281+
2282+
FuncCallContext* funcctx;
2283+
MtmGetClusterInfoCtx* usrfctx;
2284+
MemoryContext oldcontext;
2285+
TupleDesc desc;
2286+
bool is_first_call = SRF_IS_FIRSTCALL();
2287+
int i;
2288+
PGconn* conn;
2289+
PGresult *result;
2290+
char* values[Natts_mtm_cluster_state];
2291+
HeapTuple tuple;
2292+
2293+
if (is_first_call) {
2294+
funcctx = SRF_FIRSTCALL_INIT();
2295+
oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
2296+
usrfctx = (MtmGetClusterInfoCtx*)palloc(sizeof(MtmGetNodeStateCtx));
2297+
get_call_result_type(fcinfo, NULL, &desc);
2298+
funcctx->attinmeta = TupleDescGetAttInMetadata(desc);
2299+
usrfctx->nodeId = 1;
2300+
funcctx->user_fctx = usrfctx;
2301+
MemoryContextSwitchTo(oldcontext);
2302+
}
2303+
funcctx = SRF_PERCALL_SETUP();
2304+
usrfctx = (MtmGetClusterInfoCtx*)funcctx->user_fctx;
2305+
if (usrfctx->nodeId > MtmNodes) {
2306+
SRF_RETURN_DONE(funcctx);
2307+
}
2308+
conn = PQconnectdb(Mtm->nodes[usrfctx->nodeId-1].con.connStr);
2309+
if (PQstatus(conn) != CONNECTION_OK) {
2310+
elog(ERROR, "Failed to establish connection '%s' to node %d", Mtm->nodes[usrfctx->nodeId-1].con.connStr, usrfctx->nodeId);
2311+
}
2312+
result = PQexec(conn, "select * from mtm.get_cluster_state()");
2313+
2314+
if (PQresultStatus(result) != PGRES_TUPLES_OK || PQntuples(result) != 1) {
2315+
elog(ERROR, "Failed to receive data from %d", usrfctx->nodeId);
2316+
}
2317+
2318+
for (i = 0; i < Natts_mtm_cluster_state; i++) {
2319+
values[i] = PQgetvalue(result, 0, i);
2320+
}
2321+
tuple = BuildTupleFromCStrings(funcctx->attinmeta, values);
2322+
PQclear(result);
2323+
PQfinish(conn);
2324+
usrfctx->nodeId += 1;
2325+
SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(tuple));
2326+
}
2327+
2328+
22572329
Datum mtm_make_table_local(PG_FUNCTION_ARGS)
22582330
{
22592331
Oid reloid = PG_GETARG_OID(1);

contrib/mmts/multimaster.h

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,9 @@
5656
#define Anum_mtm_local_tables_rel_schema 1
5757
#define Anum_mtm_local_tables_rel_name 2
5858

59+
#define Natts_mtm_cluster_state 11
60+
#define Natts_mtm_nodes_state 8
61+
5962
typedef uint64 csn_t; /* commit serial number */
6063
#define INVALID_CSN ((csn_t)-1)
6164

@@ -94,11 +97,12 @@ typedef enum
9497
/*
 * Status of a multimaster node.
 * NOTE(review): MtmNodeStatusMnem[] in multimaster.c appears to be indexed by
 * these values (see mtm_get_cluster_state), so the two lists should be kept
 * in the same order - confirm when adding a new status.
 */
typedef enum
98
{
99
MTM_INITIALIZATION, /* Initial status */
97-
MTM_OFFLINE, /* Node is out of quorum */
100+
MTM_OFFLINE, /* Node is excluded from cluster */
98101
MTM_CONNECTED, /* Arbiter has established connections with other nodes */
99102
MTM_ONLINE, /* Ready to receive client's queries */
100103
MTM_RECOVERY, /* Node is in recovery process */
101-
MTM_IN_MINORITY /* Node is out of quorum */
104+
MTM_IN_MINORITY, /* Node is out of quorum */
105+
MTM_OUT_OF_SERVICE /* Node is not available due to critical, non-recoverable error */
102106
} MtmNodeStatus;
103107

104108
typedef enum
@@ -235,5 +239,6 @@ extern void MtmCheckQuorum(void);
235239
extern bool MtmRecoveryCaughtUp(int nodeId, XLogRecPtr slotLSN);
236240
extern void MtmRecoveryCompleted(void);
237241
extern void MtmMakeTableLocal(char* schema, char* name);
242+
extern void MtmHandleApplyError(void);
238243

239244
#endif

contrib/mmts/pglogical_apply.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -951,6 +951,7 @@ void MtmExecutor(int id, void* work, size_t size)
951951
}
952952
PG_CATCH();
953953
{
954+
MtmHandleApplyError();
954955
EmitErrorReport();
955956
FlushErrorState();
956957
MTM_LOG2("%d: REMOTE begin abort transaction %d", MyProcPid, MtmGetCurrentTransactionId());

0 commit comments

Comments
 (0)