Skip to content

Commit 63ec9b9

Browse files
committed
Fix a silly deadlock in DTMD/Raft. Add 'hello' message to the protocol.
1 parent 1007a22 commit 63ec9b9

File tree

8 files changed

+157
-96
lines changed

8 files changed

+157
-96
lines changed

contrib/pg_dtm/README

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,13 @@ The format of all commands:
4343

4444
The commands:
4545

46+
'h': hello()
47+
The first message a client sends after connecting.
48+
49+
The arbiter replies with:
50+
[RES_OK] if ready (i.e. this arbiter is currently the Raft leader)
51+
[RES_FAILED] (or disconnection) if not ready
52+
4653
'r': reserve(minxid, minsize)
4754
Claims a sequence of at least minsize xids, each ≥ minxid, for local usage. This will
4855
prevent the arbiter from using those values for global transactions.
@@ -88,7 +95,7 @@ The commands:
8895

8996
The reply and 'wait' logic is the same as for the 'status' command.
9097

91-
'h': snapshot(xid)
98+
't': snapshot(xid)
9299
Tells the arbiter to generate a snapshot for the global transaction
93100
identified by the given 'xid'. The arbiter will create a snapshot for
94101
every participant, so when each of them asks for the snapshot it will

contrib/pg_dtm/dtmd/include/dtmdlimits.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
#define HEARTBEAT_TIMEOUT_MS 20
1212
#define ELECTION_TIMEOUT_MS_MIN 150
1313
#define ELECTION_TIMEOUT_MS_MAX 300
14-
#define RAFT_LOGLEN 10240
15-
#define RAFT_KEEP_APPLIED 4096 // how many applied entries to keep during compaction
14+
#define RAFT_LOGLEN 1024
15+
#define RAFT_KEEP_APPLIED 512 // how many applied entries to keep during compaction
1616

1717
#endif

contrib/pg_dtm/dtmd/include/proto.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
#ifndef PROTO_H
22
#define PROTO_H
33

4+
#define CMD_HELLO 'h'
45
#define CMD_RESERVE 'r'
56
#define CMD_BEGIN 'b'
67
#define CMD_FOR 'y'
78
#define CMD_AGAINST 'n'
8-
#define CMD_SNAPSHOT 'h'
9+
#define CMD_SNAPSHOT 't'
910
#define CMD_STATUS 's'
1011

1112
#define RES_FAILED 0xDEADBEEF

contrib/pg_dtm/dtmd/src/main.c

