10000 Merge remote-tracking branch 'origin/freeze' into experimental · rpytel1/distributed-algorithms@6a81071 · GitHub
[go: up one dir, main page]

Skip to content

Commit 6a81071

Browse files
author
rpytel1
committed
Merge remote-tracking branch 'origin/freeze' into experimental
2 parents a906c64 + c32d66d commit 6a81071

File tree

10 files changed

+94
-36
lines changed

10 files changed

+94
-36
lines changed

Assignment3/src/assignment3/link/Link.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ public void setDelay(int delay) {
6969
this.delay = delay;
7070
}
7171

72-
public int getReceiver(int id) {
72+
public synchronized int getReceiver(int id) {
7373
if (id == this.to)
7474
return this.from;
7575
return this.to;

Assignment3/src/assignment3/node/Node.java

Lines changed: 41 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public void wakeUp() throws RemoteException {
5858
public void run() {
5959
try {
6060
int receiverId = edge.getReceiver(id);
61-
Thread.sleep(10);
61+
Thread.sleep(100);
6262
nodes[receiverId].receive(msg, edge);
6363
} catch (RemoteException e) {
6464
e.printStackTrace();
@@ -72,7 +72,12 @@ public void run() {
7272
}
7373

7474
@Override
75-
public void receive(Message message, Link link) throws RemoteException {
75+
public synchronized void receive(Message message, Link link) throws RemoteException {
76+
try {
77+
Thread.sleep((long)(new Random().nextDouble()*10));
78+
} catch (InterruptedException e) {
79+
e.printStackTrace();
80+
}
7681
Link myLink = getMyLink(link);
7782
switch (message.getType()) {
7883
case TEST:
@@ -97,9 +102,12 @@ public void receive(Message message, Link link) throws RemoteException {
97102
receiveChangeRoot(message, myLink);
98103
break;
99104
}
105+
if (testEdge != null) {
106+
System.out.println(id + ": test edge " + id + " to " + testEdge.getReceiver(id) + " fragment " + fragmentName);
107+
}
100108
}
101109

102-
public void updateLinks(Link link) {
110+
public synchronized void updateLinks(Link link) {
103111
List<Link> linkLists = new ArrayList<>(links);
104112
Link linkToUpdate = new Link();
105113
for (Link linkEntry : linkLists) {
@@ -116,7 +124,7 @@ public void send(Message message, Link link) throws RemoteException {
116124

117125
}
118126

119-
private Link getMyLink(Link link) {
127+
private synchronized Link getMyLink(Link link) {
120128
List<Link> linkLists = new ArrayList<>(links);
121129
Link linkToUpdate = new Link();
122130
for (Link linkEntry : linkLists) {
@@ -135,21 +143,24 @@ public synchronized void receiveConnect(Message message, Link link) throws Remot
135143
// the case that l < l' and fragment F is absorbed by F'
136144
if (message.getLevel() < this.level) {
137145
System.out.println(id + ": changing link to " + link.getReceiver(id) + " to IN_MST");
138-
link.setState(LinkState.IN_MST); // TODO: to be checked
146+
link.setState(LinkState.IN_MST);
139147
Message msg = new Message(MessageType.INITIATE, level, fragmentName, state);
140148
new java.util.Timer().schedule(
141149
new java.util.TimerTask() {
142150
@Override
143151
public void run() {
144152
try {
145153
System.out.println(id + "Sending Receive Initiate to " + link.getReceiver(id));
154+
Thread.sleep(100);
146155
nodes[link.getReceiver(id)].receive(msg, link);
147156
} catch (RemoteException e) {
148157
e.printStackTrace();
158+
} catch (InterruptedException e) {
159+
e.printStackTrace();
149160
}
150161
}
151162
},
152-
10
163+
0
153164
);
154165
if (state == NodeState.FIND) {
155166
System.out.println(id + ": Connect Incrementing findcount");
@@ -164,7 +175,7 @@ public void run() {
164175
@Override
165176
public void run() {
166177
try {
167-
Thread.sleep(300);
178+
Thread.sleep(3000);
168179
nodes[id].receive(message, link);
169180
} catch (RemoteException e) {
170181
e.printStackTrace();
@@ -184,7 +195,7 @@ public void run() {
184195
public void run() {
185196
try {
186197
int receiverId = link.getReceiver(id);
187-
Thread.sleep(40);//It is added to preserve order of messages sended from certain node
198+
Thread.sleep(400);//It is added to preserve order of messages sended from certain node
188199
nodes[receiverId].receive(msg, link);
189200
} catch (RemoteException e) {
190201
e.printStackTrace();
@@ -217,7 +228,7 @@ private synchronized void receiveInitiate(Message message, Link link) throws Rem
217228
public void run() {
218229
try {
219230
Link copy = new Link(adjescentLink);
220-
Thread.sleep(10);
231+
Thread.sleep(100);
221232
nodes[copy.getReceiver(id)].receive(msg, copy);
222233
} catch (RemoteException e) {
223234
e.printStackTrace();
@@ -244,15 +255,16 @@ private synchronized void test() throws RemoteException {
244255
System.out.println(id + ":Test");
245256
if (links.stream().anyMatch(p -> p.getState() == LinkState.CANDIDATE_IN_MST)) {
246257
testEdge = links.stream().filter(p -> p.getState() == LinkState.CANDIDATE_IN_MST).min 10000 (new LinkComparator()).get();
258+
247259
Message msg = new Message(MessageType.TEST, level, fragmentName);
248-
System.out.println(id + ":Sending Receive test to " + testEdge.getReceiver(id));
260+
System.out.println(id + ":Sending Receive test to " + testEdge.getReceiver(id) + " fragment" + fragmentName);
249261
int reciverID = testEdge.getReceiver(id);
250262
new java.util.Timer().schedule(
251263
new java.util.TimerTask() {
252264
@Override
253265
public void run() {
254266
try {
255-
Thread.sleep(10);
267+
Thread.sleep(100);
256268
nodes[reciverID].receive(msg, testEdge);
257269
} catch (RemoteException e) {
258270
e.printStackTrace();
@@ -277,6 +289,7 @@ public void run() {
277289
@Override
278290
public synchronized void receiveTest(Message message, Link link) throws RemoteException {
279291
System.out.println(id + ":Receive Test from " + link.getReceiver(id));
292+
280293
if (state == NodeState.SLEEPING) {
281294
wakeUp();
282295
}
@@ -288,7 +301,7 @@ public synchronized void receiveTest(Message message, Link link) throws RemoteEx
288301
@Override
289302
public void run() {
290303
try {
291-
Thread.sleep(300);
304+
Thread.sleep(3000);
292305
nodes[id].receiveTest(message, link);
293306
} catch (RemoteException e) {
294307
e.printStackTrace();
@@ -310,7 +323,7 @@ public void run() {
310323
@Override
311324
public void run() {
312325
try {
313-
Thread.sleep(10);
326+
Thread.sleep(100);
314327
nodes[link.getReceiver(id)].receive(msg, link);
315328
} catch (RemoteException e) {
316329
e.printStackTrace();
@@ -324,10 +337,18 @@ public void run() {
324337
} else {
325338
if (link.getState() == LinkState.CANDIDATE_IN_MST) {
326339
System.out.println(id + ": changing link to " + link.getReceiver(id) + " to NOT_IN_MST");
327-
link.setState(LinkState.NOT_IN_MST); //TODO: think about local copies
340+
link.setState(LinkState.NOT_IN_MST); //TODO: maybe should change test edge to null?
328341
}
329342
// if the node hasn't set this edge as testEdge then it sends a reject
330343
// because they are in the same fragment with the sender
344+
if (testEdge != null) {
345+
System.out.println(id + ":" + testEdge.getWeight() + " " + link.getWeight());
346+
System.out.println(id + ":" + link.compareTo(testEdge));
347+
348+
} else {
349+
350+
System.out.println(id + ": test edge null");
351+
}
331352
if (link.compareTo(testEdge) != 0) {
332353

333354
new java.util.Timer().schedule(
@@ -337,7 +358,7 @@ public void run() {
337358
try {
338359
System.out.println(id + ":Sending Reject to " + link.getReceiver(id));
339360
Message msg = new Message(MessageType.REJECT);
340-
Thread.sleep(10);
361+
Thread.sleep(100);
341362
nodes[link.getReceiver(id)].receive(msg, link);
342363
} catch (RemoteException e) {
343364
e.printStackTrace();
@@ -367,9 +388,7 @@ private synchronized void receiveReject(Message message, Link link) throws Remot
367388

368389
private synchronized void receiveAccept(Message message, Link link) throws RemoteException {
369390
System.out.println(id + ":Receive Accept from " + link.getReceiver(id));
370-
System.out.println(id + ": Changing test edge from" + link.getReceiver(id) + " to null");
371391
testEdge = null;
372-
System.out.println(id + ": Accept " + link.getWeight() + " " + weightBestAdjacent);
373392
if (link.getWeight() < weightBestAdjacent) {
374393
bestEdge = link;
375394
weightBestAdjacent = link.getWeight();
@@ -392,7 +411,7 @@ private synchronized void report() throws RemoteException {
392411
public void run() {
393412
try {
394413
int receiverId = inBranch.getReceiver(id);
395-
Thread.sleep(10);
414+
Thread.sleep(100);
396415
nodes[receiverId].receive(msg, inBranch);
397416
} catch (RemoteException e) {
398417
e.printStackTrace();
@@ -416,20 +435,19 @@ public synchronized void receiveReport(Message message, Link link) throws Remote
416435
System.out.println(id + ": Report " + message.getWeight() + " " + weight F438 BestAdjacent);
417436
if (message.getWeight() < weightBestAdjacent) {
418437
weightBestAdjacent = message.getWeight();
419-
System.out.println(id + ": Report BestEdge changed from " + bestEdge.getReceiver(id) + "to " + bestEdge.getReceiver(id));
438+
// System.out.println(id + ": Report BestEdge changed from " + bestEdge.getReceiver(id) + "to " + bestEdge.getReceiver(id));
420439
bestEdge = link;
421440
}
422441
report();
423442
} else {
424443
if (this.state == NodeState.FIND) {
425-
// TODO: append message to the message queue
426444
System.out.println("Adding report to the queue");
427445
new java.util.Timer().schedule(
428446
new java.util.TimerTask() {
429447
@Override
430448
public void run() {
431449
try {
432-
Thread.sleep(500);
450+
Thread.sleep(5000);
433451
nodes[id].receive(message, link);
434452
} catch (RemoteException e) {
435453
e.printStackTrace();
@@ -458,14 +476,13 @@ private void changeRoot() throws RemoteException {
458476

459477
if (bestEdge.getState() == LinkState.IN_MST) {
460478
Message msg = new Message(MessageType.CHANGE_ROOT);
461-
// TODO: check if bestEdge is adjacent to the node
462479
new java.util.Timer().schedule(
463480
new java.util.TimerTask() {
464481
@Override
465482
public void run() {
466483
try {
467484
int receiverId = bestEdge.getReceiver(id);
468-
Thread.sleep(10);
485+
Thread.sleep(100);
469486
System.out.println(id + ":Sending change root to " + receiverId);
470487
nodes[receiverId].receive(msg, bestEdge);
471488

@@ -480,7 +497,6 @@ public void run() {
480497
);
481498
} else {
482499
Message msg = new Message(MessageType.CONNECT, level);
483-
// TODO: check if bestEdge is adjacent to the node
484500
System.out.println(id + ":Sending connect message " + bestEdge.getReceiver(id));
485501
new java.util.Timer().schedule(
486502
new java.util.TimerTask() {
@@ -489,7 +505,7 @@ public void run() {
489505
try {
490506
Link copy = new Link(bestEdge);
491507
int receiverId = copy.getReceiver(id);
492-
Thread.sleep(10);
508+
Thread.sleep(100);
493509
nodes[receiverId].receive(msg, copy);
494510
} catch (RemoteException e) {
495511
e.printStackTrace();

Assignment3/src/assignment3/server/Client1.java

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ public static void main(String[] args) throws AlreadyBoundException, NotBoundExc
3333
links = new HashMap<Integer, List<Link>>();
3434
initializeEdges();
3535
// "clients" files contain the name of the remote processes used
36-
BufferedReader br = new BufferedReader(new FileReader("tests/nodes1.txt"));
36+
BufferedReader br = new BufferedReader(new FileReader("tests/nodes_cr4.txt"));
3737
String line = br.readLine();
3838
numProc = Integer.parseInt(line);
3939
localProc = 0;
@@ -68,15 +68,12 @@ public static void main(String[] args) throws AlreadyBoundException, NotBoundExc
6868
i++;
6969
}
7070
br.close();
71-
//System.out.println("Press enter to continue");
72-
//Scanner scan = new Scanner(System.in);
73-
//scan.nextLine();
7471
setRegistry();
7572
System.out.println("Client 1 started");
7673
}
7774

7875
public static void initializeEdges() throws IOException{
79-
BufferedReader br = new BufferedReader(new FileReader("tests/edges1.txt"));
76+
BufferedReader br = new BufferedReader(new FileReader("tests/edgescr4.txt"));
8077
String line;
8178
int node1;
8279
int node2;
@@ -112,7 +109,7 @@ public static void initializeEdges() throws IOException{
112109
br.close();
113110
}
114111

115-
public static void setRegistry() throws NotBoundException, NumberFormatException, IOException{
112+
public static void setRegistry() throws NotBoundException, NumberFormatException, IOException, InterruptedException {
116113
Registry registry = LocateRegistry.getRegistry("localhost", Constant.RMI_PORT);
117114
RMI_IDS = new IComponent[numProc]; // the remote process array is instantiated
118115
Thread[] myThreads = new Thread[numProc]; // and numProc number of threads are created
@@ -138,15 +135,14 @@ public static void setRegistry() throws NotBoundException, NumberFormatException
138135
RemoteProcess p = new RemoteProcess(RMI_IDS[localIDS.get(i)]);
139136
myThreads[i] = new Thread(p); // and a new thread is created
140137
}
141-
142-
//System.out.println("Press enter to continue");
143-
//Scanner scan = new Scanner(System.in);
144-
//scan.nextLine();
138+
145139
for (int i=0; i<numProc; i++){
146140
while(RMI_IDS[i].getEntities()==null){
147141
System.err.println("Node "+i+" not initialized yet");
148142
}
149143
}
144+
System.out.println("Tap when you are ready");
145+
System.in.read();
150146
myThreads[0].start();
151147
}
152148
}

tests/edges2.txt

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
0 1 1.5 1000
2+
0 2 1.8 1400
3+
0 3 1.4 1000
4+
0 4 2.0 2000
5+
1 2 1.3 1400
6+
1 3 1.7 1000
7+
1 4 2.1 2000
8+
2 3 1.9 1400
9+
2 4 2.5 1000
10+
3 4 2.3 2333

tests/edges3.txt

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
0 1 1.5 1000
2+
0 2 1.2 1400
3+
0 3 1.4 1000
4+
0 4 2.0 2000
5+
1 2 1.3 1400
6+
1 3 1.7 1000
7+
1 4 2.1 2000
8+
2 3 1.9 1400
9+
2 4 2.5 1000
10+
3 4 2.3 2333

tests/edgescr4.txt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
0 1 2.1 1000
2+
0 3 1.1 1000
3+
1 2 1.3 1000
4+
1 3 1.2 1000
5+
2 3 3.2 1000

tests/edgescr5.txt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
0 1 2.1 1000
2+
0 3 1.1 1000
3+
1 2 1.3 1000
4+
1 3 1.2 1000
5+
2 3 3.2 1000

tests/nodes2.txt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
5
2+
Client1 1
3+
Client2 2
4+
Client3 1
5+
Client4 2
6+
Client5 1

tests/nodes_cr4.txt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
4
2+
Client1 1
3+
Client2 1
4+
Client3 1
5+
Client4 1

tests/nodes_cr5.txt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
4
2+
Client1 1
3+
Client2 1
4+
Client3 1
5+
Client4 1

0 commit comments

Comments
 (0)
0