@@ -57,13 +57,17 @@ public void wakeUp() throws RemoteException {
57
57
@ Override
58
58
public void run () {
59
59
try {
60
- nodes [edge .getReceiver (id )].receive (msg , edge );
60
+ int receiverId = edge .getReceiver (id );
61
+ Thread .sleep (10 );
62
+ nodes [receiverId ].receive (msg , edge );
61
63
} catch (RemoteException e ) {
62
64
e .printStackTrace ();
65
+ } catch (InterruptedException e ) {
66
+ e .printStackTrace ();
63
67
}
64
68
}
65
69
},
66
- 10
70
+ 0
67
71
);
68
72
}
69
73
@@ -148,6 +152,7 @@ public void run() {
148
152
10
149
153
);
150
154
if (state == NodeState .FIND ) {
155
+ System .out .println (id + ": Connect Incrementing findcount" );
151
156
findCount .getAndIncrement ();
152
157
}
153
158
} else {
@@ -159,13 +164,16 @@ public void run() {
159
164
@ Override
160
165
public void run () {
161
166
try {
162
- nodes [id ].receiveConnect (message , link );
167
+ Thread .sleep (300 );
168
+ nodes [id ].receive (message , link );
163
169
} catch (RemoteException e ) {
164
170
e .printStackTrace ();
171
+ } catch (InterruptedException e ) {
172
+ e .printStackTrace ();
165
173
}
166
174
}
167
175
},
168
- 300 );
176
+ 0 );
169
177
} else {
170
178
// the merging case: l == l' and MOE == MOE'
171
179
Message msg = new Message (MessageType .INITIATE , level + 1 , link .getWeight (), NodeState .FIND );
@@ -175,16 +183,17 @@ public void run() {
175
183
@ Override
176
184
public void run () {
177
185
try {
178
- Thread .sleep (30 );//It is added to preserve order of messages sended from certain node
179
- nodes [link .getReceiver (id )].receive (msg , link );
186
+ int receiverId = link .getReceiver (id );
187
+ Thread .sleep (40 );//It is added to preserve order of messages sended from certain node
188
+ nodes [receiverId ].receive (msg , link );
180
189
} catch (RemoteException e ) {
181
190
e .printStackTrace ();
182
191
} catch (InterruptedException e ) {
183
192
e .printStackTrace ();
184
193
}
185
194
}
186
195
},
187
- 10 );
196
+ 0 );
188
197
}
189
198
}
190
199
}
@@ -198,25 +207,29 @@ private synchronized void receiveInitiate(Message message, Link link) throws Rem
198
207
bestEdge = null ;
199
208
weightBestAdjacent = Double .POSITIVE_INFINITY ;
200
209
201
- for (Link adjescentLink : this .links ) {//TODO: Not sure if this is the correct one list of links
210
+ for (Link adjescentLink : this .links ) {
202
211
if (adjescentLink .compareTo (link ) != 0 && adjescentLink .getState () == LinkState .IN_MST ) {
203
212
Message msg = new Message (MessageType .INITIATE , level , fragmentName , state );
204
213
System .out .println (id + ": Sending Initiate to " + adjescentLink .getReceiver (id ));
205
- Link copy = new Link (adjescentLink );
206
214
new java .util .Timer ().schedule (
207
215
new java .util .TimerTask () {
208
216
@ Override
209
217
public void run () {
210
218
try {
219
+ Link copy = new Link (adjescentLink );
220
+ Thread .sleep (10 );
211
221
nodes [copy .getReceiver (id )].receive (msg , copy );
212
222
} catch (RemoteException e ) {
213
223
e .printStackTrace ();
224
+ } catch (InterruptedException e ) {
225
+ e .printStackTrace ();
214
226
}
215
227
}
216
228
},
217
- 10
229
+ 0
218
230
);
219
231
if (state == NodeState .FIND ) {
232
+ System .out .println (id + ": Initiate Incrementing findcount" );
220
233
findCount .getAndIncrement ();
221
234
// the messages sent
222
235
}
@@ -232,20 +245,23 @@ private synchronized void test() throws RemoteException {
232
245
if (links .stream ().anyMatch (p -> p .getState () == LinkState .CANDIDATE_IN_MST )) {
233
246
testEdge = links .stream ().filter (p -> p .getState () == LinkState .CANDIDATE_IN_MST ).min (new LinkComparator ()).get ();
234
247
Message msg = new Message (MessageType .TEST , level , fragmentName );
235
- Link copy = new Link (testEdge );
236
248
System .out .println (id + ":Sending Receive test to " + testEdge .getReceiver (id ));
249
+ int reciverID = testEdge .getReceiver (id );
237
250
new java .util .Timer ().schedule (
238
251
new java .util .TimerTask () {
239
252
@ Override
240
253
public void run () {
241
254
try {
242
- nodes [copy .getReceiver (id )].receive (msg , testEdge );
255
+ Thread .sleep (10 );
256
+ nodes [reciverID ].receive (msg , testEdge );
243
257
} catch (RemoteException e ) {
244
258
e .printStackTrace ();
259
+ } catch (InterruptedException e ) {
260
+ e .printStackTrace ();
245
261
}
246
262
}
247
263
},
248
- 10
264
+ 0
249
265
);
250
266
} else {
251
267
if (testEdge != null ) {
@@ -272,13 +288,16 @@ public synchronized void receiveTest(Message message, Link link) throws RemoteEx
272
288
@ Override
273
289
public void run () {
274
290
try {
291
+ Thread .sleep (300 );
275
292
nodes [id ].receiveTest (message , link );
276
293
} catch (RemoteException e ) {
277
294
e .printStackTrace ();
295
+ } catch (InterruptedException e ) {
296
+ e .printStackTrace ();
278
297
}
279
298
}
280
299
},
281
- 300 );
300
+ 0 );
282
301
// receiving again with a delay
283
302
} else {
284
303
// if the name of the fragment is different then a possible MOE is identified
@@ -291,13 +310,16 @@ public void run() {
291
310
@ Override
292
311
public void run () {
293
312
try {
313
+ Thread .sleep (10 );
294
314
nodes [link .getReceiver (id )].receive (msg , link );
295
315
} catch (RemoteException e ) {
296
316
e .printStackTrace ();
317
+ } catch (InterruptedException e ) {
318
+ e .printStackTrace ();
297
319
}
298
320
}
299
321
},
300
- 10
322
+ 0
301
323
);
302
324
} else {
303
325
if (link .getState () == LinkState .CANDIDATE_IN_MST ) {
@@ -307,20 +329,24 @@ public void run() {
307
329
// if the node hasn't set this edge as testEdge then it sends a reject
308
330
// because they are in the same fragment with the sender
309
331
if (link .compareTo (testEdge ) != 0 ) {
310
- System .out .println (id + ":Sending Reject to " + link .getReceiver (id ));
311
- Message msg = new Message (MessageType .REJECT );
332
+
312
333
new java .util .Timer ().schedule (
313
334
new java .util .TimerTask () {
314
335
@ Override
315
336
public void run () {
316
337
try {
338
+ System .out .println (id + ":Sending Reject to " + link .getReceiver (id ));
339
+ Message msg = new Message (MessageType .REJECT );
340
+ Thread .sleep (10 );
317
341
nodes [link .getReceiver (id )].receive (msg , link );
318
342
} catch (RemoteException e ) {
319
343
e .printStackTrace ();
344
+ } catch (InterruptedException e ) {
345
+ e .printStackTrace ();
320
346
}
321
347
}
322
348
},
323
- 10
349
+ 0
324
350
);
325
351
} else {
326
352
test ();
@@ -341,6 +367,7 @@ private synchronized void receiveReject(Message message, Link link) throws Remot
341
367
342
368
private synchronized void receiveAccept (Message message , Link link ) throws RemoteException {
343
369
System .out .println (id + ":Receive Accept from " + link .getReceiver (id ));
370
+ System .out .println (id + ": Changing test edge from" + link .getReceiver (id ) + " to null" );
344
371
testEdge = null ;
345
372
System .out .println (id + ": Accept " + link .getWeight () + " " + weightBestAdjacent );
346
373
if (link .getWeight () < weightBestAdjacent ) {
@@ -352,6 +379,9 @@ private synchronized void receiveAccept(Message message, Link link) throws Remot
352
379
353
380
private synchronized void report () throws RemoteException {
354
381
System .out .println (id + ":Report find_count:" + findCount .get () + " testEdge:" + testEdge );
382
+ if (testEdge != null ) {
383
+ System .out .println (id + ": " + testEdge .getReceiver (id ) + " " + testEdge .getWeight ());
384
+ }
355
385
if (findCount .get () == 0 && testEdge == null ) {
356
386
this .state = NodeState .FOUND ;
357
387
Message msg = new Message (MessageType .REPORT , weightBestAdjacent );
@@ -361,22 +391,27 @@ private synchronized void report() throws RemoteException {
361
391
@ Override
362
392
public void run () {
363
393
try {
364
- nodes [inBranch .getReceiver (id )].receive (msg , inBranch );
394
+ int receiverId = inBranch .getReceiver (id );
395
+ Thread .sleep (10 );
396
+ nodes [receiverId ].receive (msg , inBranch );
365
397
} catch (RemoteException e ) {
366
398
e .printStackTrace ();
399
+ } catch (InterruptedException e ) {
400
+ e .printStackTrace ();
367
401
}
368
402
}
369
403
},
370
- 10
404
+ 0
371
405
);
372
406
}
373
407
}
374
408
375
409
@ Override
376
410
public synchronized void receiveReport (Message message , Link link ) throws RemoteException {
377
- System .out .println (id + ":Receive Report from " + link .getReceiver (id )+ " state:" + state );
378
- System .out .println (findCount . get ());
411
+ System .out .println (id + ":Receive Report from " + link .getReceiver (id ) + " state:" + state + " find_count:" + findCount );
412
+ System .out .println (id + " " + link . getWeight () + " " + inBranch . getWeight ());
379
413
if (link .compareTo (inBranch ) != 0 ) {
414
+ System .out .println (id + ": Decrementing findcount" );
380
415
findCount .getAndDecrement ();
381
416
System .out .println (id + ": Report " + message .getWeight () + " " + weightBestAdjacent );
382
417
if (message .getWeight () < weightBestAdjacent ) {
@@ -388,18 +423,22 @@ public synchronized void receiveReport(Message message, Link link) throws Remote
388
423
} else {
389
424
if (this .state == NodeState .FIND ) {
390
425
// TODO: append message to the message queue
426
+ System .out .println ("Adding report to the queue" );
391
427
new java .util .Timer ().schedule (
392
428
new java .util .TimerTask () {
393
429
@ Override
394
430
public void run () {
395
431
try {
396
- nodes [id ].receiveReport (message , link );
432
+ Thread .sleep (500 );
433
+ nodes [id ].receive (message , link );
397
434
} catch (RemoteException e ) {
398
435
e .printStackTrace ();
436
+ } catch (InterruptedException e ) {
437
+ e .printStackTrace ();
399
438
}
400
439
}
401
440
},
402
- 500 );
441
+ 0 );
403
442
} else {
404
443
if (message .getWeight () > weightBestAdjacent )
405
444
changeRoot ();
@@ -414,7 +453,7 @@ public void run() {
414
453
}
415
454
}
416
455
417
- private synchronized void changeRoot () throws RemoteException {
456
+ private void changeRoot () throws RemoteException {
418
457
System .out .println (id + ":Change Root" );
419
458
420
459
if (bestEdge .getState () == LinkState .IN_MST ) {
@@ -425,34 +464,44 @@ private synchronized void changeRoot() throws RemoteException {
425
464
@ Override
426
465
public void run () {
427
466
try {
428
- System .out .println (id + ":Sending change root to " + bestEdge .getReceiver (id ));
429
- nodes [bestEdge .getReceiver (id )].receiveChangeRoot (msg , bestEdge );
467
+ int receiverId = bestEdge .getReceiver (id );
468
+ Thread .sleep (10 );
469
+ System .out .println (id + ":Sending change root to " + receiverId );
470
+ nodes [receiverId ].receive (msg , bestEdge );
430
471
431
472
} catch (RemoteException e ) {
432
473
e .printStackTrace ();
474
+ } catch (InterruptedException e ) {
475
+ e .printStackTrace ();
433
476
}
434
477
}
435
478
},
436
- 10
479
+ 0
437
480
);
438
481
} else {
439
482
Message msg = new Message (MessageType .CONNECT , level );
440
483
// TODO: check if bestEdge is adjacent to the node
484
+ System .out .println (id + ":Sending connect message " + bestEdge .getReceiver (id ));
441
485
new java .util .Timer ().schedule (
442
486
new java .util .TimerTask () {
443
487
@ Override
444
488
public void run () {
445
489
try {
446
- nodes [bestEdge .getReceiver (id )].receiveChangeRoot (msg , bestEdge );
447
- System .out .println (id + ": changing bestEdge to " + bestEdge .getReceiver (id ) + " to IN_MST" );
448
- bestEdge .setState (LinkState .IN_MST );
490
+ Link copy = new Link (bestEdge );
491
+ int receiverId = copy .getReceiver (id );
492
+ Thread .sleep (10 );
493
+ nodes [receiverId ].receive (msg , copy );
449
494
} catch (RemoteException e ) {
450
495
e .printStackTrace ();
496
+ } catch (InterruptedException e ) {
497
+ e .printStackTrace ();
451
498
}
452
499
}
453
500
},
454
- 10
501
+ 0
455
502
);
503
+ System .out .println (id + ": changing bestEdge to " + bestEdge .getReceiver (id ) + " to IN_MST" );
504
+ bestEdge .setState (LinkState .IN_MST );
456
505
457
506
}
458
507
}
0 commit comments