Lines changed: 48 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ static void notify_listeners(Transaction *t, int status) {
8282
// notify 'status' listeners about the transaction status
8383
case BLANK:
8484
while ((listener = transaction_pop_listener(t, 's'))) {
85+
debug("[%d] notifying the client about xid=%u (unknown)\n", CLIENT_ID(listener), t->xid);
8586
client_message_shortcut(
8687
(client_t)listener,
8788
RES_TRANSACTION_UNKNOWN
@@ -90,7 +91,7 @@ static void notify_listeners(Transaction *t, int status) {
9091
break;
9192
case NEGATIVE:
9293
while ((listener = transaction_pop_listener(t, 's'))) {
93-
// notify 'status' listeners about the transaction status
94+
debug("[%d] notifying the client about xid=%u (aborted)\n", CLIENT_ID(listener), t->xid);
9495
client_message_shortcut(
9596
(client_t)listener,
9697
RES_TRANSACTION_ABORTED
@@ -99,7 +100,7 @@ static void notify_listeners(Transaction *t, int status) {
99100
break;
100101
case POSITIVE:
101102
while ((listener = transaction_pop_listener(t, 's'))) {
102-
// notify 'status' listeners about the transaction status
103+
debug("[%d] notifying the client about xid=%u (committed)\n", CLIENT_ID(listener), t->xid);
103104
client_message_shortcut(
104105
(client_t)listener,
105106
RES_TRANSACTION_COMMITTED
@@ -108,7 +109,7 @@ static void notify_listeners(Transaction *t, int status) {
108109
break;
109110
case DOUBT:
110111
while ((listener = transaction_pop_listener(t, 's'))) {
111-
// notify 'status' listeners about the transaction status
112+
debug("[%d] notifying the client about xid=%u (inprogress)\n", CLIENT_ID(listener), t->xid);
112113
client_message_shortcut(
113114
(client_t)listener,
114115
RES_TRANSACTION_INPROGRESS
@@ -122,19 +123,22 @@ static void apply_clog_update(int action, int argument) {
122123
int status = action;
123124
xid_t xid = argument;
124125
assert((status == NEGATIVE) || (status == POSITIVE));
126+
debug("APPLYING: xid=%u, status=%d\n", xid, status);
125127

126128
if (!clog_write(clg, xid, status)) {
127-
shout("APPLY: failed to write to clog, xid=%d\n", xid);
129+
shout("APPLY: failed to write to clog, xid=%u\n", xid);
128130
}
129131

130-
Transaction *t = find_transaction(xid);
131-
if (t == NULL) {
132-
debug("APPLY: xid %u is not active\n", xid);
133-
return;
134-
}
132+
if (!use_raft || (raft.role == ROLE_LEADER)) {
133+
Transaction *t = find_transaction(xid);
134+
if (t == NULL) {
135+
debug("APPLY: xid=%u is not active\n", xid);
136+
return;
137+
}
135138

136-
notify_listeners(t, status);
137-
free_transaction(t);
139+
notify_listeners(t, status);
140+
free_transaction(t);
141+
}
138142
}
139143

140144
static int next_client_id = 0;
@@ -153,14 +157,16 @@ static void ondisconnect(client_t client) {
153157
// need to abort the transaction this client is participating in
154158
for (t = (Transaction*)active_transactions.next; t != (Transaction*)&active_transactions; t = (Transaction*)t->elem.next) {
155159
if (t->xid == CLIENT_XID(client)) {
156-
raft_emit(&raft, NEGATIVE, t->xid);
160+
if (use_raft && (raft.role == ROLE_LEADER)) {
161+
raft_emit(&raft, NEGATIVE, t->xid);
162+
}
157163
break;
158164
}
159165
}
160166

161167
if (t == (Transaction*)&active_transactions) {
162168
shout(
163-
"[%d] DISCONNECT: transaction %u not found O_o\n",
169+
"[%d] DISCONNECT: transaction xid=%u not found O_o\n",
164170
CLIENT_ID(client), CLIENT_XID(client)
165171
);
166172
}
@@ -175,6 +181,7 @@ static void debug_cmd(client_t client, int argc, xid_t *argv) {
175181
char *cmdname;
176182
assert(argc > 0);
177183
switch (argv[0]) {
184+
case CMD_HELLO : cmdname = "HELLO"; break;
178185
case CMD_RESERVE : cmdname = "RESERVE"; break;
179186
case CMD_BEGIN : cmdname = "BEGIN"; break;
180187
case CMD_FOR : cmdname = "FOR"; break;
@@ -203,6 +210,9 @@ static void debug_cmd(client_t client, int argc, xid_t *argv) {
203210
} \
204211
} while (0)
205212

213+
#define CHECKLEADER(CLIENT) \
214+
CHECK(raft.role == ROLE_LEADER, CLIENT, "not a leader")
215+
206216
static xid_t max_of_xids(xid_t a, xid_t b) {
207217
return a > b ? a : b;
208218
}
@@ -232,6 +242,17 @@ static void gen_snapshot(Snapshot *s) {
232242
}
233243
}
234244

245+
static void onhello(client_t client, int argc, xid_t *argv) {
246+
CHECK(argc == 1, client, "HELLO: wrong number of arguments");
247+
248+
debug("[%d] HELLO\n", CLIENT_ID(client));
249+
if (raft.role == ROLE_LEADER) {
250+
client_message_shortcut(client, RES_OK);
251+
} else {
252+
client_message_shortcut(client, RES_FAILED);
253+
}
254+
}
255+
235256
static void onreserve(client_t client, int argc, xid_t *argv) {
236257
CHECK(argc == 3, client, "RESERVE: wrong number of arguments");
237258

@@ -344,10 +365,12 @@ static void onbegin(client_t client, int argc, xid_t *argv) {
344365
static bool queue_for_transaction_finish(client_t client, xid_t xid, char cmd) {
345366
assert((cmd >= 'a') && (cmd <= 'z'));
346367

368+
debug("[%d] QUEUE for xid=%u status\n", CLIENT_ID(client), xid);
369+
347370
Transaction *t = find_transaction(xid);
348371
if (t == NULL) {
349372
shout(
350-
"[%d] QUEUE: xid %u not found\n",
373+
"[%d] QUEUE: xid=%u not found\n",
351374
CLIENT_ID(client), xid
352375
);
353376
client_message_shortcut(client, RES_FAILED);
@@ -378,7 +401,7 @@ static void onvote(client_t client, int argc, xid_t *argv, int vote) {
378401
Transaction *t = find_transaction(xid);
379402
if (t == NULL) {
380403
shout(
381-
"[%d] VOTE: xid %u not found\n",
404+
"[%d] VOTE: xid=%u not found\n",
382405
CLIENT_ID(client), xid
383406
);
384407
client_message_shortcut(client, RES_FAILED);
@@ -445,7 +468,7 @@ static void onsnapshot(client_t client, int argc, xid_t *argv) {
445468
Transaction *t = find_transaction(xid);
446469
if (t == NULL) {
447470
shout(
448-
"[%d] SNAPSHOT: xid %u not found\n",
471+
"[%d] SNAPSHOT: xid=%u not found\n",
449472
CLIENT_ID(client), xid
450473
);
451474
client_message_shortcut(client, RES_FAILED);
@@ -543,22 +566,31 @@ static void oncmd(client_t client, int argc, xid_t *argv) {
543566

544567
assert(argc > 0);
545568
switch (argv[0]) {
569+
case CMD_HELLO:
570+
onhello(client, argc, argv);
571+
break;
546572
case CMD_RESERVE:
573+
CHECKLEADER(client);
547574
onreserve(client, argc, argv);
548575
break;
549576
case CMD_BEGIN:
577+
CHECKLEADER(client);
550578
onbegin(client, argc, argv);
551579
break;
552580
case CMD_FOR:
581+
CHECKLEADER(client);
553582
onvote(client, argc, argv, POSITIVE);
554583
break;
555584
case CMD_AGAINST:
585+
CHECKLEADER(client);
556586
onvote(client, argc, argv, NEGATIVE);
557587
break;
558588
case CMD_SNAPSHOT:
589+
CHECKLEADER(client);
559590
onsnapshot(client, argc, argv);
560591
break;
561592
case CMD_STATUS:
593+
CHECKLEADER(client);
562594
onstatus(client, argc, argv);
563595
break;
564596
default:

contrib/pg_dtm/dtmd/src/raft.c

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ int raft_create_udp_socket(raft_t *r) {
143143
return -1;
144144
}
145145
me->addr.sin_port = htons(me->port);
146-
debug("binding %s:%d\n", me->host, me->port);
146+
debug("binding udp %s:%d\n", me->host, me->port);
147147
if (bind(r->sock, (struct sockaddr*)&me->addr, sizeof(me->addr)) == -1) {
148148
shout("cannot bind the socket: %s\n", strerror(errno));
149149
return -1;
@@ -321,7 +321,6 @@ static int raft_log_compact(raft_log_t *l, int keep_applied) {
321321
snap.minarg = min(snap.minarg, e->argument);
322322
snap.maxarg = max(snap.maxarg, e->argument);
323323
}
324-
e->snapshot = false; // FIXME: should not need this, find the code where it is not set on new entry insertion
325324
compacted++;
326325
}
327326
if (compacted) {
@@ -339,7 +338,7 @@ bool raft_emit(raft_t *r, int action, int argument) {
339338

340339
if (r->log.size == RAFT_LOGLEN) {
341340
int compacted = raft_log_compact(&r->log, RAFT_KEEP_APPLIED);
342-
if (compacted) {
341+
if (compacted > 1) {
343342
shout("compacted %d entries\n", compacted);
344343
} else {
345344
shout(
@@ -348,10 +347,9 @@ bool raft_emit(raft_t *r, int action, int argument) {
348347
);
349348
return false;
350349
}
351-
return false;
352350
}
353351

354-
raft_entry_t *e = &RAFT_LOG(r, r->log.size);
352+
raft_entry_t *e = &RAFT_LOG(r, r->log.first + r->log.size);
355353
e->snapshot = false;
356354
e->term = r->term;
357355
e->action = action;

contrib/pg_dtm/dtmd/src/server.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ static int create_listening_socket(const char *host, int port) {
8585
return -1;
8686
}
8787
addr.sin_port = htons(port);
88-
debug("binding %s:%d\n", host, port);
88+
debug("binding tcp %s:%d\n", host, port);
8989
if (bind(s, (struct sockaddr*)&addr, sizeof(addr)) == -1) {
9090
shout("cannot bind the listening socket: %s\n", strerror(errno));
9191
return -1;

0 commit comments

Comments
 (0)