@@ -52,12 +52,7 @@ using namespace arangodb;
52
52
53
53
using arangodb::basics::JsonHelper;
54
54
55
- // //////////////////////////////////////////////////////////////////////////////
56
- // / @brief single instance of ClusterInfo - will live as long as the server is
57
- // / running
58
- // //////////////////////////////////////////////////////////////////////////////
59
-
60
- static ClusterInfo Instance;
55
+ static std::unique_ptr<ClusterInfo> _instance;
61
56
62
57
// //////////////////////////////////////////////////////////////////////////////
63
58
// / @brief a local helper to report errors and messages
@@ -267,17 +262,26 @@ void CollectionInfoCurrent::copyAllJsons() {
267
262
}
268
263
}
269
264
265
+ // //////////////////////////////////////////////////////////////////////////////
266
+ // / @brief create the clusterinfo instance
267
+ // //////////////////////////////////////////////////////////////////////////////
268
+
269
+ void ClusterInfo::createInstance (AgencyCallbackRegistry* agencyCallbackRegistry) {
270
+ _instance.reset (new ClusterInfo (agencyCallbackRegistry));
271
+ }
272
+
270
273
// //////////////////////////////////////////////////////////////////////////////
271
274
// / @brief returns an instance of the cluster info class
272
275
// //////////////////////////////////////////////////////////////////////////////
273
276
274
- ClusterInfo* ClusterInfo::instance () { return &Instance ; }
277
+ ClusterInfo* ClusterInfo::instance () { return _instance. get () ; }
275
278
276
279
// //////////////////////////////////////////////////////////////////////////////
277
280
// / @brief creates a cluster info object
278
281
// //////////////////////////////////////////////////////////////////////////////
279
282
280
- ClusterInfo::ClusterInfo () : _agency(), _uniqid() {
283
+ ClusterInfo::ClusterInfo (AgencyCallbackRegistry* agencyCallbackRegistry)
284
+ : _agency(), _agencyCallbackRegistry(agencyCallbackRegistry), _uniqid() {
281
285
_uniqid._currentValue = _uniqid._upperValue = 0ULL ;
282
286
283
287
// Actual loading into caches is postponed until necessary
@@ -1060,8 +1064,6 @@ int ClusterInfo::createDatabaseCoordinator(std::string const& name,
1060
1064
return setErrormsg (TRI_ERROR_CLUSTER_COULD_NOT_READ_CURRENT_VERSION,
1061
1065
errorMsg);
1062
1066
}
1063
- uint64_t index = res._index ;
1064
-
1065
1067
std::vector<ServerID> DBServers = getCurrentDBServers ();
1066
1068
int count = 0 ; // this counts, when we have to reload the DBServers
1067
1069
@@ -1101,11 +1103,10 @@ int ClusterInfo::createDatabaseCoordinator(std::string const& name,
1101
1103
return setErrormsg (TRI_ERROR_NO_ERROR, errorMsg);
1102
1104
}
1103
1105
}
1104
-
1106
+
1105
1107
res.clear ();
1106
- res = ac.watchValue (" Current/Version" , index,
1107
- getReloadServerListTimeout () / interval, false );
1108
- index = res._index ;
1108
+ _agencyCallbackRegistry->awaitNextChange (" Current/Version" , getReloadServerListTimeout () / interval);
1109
+
1109
1110
if (++count >= static_cast <int >(getReloadServerListTimeout () / interval)) {
1110
1111
// We update the list of DBServers every minute in case one of them
1111
1112
// was taken away since we last looked. This also helps (slightly)
@@ -1177,7 +1178,6 @@ int ClusterInfo::dropDatabaseCoordinator(std::string const& name,
1177
1178
return setErrormsg (TRI_ERROR_CLUSTER_COULD_NOT_READ_CURRENT_VERSION,
1178
1179
errorMsg);
1179
1180
}
1180
- uint64_t index = res._index ;
1181
1181
1182
1182
std::string where = " Current/Databases/" + name;
1183
1183
while (TRI_microtime () <= endTime) {
@@ -1199,8 +1199,7 @@ int ClusterInfo::dropDatabaseCoordinator(std::string const& name,
1199
1199
}
1200
1200
}
1201
1201
res.clear ();
1202
- res = ac.watchValue (" Current/Version" , index, interval, false );
1203
- index = res._index ;
1202
+ _agencyCallbackRegistry->awaitNextChange (" Current/Version" , interval);
1204
1203
}
1205
1204
return setErrormsg (TRI_ERROR_CLUSTER_TIMEOUT, errorMsg);
1206
1205
}
@@ -1270,7 +1269,6 @@ int ClusterInfo::createCollectionCoordinator(std::string const& databaseName,
1270
1269
return setErrormsg (TRI_ERROR_CLUSTER_COULD_NOT_READ_CURRENT_VERSION,
1271
1270
errorMsg);
1272
1271
}
1273
- uint64_t index = res._index ;
1274
1272
1275
1273
std::string const where =
1276
1274
" Current/Collections/" + databaseName + " /" + collectionID;
@@ -1310,8 +1308,7 @@ int ClusterInfo::createCollectionCoordinator(std::string const& databaseName,
1310
1308
}
1311
1309
1312
1310
res.clear ();
1313
- res = ac.watchValue (" Current/Version" , index, interval, false );
1314
- index = res._index ;
1311
+ _agencyCallbackRegistry->awaitNextChange (" Current/Version" , interval);
1315
1312
}
1316
1313
1317
1314
// LOG(ERR) << "GOT TIMEOUT. NUMBEROFSHARDS: " << numberOfShards;
@@ -1367,7 +1364,6 @@ int ClusterInfo::dropCollectionCoordinator(std::string const& databaseName,
1367
1364
return setErrormsg (TRI_ERROR_CLUSTER_COULD_NOT_READ_CURRENT_VERSION,
1368
1365
errorMsg);
1369
1366
}
1370
- uint64_t index = res._index ;
1371
1367
1372
1368
// monitor the entry for the collection
1373
1369
std::string const where =
@@ -1397,8 +1393,7 @@ int ClusterInfo::dropCollectionCoordinator(std::string const& databaseName,
1397
1393
}
1398
1394
1399
1395
res.clear ();
1400
- res = ac.watchValue (" Current/Version" , index, interval, false );
1401
- index = res._index ;
1396
+ _agencyCallbackRegistry->awaitNextChange (" Current/Version" , interval);
1402
1397
}
1403
1398
return setErrormsg (TRI_ERROR_CLUSTER_TIMEOUT, errorMsg);
1404
1399
}
@@ -1741,7 +1736,6 @@ int ClusterInfo::ensureIndexCoordinator(
1741
1736
return setErrormsg (TRI_ERROR_CLUSTER_COULD_NOT_READ_CURRENT_VERSION,
1742
1737
errorMsg);
1743
1738
}
1744
- uint64_t index = res._index ;
1745
1739
1746
1740
std::string where =
1747
1741
" Current/Collections/" + databaseName + " /" + collectionID;
@@ -1813,8 +1807,7 @@ int ClusterInfo::ensureIndexCoordinator(
1813
1807
}
1814
1808
}
1815
1809
res.clear ();
1816
- res = ac.watchValue (" Current/Version" , index, interval, false );
1817
- index = res._index ;
1810
+ _agencyCallbackRegistry->awaitNextChange (" Current/Version" , interval);
1818
1811
}
1819
1812
1820
1813
return setErrormsg (TRI_ERROR_CLUSTER_TIMEOUT, errorMsg);
@@ -1959,7 +1952,6 @@ int ClusterInfo::dropIndexCoordinator(std::string const& databaseName,
1959
1952
return setErrormsg (TRI_ERROR_CLUSTER_COULD_NOT_READ_CURRENT_VERSION,
1960
1953
errorMsg);
1961
1954
}
1962
- uint64_t index = res._index ;
1963
1955
1964
1956
std::string where =
1965
1957
" Current/Collections/" + databaseName + " /" + collectionID;
@@ -2001,8 +1993,7 @@ int ClusterInfo::dropIndexCoordinator(std::string const& databaseName,
2001
1993
}
2002
1994
2003
1995
res.clear ();
2004
- res = ac.watchValue (" Current/Version" , index, interval, false );
2005
- index = res._index ;
1996
+ _agencyCallbackRegistry->awaitNextChange (" Current/Version" , interval);
2006
1997
}
2007
1998
2008
1999
return setErrormsg (TRI_ERROR_CLUSTER_TIMEOUT, errorMsg);
0 commit comments