@@ -26,7 +26,6 @@ typedef struct client_data_t {
2626} client_data_t ;
2727
2828clog_t clg ;
29- static bool queue_for_transaction_finish (void * stream , void * clientdata , int node , xid_t xid , char cmd );
3029static client_data_t * create_client_data (int id );
3130static void free_client_data (client_data_t * cd );
3231static void onconnect (void * stream , void * * clientdata );
@@ -39,6 +38,7 @@ static void gen_snapshot(Snapshot *s, int node);
3938static void gen_snapshots (GlobalTransaction * gt );
4039static char * onsnapshot (void * stream , void * clientdata , cmd_t * cmd );
4140static bool queue_for_transaction_finish (void * stream , void * clientdata , int node , xid_t xid , char cmd );
41+ static void notify_listeners (GlobalTransaction * gt , int status );
4242static char * onstatus (void * stream , void * clientdata , cmd_t * cmd );
4343static char * onnoise (void * stream , void * clientdata , cmd_t * cmd );
4444static char * oncmd (void * stream , void * clientdata , cmd_t * cmd );
@@ -68,7 +68,33 @@ static void onconnect(void *stream, void **clientdata) {
6868}
6969
7070static void ondisconnect (void * stream , void * clientdata ) {
71- shout ("[%d] disconnected\n" , CLIENT_ID (clientdata ));
71+ int client_id = CLIENT_ID (clientdata );
72+ shout ("[%d] disconnected\n" , client_id );
73+
74+ int i , n ;
75+ for (i = transactions_count - 1 ; i >= 0 ; i -- ) {
76+ GlobalTransaction * gt = transactions + i ;
77+
78+ for (n = 0 ; n < MAX_NODES ; n ++ ) {
79+ Transaction * t = gt -> participants + n ;
80+ if ((t -> active ) && (t -> client_id == client_id )) {
81+ if (global_transaction_mark (clg , gt , NEGATIVE )) {
82+ notify_listeners (gt , NEGATIVE );
83+
84+ transactions [i ] = transactions [transactions_count - 1 ];
85+ transactions_count -- ;
86+ } else {
87+ shout (
88+ "[%d] DISCONNECT: global transaction failed"
89+ " to abort O_o\n" ,
90+ client_id
91+ );
92+ }
93+ break ;
94+ }
95+ }
96+ }
97+
7298 free_client_data (clientdata );
7399}
74100
@@ -149,6 +175,7 @@ static char
7440
*onbegin(void *stream, void *clientdata, cmd_t *cmd) {
149175 );
150176 return strdup ("-" );
151177 }
178+ t -> client_id = CLIENT_ID (clientdata );
152179 t -> active = true;
153180 t -> node = node ;
154181 t -> vote = DOUBT ;
@@ -172,6 +199,32 @@ static char *onbegin(void *stream, void *clientdata, cmd_t *cmd) {
172199 return strdup ("+" );
173200}
174201
202+ static void notify_listeners (GlobalTransaction * gt , int status ) {
203+ void * listener ;
204+ switch (status ) {
205+ case NEGATIVE :
206+ while ((listener = global_transaction_pop_listener (gt , 's' ))) {
207+ // notify 'status' listeners about the aborted status
208+ write_to_stream (listener , strdup ("+a" ));
209+ }
210+ while ((listener = global_transaction_pop_listener (gt , 'c' ))) {
211+ // notify 'commit' listeners about the failure
212+ write_to_stream (listener , strdup ("-" ));
213+ }
214+ break ;
215+ case POSITIVE :
216+ while ((listener = global_transaction_pop_listener (gt , 's' ))) {
217+ // notify 'status' listeners about the committed status
218+ write_to_stream (listener , strdup ("+c" ));
219+ }
220+ while ((listener = global_transaction_pop_listener (gt , 'c' ))) {
221+ // notify 'commit' listeners about the success
222+ write_to_stream (listener , strdup ("+" ));
223+ }
224+ break ;
225+ }
226+ }
227+
175228static char * onvote (void * stream , void * clientdata , cmd_t * cmd , int vote ) {
176229 assert ((vote == POSITIVE ) || (vote == NEGATIVE ));
177230
@@ -218,18 +271,11 @@ static char *onvote(void *stream, void *clientdata, cmd_t *cmd, int vote) {
218271 }
219272 transactions [i ].participants [node ].vote = vote ;
220273
221- switch (global_transaction_status (transactions + i )) {
274+ GlobalTransaction * gt = transactions + i ;
275+ switch (global_transaction_status (gt )) {
222276 case NEGATIVE :
223- if (global_transaction_mark (clg , transactions + i , NEGATIVE )) {
224- void * listener ;
225- while ((listener = global_transaction_pop_listener (transactions + i , 's' ))) {
226- // notify 'status' listeners about the aborted status
227- write_to_stream (listener , strdup ("+a" ));
228- }
229- while ((listener = global_transaction_pop_listener (transactions + i , 'c' ))) {
230- // notify 'commit' listeners about the failure
231- write_to_stream (listener , strdup ("-" ));
232- }
277+ if (global_transaction_mark (clg , gt , NEGATIVE )) {
278+ notify_listeners (gt , NEGATIVE );
233279
234280 transactions [i ] = transactions [transactions_count - 1 ];
235281 transactions_count -- ;
@@ -258,20 +304,7 @@ static char *onvote(void *stream, void *clientdata, cmd_t *cmd, int vote) {
258304 }
259305 case POSITIVE :
260306 if (global_transaction_mark (clg , transactions + i , POSITIVE )) {
261- //shout(
262- // "[%d] VOTE: global transaction committed\n",
263- // CLIENT_ID(clientdata)
264- //);
265-
266- void * listener ;
267- while ((listener = global_transaction_pop_listener (transactions + i , 's' ))) {
268- // notify 'status' listeners about the committed status
269- write_to_stream (listener , strdup ("+c" ));
270- }
271- while ((listener = global_transaction_pop_listener (transactions + i , 'c' ))) {
272- // notify 'commit' listeners about the success
273- write_to_stream (listener , strdup ("+" ));
274- }
307+ notify_listeners (gt , POSITIVE );
275308
276309 transactions [i ] = transactions [transactions_count - 1 ];
277310 transactions_count -- ;
0 commit comments