8000 Kill remaining watchValues · lethalbrains/arangodb@b77cc77 · GitHub
[go: up one dir, main page]

Skip to content

Commit b77cc77

Browse files
author
Andreas Streichardt
committed
Kill remaining watchValues
1 parent 4c27442 commit b77cc77

File tree

7 files changed

+63
-32
lines changed

7 files changed

+63
-32
lines changed

arangod/Cluster/AgencyCallback.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
#include <chrono>
2929
#include <thread>
3030
#include "Basics/MutexLocker.h"
31-
#include <iostream>
3231

3332
using namespace arangodb;
3433

arangod/Cluster/AgencyCallbackRegistry.cpp

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,11 @@
2929
#include "Cluster/ServerState.h"
3030
#include "Endpoint/Endpoint.h"
3131
#include <ctime>
32+
#include <condition_variable>
33+
#include <mutex>
34+
#include <thread>
35+
#include <velocypack/Slice.h>
36+
#include <velocypack/velocypack-aliases.h>
3237

3338
using namespace arangodb;
3439

@@ -100,3 +105,27 @@ std::string AgencyCallbackRegistry::getEndpointUrl(uint32_t endpoint) {
100105

101106
return url.str();
102107
}
108+
109+
void AgencyCallbackRegistry::awaitNextChange(std::string const& key, double timeout) {
110+
auto maxWait = std::chrono::milliseconds(static_cast<int>(timeout * 1000));
111+
112+
std::condition_variable cv;
113+
114+
std::function<bool(VPackSlice const& result)> notify = [&](VPackSlice const& result) {
115+
LOG(DEBUG) << "Notifying change!";
116+
cv.notify_one();
117+
return true;
118+
};
119+
auto agencyCallback = std::make_shared<AgencyCallback>(_agency, key, notify);
120+
121+
std::mutex mtx;
122+
std::unique_lock<std::mutex> lck(mtx);
123+
// mop: hmmm if callback registering failed this will just wait for the timeout, which
124+
// should be ok I think?
125+
registerCallback(agencyCallback);
126+
LOG(DEBUG) << "Awaiting change!";
127+
if (cv.wait_for(lck, maxWait) == std::cv_status::timeout) {
128+
LOG(DEBUG) << "Reached timeout!";
129+
}
130+
unregisterCallback(agencyCallback);
131+
}

