@@ -51,6 +51,41 @@ const Graph = graphGeneration.Graph;
51
51
52
52
const epsilon = 0.00001 ;
53
53
54
+ const runFinishedSuccessfully = ( stats ) => stats . state === "done" ;
55
+ const runFinishedUnsuccessfully = ( stats ) => stats . state === "canceled" || stats . state === "fatal error" ;
56
+ const runFinished = ( stats ) => runFinishedSuccessfully ( stats ) || runFinishedUnsuccessfully ( stats ) ;
57
+ const runCanceled = ( stats ) => stats . state === "canceled" ;
58
+
59
+ const waitUntilRunFinishedSuccessfully = function ( pid , maxWaitSeconds = 120 , sleepIntervalSeconds = 0.2 ) {
60
+ let wakeupsLeft = maxWaitSeconds / sleepIntervalSeconds ;
61
+ var status ;
62
+ do {
63
+ internal . sleep ( 0.2 ) ;
64
+ status = pregel . status ( pid ) ;
65
+ if ( wakeupsLeft -- === 0 ) {
66
+ assertTrue ( false , "timeout in pregel execution" ) ;
67
+ return ;
68
+ }
69
+ } while ( ! runFinished ( status ) ) ;
70
+
71
+ if ( runFinishedUnsuccessfully ( status ) ) {
72
+ assertTrue ( false , "Pregel run finished unsuccessfully in state " + status . state ) ;
73
+ return ;
74
+ }
75
+
76
+ return status ;
77
+ } ;
78
+
79
+ const uniquePregelResults = function ( documentCursor ) {
80
+ let myset = new Set ( ) ;
81
+ while ( documentCursor . hasNext ( ) ) {
82
+ const doc = documentCursor . next ( ) ;
83
+ assertTrue ( doc . result !== undefined , "Found no pregel result in document " + doc ) ;
84
+ myset . add ( doc . result ) ;
85
+ }
86
+ return myset ;
87
+ } ;
88
+
54
89
/**
55
90
* Assert that expected and actual are of the same type and that they are equal up to the tolerance epsilon.
56
91
* @param expected the expected value, a number
@@ -77,34 +112,21 @@ const assertAlmostEquals = function (expected, actual, epsilon, msg, expectedDes
77
112
}
78
113
} ;
79
114
80
- const runPregelInstance = function ( algName , graphName , parameters , query , maxWaitTimeSecs = 120 ) {
81
- const pid = pregel . start ( algName , graphName , parameters ) ;
82
- const sleepIntervalSecs = 0.2 ;
83
- let wakeupsLeft = maxWaitTimeSecs / sleepIntervalSecs ;
84
- while ( pregel . status ( pid ) . state !== "done" && wakeupsLeft > 0 ) {
85
- wakeupsLeft -- ;
86
- internal . sleep ( 0.2 ) ;
87
- }
88
- const statusState = pregel . status ( pid ) . state ;
89
- assertEqual ( statusState , "done" , `Pregel Job did never succeed. Status: ${ statusState } ` ) ;
90
- return db . _query ( query ) . toArray ( ) ;
91
- } ;
92
-
93
115
const dampingFactor = 0.85 ;
94
116
95
-
96
117
const testPageRankOnGraph = function ( vertices , edges , seeded = false ) {
97
118
db [ vColl ] . save ( vertices ) ;
98
119
db [ eColl ] . save ( edges ) ;
99
- const query = `
100
- FOR v in ${ vColl }
101
- RETURN {"_key": v._key, "value": v.pagerank}
102
- ` ;
103
120
let parameters = { maxGSS : 100 , resultField : "pagerank" } ;
104
121
if ( seeded ) {
105
122
parameters . sourceField = "input" ;
106
123
}
107
- const result = runPregelInstance ( "pagerank" , graphName , parameters , query ) ;
124
+ const pid = pregel . start ( "pagerank" , graphName , parameters ) ;
125
+ waitUntilRunFinishedSuccessfully ( pid ) ;
126
+ const result = db . _query ( `
127
+ FOR v in ${ vColl }
128
+ RETURN {"_key": v._key, "value": v.pagerank}
129
+ ` ) . toArray ( ) ;
108
130
109
131
const graph = new Graph ( vertices , edges ) ;
110
132
for ( const v of result ) {
@@ -130,15 +152,15 @@ const testPageRankOnGraph = function (vertices, edges, seeded = false) {
130
152
const testHITSKleinbergOnGraph = function ( vertices , edges ) {
131
153
db [ vColl ] . save ( vertices ) ;
132
154
db [ eColl ] . save ( edges ) ;
133
- const query = `
134
- FOR v in ${ vColl }
135
- RETURN {"_key": v._key, "value": {hits_hub: v.hits_hub, hits_auth: v.hits_auth}}
136
- ` ;
137
155
const numberIterations = 50 ;
138
156
139
157
let parameters = { resultField : "hits" , maxNumIterations : numberIterations } ;
140
- let algName = "hitskleinberg" ;
141
- const result = runPregelInstance ( algName , graphName , parameters , query ) ;
158
+ const pid = pregel . start ( "hitskleinberg" , graphName , parameters ) ;
159
+ waitUntilRunFinishedSuccessfully ( pid ) ;
160
+ const result = db . _query ( `
161
+ FOR v in ${ vColl }
162
+ RETURN {"_key": v._key, "value": {hits_hub: v.hits_hub, hits_auth: v.hits_auth}}
163
+ ` ) . toArray ( ) ;
142
164
143
165
const hits = new HITS ( ) ;
144
166
const graph = new Graph ( vertices , edges ) ;
@@ -177,16 +199,16 @@ const testHITSKleinbergOnGraph = function (vertices, edges) {
177
199
const testHITSKleinbergThresholdOnGraph = function ( vertices , edges ) {
178
200
db [ vColl ] . save ( vertices ) ;
179
201
db [ eColl ] . save ( edges ) ;
180
- const query = `
181
- FOR v in ${ vColl }
182
- RETURN {"_key": v._key, "value": {hits_hub: v.hits_hub, hits_auth: v.hits_auth}}
183
- ` ;
184
202
const numberIterations = 50 ;
185
203
const threshold = 200.0 ; // must be less than 1000.0, the value added in HITSKleinberg in reportFakeDifference()
186
204
187
205
let parameters = { resultField : "hits" , maxNumIterations : numberIterations , threshold : threshold } ;
188
- let algName = "hitskleinberg" ;
189
- const result = runPregelInstance ( algName , graphName , parameters , query ) ;
206
+ const pid = pregel . start ( "hitskleinberg" , graphName , parameters ) ;
207
+ waitUntilRunFinishedSuccessfully ( pid ) ;
208
+ const result = db . _query ( `
209
+ FOR v in ${ vColl }
210
+ RETURN {"_key": v._key, "value": {hits_hub: v.hits_hub, hits_auth: v.hits_auth}}
211
+ ` ) . toArray ( ) ;
190
212
191
213
const hits = new HITS ( ) ;
192
214
const graph = new Graph ( vertices , edges ) ;
@@ -230,13 +252,14 @@ const pregelRunSmallInstanceGetComponents = function (algName, graphName, parame
230
252
assertTrue ( parameters . hasOwnProperty ( "resultField" ) ,
231
253
`Malformed test: the parameter "parameters" of pregelRunSmallInstanceGetComponents
232
254
must have an attribute "resultField"` ) ;
233
- const query = `
255
+ const pid = pregel . start ( algName , graphName , parameters ) ;
256
+ waitUntilRunFinishedSuccessfully ( pid ) ;
257
+ return db . _query ( `
234
258
FOR v IN ${ vColl }
235
259
COLLECT ${ parameters . resultField } = v.result WITH COUNT INTO size
236
260
SORT size DESC
237
261
RETURN {${ parameters . resultField } , size}
238
- ` ;
239
- return runPregelInstance ( algName , graphName , parameters , query ) ;
262
+ ` ) . toArray ( ) ;
240
263
} ;
241
264
242
265
const makeSetUp = function ( smart , smartAttribute , numberOfShards ) {
@@ -294,11 +317,12 @@ const testSubgraphs = function (algName, graphName, parameters, expectedSizes) {
294
317
} ) ;
295
318
if ( parameters . hasOwnProperty ( "test" ) && parameters . test ) {
296
319
delete parameters . test ;
297
- const query = `
298
- FOR v in ${ vColl }
299
- RETURN {"vertex": v._key, "result": v.result}
300
- ` ;
301
- runPregelInstance ( algName , graphName , parameters , query ) ;
320
+ const pid = pregel . start ( algName , graphName , parameters ) ;
321
+ waitUntilRunFinishedSuccessfully ( pid ) ;
322
+ db . _query ( `
323
+ FOR v in ${ vColl }
324
+ RETURN {"vertex": v._key, "result": v.result}
325
+ ` ) . toArray ( ) ;
302
326
return ;
303
327
}
304
328
const computedComponents = pregelRunSmallInstanceGetComponents ( algName , graphName , parameters ) ;
@@ -1035,12 +1059,14 @@ function makeLabelPropagationTestSuite(isSmart, smartAttribute, numberOfShards)
1035
1059
const { vertices, edges} = graphGenerator ( verticesEdgesGenerator ( vColl , "v" ) ) . makeDirectedCycle ( length ) ;
1036
1060
db [ vColl ] . save ( vertices ) ;
1037
1061
db [ eColl ] . save ( edges ) ;
1038
- const query = `
1062
+
1063
+ const pid = pregel . start ( "labelpropagation" , graphName , { maxGSS : 100 , resultField : "community" } ) ;
1064
+ waitUntilRunFinishedSuccessfully ( pid ) ;
1065
+ const result = db . _query ( `
1039
1066
FOR v in ${ vColl }
1040
1067
RETURN {"_key": v._key, "community": v.community}
1041
- ` ;
1042
- const result = runPregelInstance ( "labelpropagation" , graphName ,
1043
- { maxGSS : 100 , resultField : "community" } , query ) ;
1068
+ ` ) . toArray ( ) ;
1069
+
1044
1070
assertEqual ( result . length , length ) ;
1045
1071
for ( const value of result ) {
1046
1072
assertEqual ( value . community , result [ 0 ] . community ) ;
@@ -1056,14 +1082,15 @@ function makeLabelPropagationTestSuite(isSmart, smartAttribute, numberOfShards)
1056
1082
db [ vColl ] . save ( verticesEdges . vertices ) ;
1057
1083
db [ eColl ] . save ( verticesEdges . edges ) ;
1058
1084
1059
- const query = `
1085
+ const pid = pregel . start ( "labelpropagation" , graphName , { maxGSS : 100 , resultField : "community" } ) ;
1086
+ waitUntilRunFinishedSuccessfully ( pid ) ;
1087
+ const result = db . _query ( `
1060
1088
FOR v in ${ vColl }
1061
1089
COLLECT community = v.community WITH COUNT INTO size
1062
1090
SORT size DESC
1063
1091
RETURN {community, size}
1064
- ` ;
1065
- const result = runPregelInstance ( "labelpropagation" , graphName ,
1066
- { maxGSS : 100 , resultField : "community" } , query ) ;
1092
+ ` ) . toArray ( ) ;
1093
+
1067
1094
assertEqual ( result . length , 2 ) ;
1068
1095
assertEqual ( result [ 0 ] . size , length ) ;
1069
1096
assertEqual ( result [ 1 ] . size , length ) ;
@@ -1080,19 +1107,20 @@ function makeLabelPropagationTestSuite(isSmart, smartAttribute, numberOfShards)
1080
1107
1081
1108
db [ eColl ] . save ( makeEdgeBetweenVertices ( vColl , 0 , "v0" , 0 , "v1" ) ) ;
1082
1109
1083
- const query = `
1110
+ const pid = pregel . start ( "labelpropagation" , graphName , { maxGSS : 100 , resultField : "community" } ) ;
1111
+ waitUntilRunFinishedSuccessfully ( pid ) ;
1112
+ const result = db . _query ( `
1084
1113
FOR v in ${ vColl }
1085
1114
COLLECT community = v.community WITH COUNT INTO size
1086
1115
SORT size DESC
1087
1116
RETURN {community, size}
1088
- ` ;
1117
+ ` ) . toArray ( ) ;
1118
+
1089
1119
// expected (depending on the "random" distribution of initial ids) that
1090
1120
// - all vertices are in one community:
1091
1121
// if the least initial value of vertices in the component with label v0,
1092
1122
// is less than the least initial value of vertices in the component with label v1,
1093
1123
// - or two communities otherwise.
1094
- const result = runPregelInstance ( "labelpropagation" , graphName ,
1095
- { maxGSS : 100 , resultField : "community" } , query ) ;
1096
1124
assertTrue ( result . length === 1 || result . length === 2 , `Expected 1 or 2, obtained ${ result } ` ) ;
1097
1125
if ( result . length === 1 ) {
1098
1126
assertEqual ( result [ 0 ] . size , 2 * size ) ;
@@ -1480,12 +1508,14 @@ function makeSSSPTestSuite(isSmart, smartAttribute, numberOfShards) {
1480
1508
1481
1509
db [ vColl ] . save ( vertices ) ;
1482
1510
db [ eColl ] . save ( edges ) ;
1483
- const query = `
1511
+ let parameters = { source : `${ vColl } /${ source } ` , resultField : "distance" } ;
1512
+ const pid = pregel . start ( "sssp" , graphName , parameters ) ;
1513
+ waitUntilRunFinishedSuccessfully ( pid ) ;
1514
+ const result =
10000
db . _query ( `
1484
1515
FOR v in ${ vColl }
1485
1516
RETURN {"_key": v._key, "result": v.distance}
1486
- ` ;
1487
- let parameters = { source : `${ vColl } /${ source } ` , resultField : "distance" } ;
1488
- const result = runPregelInstance ( "sssp" , graphName , parameters , query ) ;
1517
+ ` ) . toArray ( ) ;
1518
+
1489
1519
const graph = new Graph ( vertices , edges ) ;
1490
1520
// assign to each vertex.value infinity
1491
1521
// (This is what Pregel, in fact, returns if a vertex is not reachable.
@@ -1712,8 +1742,12 @@ exports.makePagerankTestSuite = makePagerankTestSuite;
1712
1742
exports . makeSeededPagerankTestSuite = makeSeededPagerankTestSuite ;
1713
1743
exports . makeSSSPTestSuite = makeSSSPTestSuite ;
1714
1744
exports . makeHITSTestSuite = makeHITSTestSuite ;
1715
- exports . runPregelInstance = runPregelInstance ;
1716
1745
exports . epsilon = epsilon ;
1717
1746
exports . makeSetUp = makeSetUp ;
1718
1747
exports . makeTearDown = makeTearDown ;
1719
1748
exports . assertAlmostEquals = assertAlmostEquals ;
1749
+ exports . runFinished = runFinished ;
1750
+ exports . runCanceled = runCanceled ;
1751
+ exports . runFinishedSuccessfully = runFinishedSuccessfully ;
1752
+ exports . waitUntilRunFinishedSuccessfully = waitUntilRunFinishedSuccessfully ;
1753
+ exports . uniquePregelResults = uniquePregelResults ;
0 commit comments