8000 reduce number of agency requests a bit · lethalbrains/arangodb@5c17402 · GitHub 8000
[go: up one dir, main page]

Skip to content

Commit 5c17402

Browse files
committed
reduce number of agency requests a bit
1 parent 59aa53a commit 5c17402

File tree

6 files changed

+39
-24
lines changed

6 files changed

+39
-24
lines changed

arangod/Cluster/AgencyCallback.cpp

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,15 +31,27 @@
3131

3232
using namespace arangodb;
3333

34-
AgencyCallback::AgencyCallback(AgencyComm& agency, std::string const& key, std::function<bool(VPackSlice const&)> const& cb)
34+
AgencyCallback::AgencyCallback(AgencyComm& agency,
35+
std::string const& key,
36+
std::function<bool(VPackSlice const&)> const& cb,
37+
bool needsValue)
3538
: key(key),
3639
_agency(agency),
37-
_cb(cb)
38-
{
39-
refetchAndUpdate();
40+
_cb(cb),
41+
_needsValue(needsValue) {
42+
43+
if (_needsValue) {
44+
refetchAndUpdate();
45+
}
4046
}
4147

4248
void AgencyCallback::refetchAndUpdate() {
49+
if (!_needsValue) {
50+
// no need to pass any value to the callback
51+
executeEmpty();
52+
return;
53+
}
54+
4355
AgencyCommResult result = _agency.getValues(key, true);
4456

4557
if (!result.successful()) {
@@ -50,7 +62,7 @@ void AgencyCallback::refetchAndUpdate() {
5062
LOG(ERR) << "Cannot parse body " << result.body();
5163
return;
5264
}
53-
65+
5466
std::map<std::string, AgencyCommResultEntry>::const_iterator it =
5567
result._values.begin();
5668

@@ -73,6 +85,12 @@ void AgencyCallback::checkValue(std::shared_ptr<VPackBuilder> newData) {
7385
}
7486
}
7587

88+
bool AgencyCallback::executeEmpty() {
89+
LOG(DEBUG) << "Executing (empty)";
90+
MUTEX_LOCKER(locker, _lock);
91+
return _cb(VPackSlice::noneSlice());
92+
}
93+
7694
bool AgencyCallback::execute(std::shared_ptr<VPackBuilder> newData) {
7795
LOG(DEBUG) << "Executing";
7896
MUTEX_LOCKER(locker, _lock);

arangod/Cluster/AgencyCallback.h

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,8 @@ class AgencyCallback {
3737
//////////////////////////////////////////////////////////////////////////////
3838
/// @brief ctor
3939
//////////////////////////////////////////////////////////////////////////////
40-
AgencyCallback(AgencyComm&, std::string const&, std::function<bool(VPackSlice const&)> const&);
40+
AgencyCallback(AgencyComm&, std::string const&,
41+
std::function<bool(VPackSlice const&)> const&, bool needsValue);
4142

4243
//////////////////////////////////////////////////////////////////////////////
4344
/// @brief wait a specified timeout. execute cb if watch didn't fire
@@ -54,8 +55,13 @@ class AgencyCallback {
5455
AgencyComm& _agency;
5556
std::function<bool(VPackSlice const&)> const _cb;
5657
std::shared_ptr<VPackBuilder> _lastData;
58+
bool const _needsValue;
5759

60+
// execute callback with current value data
5861
bool execute(std::shared_ptr<VPackBuilder>);
62+
// execute callback without any data
63+
bool executeEmpty();
64+
5965
void checkValue(std::shared_ptr<VPackBuilder>);
6066
};
6167

arangod/Cluster/AgencyCallbackRegistry.cpp

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -51,13 +51,12 @@ bool AgencyCallbackRegistry::registerCallback(std::shared_ptr<AgencyCallback> cb
5151
WRITE_LOCKER(locker, _lock);
5252
while (true) {
5353
rand = TRI_UInt32Random();
54-
if (_endpoints.count(rand) == 0) {
55-
_endpoints.emplace(rand, cb);
54+
if (_endpoints.emplace(rand, cb).second) {
5655
break;
5756
}
5857
}
5958
}
60-
59+
6160
bool ok = false;
6261
try {
6362
ok = _agency.registerCallback(cb->key, getEndpointUrl(rand));
@@ -89,10 +88,10 @@ std::shared_ptr<AgencyCallback> AgencyCallbackRegistry::getCallback(uint32_t id)
8988
bool AgencyCallbackRegistry::unregisterCallback(std::shared_ptr<AgencyCallback> cb) {
9089
WRITE_LOCKER(locker, _lock);
9190

92-
for (auto it: _endpoints) {
93-
if (it.second == cb) {
94-
_endpoints.erase(it.first);
91+
for (auto const& it: _endpoints) {
92+
if (it.second.get() == cb.get()) {
9593
_agency.unregisterCallback(cb->key, getEndpointUrl(it.first));
94+
_endpoints.erase(it.first);
9695
return true;
9796
}
9897
}
@@ -116,7 +115,7 @@ void AgencyCallbackRegistry::awaitNextChange(std::string const& key, double time
116115
cv.notify_one();
117116
return true;
118117
};
119-
auto agencyCallback = std::make_shared<AgencyCallback>(_agency, key, notify);
118+
auto agencyCallback = std::make_shared<AgencyCallback>(_agency, key, notify, false);
120119

121120
std::mutex mtx;
122121
std::unique_lock<std::mutex> lck(mtx);

arangod/Cluster/ClusterInfo.cpp

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1357,14 +1357,6 @@ int ClusterInfo::dropCollectionCoordinator(std::string const& databaseName,
13571357
// Update our own cache:
13581358
loadPlannedCollections();
13591359

1360-
// Now wait for it to appear and be complete:
1361-
res.clear();
1362-
res = ac.getValues("Current/Version", false);
1363-
if (!res.successful()) {
1364-
return setErrormsg(TRI_ERROR_CLUSTER_COULD_NOT_READ_CURRENT_VERSION,
1365-
errorMsg);
1366-
}
1367-
13681360
// monitor the entry for the collection
13691361
std::string const where =
13701362
"Current/Collections/" + databaseName + "/" + collectionID;

arangod/Cluster/HeartbeatThread.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ void HeartbeatThread::runDBServer() {
125125
_numDispatchedJobs = 0;
126126
if (_lastDispatchedJobResult) {
127127
LOG(DEBUG) << "...and was successful";
128-
// mop: the dispatched version is still the same => we aer finally uptodate
128+
// mop: the dispatched version is still the same => we are finally uptodate
129129
if (!_dispatchedPlanVersion.isEmpty() && _dispatchedPlanVersion.slice().equals(result)) {
130130
LOG(DEBUG) << "Version is correct :)";
131131
return true;
@@ -147,7 +147,7 @@ void HeartbeatThread::runDBServer() {
147147
return false;
148148
};
149149

150-
auto agencyCallback = std::make_shared<AgencyCallback>(_agency, "Plan/Version", updatePlan);
150+
auto agencyCallback = std::make_shared<AgencyCallback>(_agency, "Plan/Version", updatePlan, true);
151151

152152
bool registered = false;
153153
while (!registered) {

arangod/Cluster/RestAgencyCallbacksHandler.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ RestAgencyCallbacksHandler::RestAgencyCallbacksHandler(arangodb::HttpRequest* re
3535
_agencyCallbackRegistry(agencyCallbackRegistry) {
3636
}
3737

38-
bool RestAgencyCallbacksHandler::isDirect() const { return true; }
38+
bool RestAgencyCallbacksHandler::isDirect() const { return false; }
3939

4040
arangodb::rest::HttpHandler::status_t RestAgencyCallbacksHandler::execute() {
4141
std::vector<std::string> const& suffix = _request->suffix();

0 commit comments

Comments
 (0)
0