arangod/Cluster/AgencyCallbackRegistry.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,16 @@ class AgencyCallbackRegistry {
5050
//////////////////////////////////////////////////////////////////////////////
5151
bool unregisterCallback(std::shared_ptr<AgencyCallback>);
5252

53+
//////////////////////////////////////////////////////////////////////////////
54+
/// @brief get a callback by its key
55+
//////////////////////////////////////////////////////////////////////////////
5356
std::shared_ptr<AgencyCallback> getCallback(uint32_t);
5457

58+
//////////////////////////////////////////////////////////////////////////////
59+
/// @brief wait for a change of the value or timeout
60+
//////////////////////////////////////////////////////////////////////////////
61+
void awaitNextChange(std::string const& key, double timeout);
62+
5563
private:
5664
std::string getEndpointUrl(uint32_t);
5765

arangod/Cluster/ApplicationCluster.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ void ApplicationCluster::setupOptions(
101101
}
102102

103103
bool ApplicationCluster::prepare() {
104+
ClusterInfo::createInstance(_agencyCallbackRegistry);
104105
// set authentication data
105106
ServerState::instance()->setAuthentication(_username, _password);
106107

arangod/Cluster/ClusterInfo.cpp

Lines changed: 20 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -52,12 +52,7 @@ using namespace arangodb;
5252

5353
using arangodb::basics::JsonHelper;
5454

55-
////////////////////////////////////////////////////////////////////////////////
56-
/// @brief single instance of ClusterInfo - will live as long as the server is
57-
/// running
58-
////////////////////////////////////////////////////////////////////////////////
59-
60-
static ClusterInfo Instance;
55+
static std::unique_ptr<ClusterInfo> _instance;
6156

6257
////////////////////////////////////////////////////////////////////////////////
6358
/// @brief a local helper to report errors and messages
@@ -267,17 +262,26 @@ void CollectionInfoCurrent::copyAllJsons() {
267262
}
268263
}
269264

265+
////////////////////////////////////////////////////////////////////////////////
266+
/// @brief create the clusterinfo instance
267+
////////////////////////////////////////////////////////////////////////////////
268+
269+
void ClusterInfo::createInstance(AgencyCallbackRegistry* agencyCallbackRegistry) {
270+
_instance.reset(new ClusterInfo(agencyCallbackRegistry));
271+
}
272+
270273
////////////////////////////////////////////////////////////////////////////////
271274
/// @brief returns an instance of the cluster info class
272275
////////////////////////////////////////////////////////////////////////////////
273276

274-
ClusterInfo* ClusterInfo::instance() { return &Instance; }
277+
ClusterInfo* ClusterInfo::instance() { return _instance.get(); }
275278

276279
////////////////////////////////////////////////////////////////////////////////
277280
/// @brief creates a cluster info object
278281
////////////////////////////////////////////////////////////////////////////////
279282

280-
ClusterInfo::ClusterInfo() : _agency(), _uniqid() {
283+
ClusterInfo::ClusterInfo(AgencyCallbackRegistry* agencyCallbackRegistry)
284+
: _agency(), _agencyCallbackRegistry(agencyCallbackRegistry), _uniqid() {
281285
_uniqid._currentValue = _uniqid._upperValue = 0ULL;
282286

283287
// Actual loading into caches is postponed until necessary
@@ -1060,8 +1064,6 @@ int ClusterInfo::createDatabaseCoordinator(std::string const& name,
10601064
return setErrormsg(TRI_ERROR_CLUSTER_COULD_NOT_READ_CURRENT_VERSION,
10611065
errorMsg);
10621066
}
1063-
uint64_t index = res._index;
1064-
10651067
std::vector<ServerID> DBServers = getCurrentDBServers();
10661068
int count = 0; // this counts, when we have to reload the DBServers
10671069

@@ -1101,11 +1103,10 @@ int ClusterInfo::createDatabaseCoordinator(std::string const& name,
11011103
return setErrormsg(TRI_ERROR_NO_ERROR, errorMsg);
11021104
}
11031105
}
1104-
1106+
11051107
res.clear();
1106-
res = ac.watchValue("Current/Version", index,
1107-
getReloadServerListTimeout() / interval, false);
1108-
index = res._index;
1108+
_agencyCallbackRegistry->awaitNextChange("Current/Version", getReloadServerListTimeout() / interval);
1109+
11091110
if (++count >= static_cast<int>(getReloadServerListTimeout() / interval)) {
11101111
// We update the list of DBServers every minute in case one of them
11111112
// was taken away since we last looked. This also helps (slightly)
@@ -1177,7 +1178,6 @@ int ClusterInfo::dropDatabaseCoordinator(std::string const& name,
11771178
return setErrormsg(TRI_ERROR_CLUSTER_COULD_NOT_READ_CURRENT_VERSION,
11781179
errorMsg);
11791180
}
1180-
uint64_t index = res._index;
11811181

11821182
std::string where = "Current/Databases/" + name;
11831183
while (TRI_microtime() <= endTime) {
@@ -1199,8 +1199,7 @@ int ClusterInfo::dropDatabaseCoordinator(std::string const& name,
11991199
}
12001200
}
12011201
res.clear();
1202-
res = ac.watchValue("Current/Version", index, interval, false);
1203-
index = res._index;
1202+
_agencyCallbackRegistry->awaitNextChange("Current/Version", interval);
12041203
}
12051204
return setErrormsg(TRI_ERROR_CLUSTER_TIMEOUT, errorMsg);
12061205
}
@@ -1270,7 +1269,6 @@ int ClusterInfo::createCollectionCoordinator(std::string const& databaseName,
12701269
return setErrormsg(TRI_ERROR_CLUSTER_COULD_NOT_READ_CURRENT_VERSION,
12711270
errorMsg);
12721271
}
1273-
uint64_t index = res._index;
12741272

12751273
std::string const where =
12761274
"Current/Collections/" + databaseName + "/" + collectionID;
@@ -1310,8 +1308,7 @@ int ClusterInfo::createCollectionCoordinator(std::string const& databaseName,
13101308
}
13111309

13121310
res.clear();
1313-
res = ac.watchValue("Current/Version", index, interval, false);
1314-
index = res._index;
1311+
_agencyCallbackRegistry->awaitNextChange("Current/Version", interval);
13151312
}
13161313

