@@ -82,6 +82,7 @@ static void notify_listeners(Transaction *t, int status) {
82
82
// notify 'status' listeners about the transaction status
83
83
case BLANK :
84
84
while ((listener = transaction_pop_listener (t , 's' ))) {
85
+ debug ("[%d] notifying the client about xid=%u (unknown)\n" , CLIENT_ID (listener ), t -> xid );
85
86
client_message_shortcut (
86
87
(client_t )listener ,
87
88
RES_TRANSACTION_UNKNOWN
@@ -90,7 +91,7 @@ static void notify_listeners(Transaction *t, int status) {
90
91
break ;
91
92
case NEGATIVE :
92
93
while ((listener = transaction_pop_listener (t , 's' ))) {
93
- // notify 'status' listeners about the transaction status
94
+ debug ( "[%d] notifying the client about xid=%u (aborted)\n" , CLIENT_ID ( listener ), t -> xid );
94
95
client_message_shortcut (
95
96
(client_t )listener ,
96
97
RES_TRANSACTION_ABORTED
@@ -99,7 +100,7 @@ static void notify_listeners(Transaction *t, int status) {
99
100
break ;
100
101
case POSITIVE :
101
102
while ((listener = transaction_pop_listener (t , 's' ))) {
102
- // notify 'status' listeners about the transaction status
103
+ debug ( "[%d] notifying the client about xid=%u (committed)\n" , CLIENT_ID ( listener ), t -> xid );
103
104
client_message_shortcut (
104
105
(client_t )listener ,
105
106
RES_TRANSACTION_COMMITTED
@@ -108,7 +109,7 @@ static void notify_listeners(Transaction *t, int status) {
108
109
break ;
109
110
case DOUBT :
110
111
while ((listener = transaction_pop_listener (t , 's' ))) {
111
- // notify 'status' listeners about the transaction status
112
+ debug ( "[%d] notifying the client about xid=%u (inprogress)\n" , CLIENT_ID ( listener ), t -> xid );
112
113
client_message_shortcut (
113
114
(client_t )listener ,
114
115
RES_TRANSACTION_INPROGRESS
@@ -122,19 +123,22 @@ static void apply_clog_update(int action, int argument) {
122
123
int status = action ;
123
124
xid_t xid = argument ;
124
125
assert ((status == NEGATIVE ) || (status == POSITIVE ));
126
+ debug ("APPLYING: xid=%u, status=%d\n" , xid , status );
125
127
126
128
if (!clog_write (clg , xid , status )) {
127
- shout ("APPLY: failed to write to clog, xid=%d \n" , xid );
129
+ shout ("APPLY: failed to write to clog, xid=%u \n" , xid );
128
130
}
129
131
130
- Transaction * t = find_transaction (xid );
131
- if (t == NULL ) {
132
- debug ("APPLY: xid %u is not active\n" , xid );
133
- return ;
134
- }
132
+ if (!use_raft || (raft .role == ROLE_LEADER )) {
133
+ Transaction * t = find_transaction (xid );
134
+ if (t == NULL ) {
135
+ debug ("APPLY: xid=%u is not active\n" , xid );
136
+ return ;
137
+ }
135
138
136
- notify_listeners (t , status );
137
- free_transaction (t );
139
+ notify_listeners (t , status );
140
+ free_transaction (t );
141
+ }
138
142
}
139
143
140
144
static int next_client_id = 0 ;
@@ -153,14 +157,16 @@ static void ondisconnect(client_t client) {
153
157
// need to abort the transaction this client is participating in
154
158
for (t = (Transaction * )active_transactions .next ; t != (Transaction * )& active_transactions ; t = (Transaction * )t -> elem .next ) {
155
159
if (t -> xid == CLIENT_XID (client )) {
156
- raft_emit (& raft , NEGATIVE , t -> xid );
160
+ if (use_raft && (raft .role == ROLE_LEADER )) {
161
+ raft_emit (& raft , NEGATIVE , t -> xid );
162
+ }
157
163
break ;
158
164
}
159
165
}
160
166
161
167
if (t == (Transaction * )& active_transactions ) {
162
168
shout (
163
- "[%d] DISCONNECT: transaction %u not found O_o\n" ,
169
+ "[%d] DISCONNECT: transaction xid= %u not found O_o\n" ,
164
170
CLIENT_ID (client ), CLIENT_XID (client )
165
171
);
166
172
}
@@ -175,6 +181,7 @@ static void debug_cmd(client_t client, int argc, xid_t *argv) {
175
181
char * cmdname ;
176
182
assert (argc > 0 );
177
183
switch (argv [0 ]) {
184
+ case CMD_HELLO : cmdname = "HELLO" ; break ;
178
185
case CMD_RESERVE : cmdname = "RESERVE" ; break ;
179
186
case CMD_BEGIN : cmdname = "BEGIN" ; break ;
180
187
case CMD_FOR : cmdname = "FOR" ; break ;
@@ -203,6 +210,9 @@ static void debug_cmd(client_t client, int argc, xid_t *argv) {
203
210
} \
204
211
} while (0)
205
212
213
+ #define CHECKLEADER (CLIENT ) \
214
+ CHECK(raft.role == ROLE_LEADER, CLIENT, "not a leader")
215
+
206
216
static xid_t max_of_xids (xid_t a , xid_t b ) {
207
217
return a > b ? a : b ;
208
218
}
@@ -232,6 +242,17 @@ static void gen_snapshot(Snapshot *s) {
232
242
}
233
243
}
234
244
245
+ static void onhello (client_t client , int argc , xid_t * argv ) {
246
+ CHECK (argc == 1 , client , "HELLO: wrong number of arguments" );
247
+
248
+ debug ("[%d] HELLO\n" , CLIENT_ID (client ));
249
+ if (raft .role == ROLE_LEADER ) {
250
+ client_message_shortcut (client , RES_OK );
251
+ } else {
252
+ client_message_shortcut (client , RES_FAILED );
253
+ }
254
+ }
255
+
235
256
static void onreserve (client_t client , int argc , xid_t * argv ) {
236
257
CHECK (argc == 3 , client , "RESERVE: wrong number of arguments" );
237
258
@@ -344,10 +365,12 @@ static void onbegin(client_t client, int argc, xid_t *argv) {
344
365
static bool queue_for_transaction_finish (client_t client , xid_t xid , char cmd ) {
345
366
assert ((cmd >= 'a' ) && (cmd <= 'z' ));
346
367
368
+ debug ("[%d] QUEUE for xid=%u status\n" , CLIENT_ID (client ), xid );
369
+
347
370
Transaction * t = find_transaction (xid );
348
371
if (t == NULL ) {
349
372
shout (
350
- "[%d] QUEUE: xid %u not found\n" ,
373
+ "[%d] QUEUE: xid= %u not found\n" ,
351
374
CLIENT_ID (client ), xid
352
375
);
353
376
client_message_shortcut (client , RES_FAILED );
@@ -378,7 +401,7 @@ static void onvote(client_t client, int argc, xid_t *argv, int vote) {
378
401
Transaction * t = find_transaction (xid );
379
402
if (t == NULL ) {
380
403
shout (
381
- "[%d] VOTE: xid %u not found\n" ,
404
+ "[%d] VOTE: xid= %u not found\n" ,
382
405
CLIENT_ID (client ), xid
383
406
);
384
407
client_message_shortcut (client , RES_FAILED );
@@ -445,7 +468,7 @@ static void onsnapshot(client_t client, int argc, xid_t *argv) {
445
468
Transaction * t = find_transaction (xid );
446
469
if (t == NULL ) {
447
470
shout (
448
- "[%d] SNAPSHOT: xid %u not found\n" ,
471
+ "[%d] SNAPSHOT: xid= %u not found\n" ,
449
472
CLIENT_ID (client ), xid
450
473
);
451
474
client_message_shortcut (client , RES_FAILED );
@@ -543,22 +566,31 @@ static void oncmd(client_t client, int argc, xid_t *argv) {
543
566
544
567
assert (argc > 0 );
545
568
switch (argv [0 ]) {
569
+ case CMD_HELLO :
570
+ onhello (client , argc , argv );
571
+ break ;
546
572
case CMD_RESERVE :
573
+ CHECKLEADER (client );
547
574
onreserve (client , argc , argv );
548
575
break ;
549
576
case CMD_BEGIN :
577
+ CHECKLEADER (client );
550
578
onbegin (client , argc , argv );
551
579
break ;
552
580
case CMD_FOR :
581
+ CHECKLEADER (client );
553
582
onvote (client , argc , argv , POSITIVE );
554
583
break ;
555
584
case CMD_AGAINST :
585
+ CHECKLEADER (client );
556
586
onvote (client , argc , argv , NEGATIVE );
557
587
break ;
558
588
case CMD_SNAPSHOT :
589
+ CHECKLEADER (client );
559
590
onsnapshot (client , argc , argv );
560
591
break ;
561
592
case CMD_STATUS :
593
+ CHECKLEADER (client );
562
594
onstatus (client , argc , argv );
563
595
break ;
564
596
default :
0 commit comments