Skip to content

Commit b34d59e

Browse files
committed
Fix some bugs in dtmd.
1 parent eb08af1 commit b34d59e

File tree

4 files changed

+26
-19
lines changed

4 files changed

+26
-19
lines changed

contrib/pg_dtm/dtmd/src/main.c

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,9 @@ static void free_client_userdata(client_userdata_t *cd) {
5252

5353
static int next_client_id = 0;
5454
static void onconnect(client_t client) {
55-
debug("[%d] connected\n", CLIENT_ID(client));
5655
client_userdata_t *cd = create_client_userdata(next_client_id++);
5756
client_set_userdata(client, cd);
57+
debug("[%d] connected\n", CLIENT_ID(client));
5858
}
5959

6060
static void notify_listeners(Transaction *t, int status) {
@@ -266,7 +266,7 @@ static void onbegin(client_t client, int argc, xid_t *argv) {
266266
);
267267

268268
CHECK(
269-
argc == 0,
269+
argc == 1,
270270
client,
271271
"BEGIN: wrong number of arguments"
272272
);
@@ -643,12 +643,12 @@ void kill_the_elder(char *datadir) {
643643
break;
644644
}
645645
}
646-
debug("SIGTERM sent to pid=%d\n" pid);
647-
debug("waiting for pid=%d to die\n" pid);
646+
debug("SIGTERM sent to pid=%d\n", pid);
647+
debug("waiting for pid=%d to die\n", pid);
648648
waitpid(pid, NULL, 0);
649-
debug("pid=%d died\n" pid);
649+
debug("pid=%d died\n", pid);
650650
} else {
651-
debug("no elder to kill\n" pid);
651+
debug("no elder to kill\n");
652652
}
653653
}
654654

contrib/pg_dtm/dtmd/src/server.c

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,10 +173,14 @@ static void stream_init(stream_t stream, int fd) {
173173
int i;
174174
stream->input.data = malloc(BUFFER_SIZE);
175175
assert(stream->input.data);
176+
stream->input.curmessage = NULL;
176177
stream->input.ready = 0;
178+
177179
stream->output.data = malloc(BUFFER_SIZE);
178180
assert(stream->output.data);
181+
stream->output.curmessage = NULL;
179182
stream->output.ready = 0;
183+
180184
stream->fd = fd;
181185
stream->good = true;
182186

@@ -458,8 +462,8 @@ void server_loop(server_t server) {
458462
stream_t stream = server->streams + i;
459463
if (FD_ISSET(stream->fd, &readfds)) {
460464
server_stream_handle(server, stream);
465+
numready--;
461466
}
462-
numready--;
463467
}
464468

465469
server_flush(server);

contrib/pg_dtm/libdtm.c

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -161,14 +161,19 @@ static bool dtm_send_command(DTMConn dtm, xid_t cmd, int argc, ...)
161161
char *cursor = buf;
162162

163163
ShubMessageHdr *msg = (ShubMessageHdr*)cursor;
164+
msg->chan = 0;
164165
msg->code = MSG_FIRST_USER_CODE;
165166
msg->size = sizeof(xid_t) * (argc + 1);
166167
cursor += sizeof(ShubMessageHdr);
167168

169+
*(xid_t*)cursor = cmd;
170+
cursor += sizeof(xid_t);
171+
168172
va_start(argv, argc);
169173
for (i = 0; i < argc; i++)
170174
{
171175
*(xid_t*)cursor = va_arg(argv, xid_t);
176+
cursor += sizeof(xid_t);
172177
}
173178
va_end(argv);
174179

@@ -258,19 +263,19 @@ TransactionId DtmGlobalStartTransaction(Snapshot snapshot, TransactionId *gxmin)
258263

259264
// results
260265
reslen = dtm_recv_results(dtm, RESULTS_SIZE, results);
261-
if (reslen < 6) goto failure;
266+
if (reslen < 5) goto failure;
262267
if (results[0] != RES_OK) goto failure;
263268
xid = results[1];
264269
*gxmin = results[2];
270+
271+
DtmInitSnapshot(snapshot);
265272
snapshot->xmin = results[3];
266273
snapshot->xmax = results[4];
267-
snapshot->xcnt = results[5];
268-
269-
if (reslen != 6 + snapshot->xcnt) goto failure;
274+
snapshot->xcnt = reslen - 5;
270275

271276
for (i = 0; i < snapshot->xcnt; i++)
272277
{
273-
snapshot->xip[i] = results[6 + i];
278+
snapshot->xip[i] = results[5 + i];
274279
}
275280

276281
return xid;
@@ -295,18 +300,17 @@ void DtmGlobalGetSnapshot(TransactionId xid, Snapshot snapshot, TransactionId *g
295300

296301
// response
297302
reslen = dtm_recv_results(dtm, RESULTS_SIZE, results);
298-
if (reslen < 5) goto failure;
303+
if (reslen < 4) goto failure;
299304
if (results[0] != RES_OK) goto failure;
300305
*gxmin = results[1];
306+
DtmInitSnapshot(snapshot);
301307
snapshot->xmin = results[2];
302308
snapshot->xmax = results[3];
303-
snapshot->xcnt = results[4];
304-
305-
if (reslen != 5 + snapshot->xcnt) goto failure;
309+
snapshot->xcnt = reslen - 4;
306310

307311
for (i = 0; i < snapshot->xcnt; i++)
308312
{
309-
snapshot->xip[i] = results[5 + i];
313+
snapshot->xip[i] = results[4 + i];
310314
}
311315

312316
return;

contrib/pg_dtm/tests/daemons.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,8 +91,7 @@ func postgres(bin string, datadir string, port int, nodeid int, wg *sync.WaitGro
9191
bin,
9292
"-D", datadir,
9393
"-p", strconv.Itoa(port),
94-
"-c", "dtm.node_id=" + strconv.Itoa(nodeid),
95-
"-c", "dtm.host=127.0.0.2",
94+
"-c", "dtm.host=127.0.0.1",
9695
"-c", "dtm.port=" + strconv.Itoa(5431),
9796
"-c", "autovacuum=off",
9897
"-c", "fsync=off",

0 commit comments

Comments
 (0)