13171314
// LOG(ERR) << "GOT TIMEOUT. NUMBEROFSHARDS: " << numberOfShards;
@@ -1367,7 +1364,6 @@ int ClusterInfo::dropCollectionCoordinator(std::string const& databaseName,
13671364
return setErrormsg(TRI_ERROR_CLUSTER_COULD_NOT_READ_CURRENT_VERSION,
13681365
errorMsg);
13691366
}
1370-
uint64_t index = res._index;
13711367

13721368
// monitor the entry for the collection
13731369
std::string const where =
@@ -1397,8 +1393,7 @@ int ClusterInfo::dropCollectionCoordinator(std::string const& databaseName,
13971393
}
13981394

13991395
res.clear();
1400-
res = ac.watchValue("Current/Version", index, interval, false);
1401-
index = res._index;
1396+
_agencyCallbackRegistry->awaitNextChange("Current/Version", interval);
14021397
}
14031398
return setErrormsg(TRI_ERROR_CLUSTER_TIMEOUT, errorMsg);
14041399
}
@@ -1741,7 +1736,6 @@ int ClusterInfo::ensureIndexCoordinator(
17411736
return setErrormsg(TRI_ERROR_CLUSTER_COULD_NOT_READ_CURRENT_VERSION,
17421737
errorMsg);
17431738
}
1744-
uint64_t index = res._index;
17451739

17461740
std::string where =
17471741
"Current/Collections/" + databaseName + "/" + collectionID;
@@ -1813,8 +1807,7 @@ int ClusterInfo::ensureIndexCoordinator(
18131807
}
18141808
}
18151809
res.clear();
1816-
res = ac.watchValue("Current/Version", index, interval, false);
1817-
index = res._index;
1810+
_agencyCallbackRegistry->awaitNextChange("Current/Version", interval);
18181811
}
18191812

18201813
return setErrormsg(TRI_ERROR_CLUSTER_TIMEOUT, errorMsg);
@@ -1959,7 +1952,6 @@ int ClusterInfo::dropIndexCoordinator(std::string const& databaseName,
19591952
return setErrormsg(TRI_ERROR_CLUSTER_COULD_NOT_READ_CURRENT_VERSION,
19601953
errorMsg);
19611954
}
1962-
uint64_t index = res._index;
19631955

19641956
std::string where =
19651957
"Current/Collections/" + databaseName + "/" + collectionID;
@@ -2001,8 +1993,7 @@ int ClusterInfo::dropIndexCoordinator(std::string const& databaseName,
20011993
}
20021994

20031995
res.clear();
2004-
res = ac.watchValue("Current/Version", index, interval, false);
2005-
index = res._index;
1996+
_agencyCallbackRegistry->awaitNextChange("Current/Version", interval);
20061997
}
20071998

20081999
return setErrormsg(TRI_ERROR_CLUSTER_TIMEOUT, errorMsg);

arangod/Cluster/ClusterInfo.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
#include "Basics/Mutex.h"
3131
#include "Basics/ReadWriteLock.h"
3232
#include "Cluster/AgencyComm.h"
33+
#include "Cluster/AgencyCallbackRegistry.h"
3334
#include "VocBase/voc-types.h"
3435
#include "VocBase/vocbase.h"
3536

@@ -540,7 +541,7 @@ class ClusterInfo {
540541
/// @brief creates library
541542
//////////////////////////////////////////////////////////////////////////////
542543

543-
ClusterInfo();
544+
explicit ClusterInfo(AgencyCallbackRegistry*);
544545

545546
//////////////////////////////////////////////////////////////////////////////
546547
/// @brief shuts down library
@@ -549,6 +550,7 @@ class ClusterInfo {
549550
~ClusterInfo();
550551

551552
public:
553+
static void createInstance(AgencyCallbackRegistry*);
552554
//////////////////////////////////////////////////////////////////////////////
553555
/// @brief get the unique instance
554556
//////////////////////////////////////////////////////////////////////////////
@@ -867,6 +869,8 @@ class ClusterInfo {
867869

868870
AgencyComm _agency;
869871

872+
AgencyCallbackRegistry* _agencyCallbackRegistry;
873+
870874
// Cached data from the agency, we reload whenever necessary:
871875

872876
// We group the data, each group has an atomic "valid-flag"

arangod/Cluster/HeartbeatThread.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@
4141
#include "VocBase/server.h"
4242
#include "VocBase/vocbase.h"
4343
#include <functional>
44-
#include <iostream>
4544

4645
using namespace arangodb;
4746

0 commit comments

Comments
 (0)
0