8000 Better agency pool update (#7040) · botnick/arangodb@d23aaa2 · GitHub
[go: up one dir, main page]

Skip to content

Commit d23aaa2

Browse files
graetzerjsteemann
authored andcommitted
Better agency pool update (arangodb#7040)
1 parent c2ce154 commit d23aaa2

File tree

4 files changed

+45
-36
lines changed

4 files changed

+45
-36
lines changed

arangod/Agency/AgencyComm.cpp

Lines changed: 31 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -692,19 +692,41 @@ void AgencyCommManager::addEndpoint(std::string const& endpoint) {
692692
if (iter == _endpoints.end()) {
693693
LOG_TOPIC(DEBUG, Logger::AGENCYCOMM) << "using agency endpoint '"
694694
<< normalized << "'";
695-
696695
_endpoints.emplace_back(normalized);
697696
}
698697
}
699698

700-
void AgencyCommManager::removeEndpoint(std::string const& endpoint) {
699+
void AgencyCommManager::updateEndpoints(std::vector<std::string> const& newEndpoints) {
700+
std::set<std::string> updatedSet;
701+
for (std::string const& endp : newEndpoints) {
702+
updatedSet.emplace(Endpoint::unifiedForm(endp));
703+
}
704+
701705
MUTEX_LOCKER(locker, _lock);
702-
703-
std::string normalized = Endpoint::unifiedForm(endpoint);
704-
705-
_endpoints.erase(
706-
std::remove(_endpoints.begin(), _endpoints.end(), normalized),
707-
_endpoints.end());
706+
707+
std::set<std::string> currentSet;
708+
currentSet.insert(_endpoints.begin(), _endpoints.end());
709+
710+
std::set<std::string> toRemove;
711+
std::set_difference(currentSet.begin(), currentSet.end(),
712+
updatedSet.begin(), updatedSet.end(),
713+
std::inserter(toRemove, toRemove.begin()));
714+
715+
std::set<std::string> toAdd;
716+
std::set_difference(updatedSet.begin(), updatedSet.end(),
717+
currentSet.begin(), currentSet.end(),
718+
std::inserter(toAdd, toAdd.begin()));
719+
720+
for (std::string const& rem : toRemove) {
721+
LOG_TOPIC(INFO, Logger::AGENCYCOMM) << "Removing endpoint " << rem << " from agent pool";
722+
_endpoints.erase(std::remove(_endpoints.begin(), _endpoints.end(), rem),
723+
_endpoints.end());
724+
}
725+
726+
for (std::string const& add : toAdd) {
727+
LOG_TOPIC(INFO, Logger::AGENCYCOMM) << "Adding endpoint " << add << " to agent pool";
728+
_endpoints.emplace_back(add);
729+
}
708730
}
709731

710732
std::string AgencyCommManager::endpointsString() const {
@@ -745,6 +767,7 @@ AgencyCommManager::createNewConnection() {
745767
}
746768

747769
void AgencyCommManager::switchCurrentEndpoint() {
770+
_lock.assertLockedByCurrentThread();
748771
if (_endpoints.empty()) {
749772
return;
750773
}
@@ -130 10000 1,29 +1324,6 @@ bool AgencyComm::unlock(std::string const& key, VPackSlice const& slice,
13011324
}
13021325

13031326

1304-
void AgencyComm::updateEndpoints(arangodb::velocypack::Slice const& current) {
1305-
1306-
auto stored = AgencyCommManager::MANAGER->endpoints();
1307-
1308-
for (const auto& i : VPackObjectIterator(current)) {
1309-
auto const endpoint = Endpoint::unifiedForm(i.value.copyString());
1310-
if (std::find(stored.begin(), stored.end(), endpoint) == stored.end()) {
1311-
LOG_TOPIC(INFO, Logger::AGENCYCOMM)
1312-
<< "Adding endpoint " << endpoint << " to agent pool";
1313-
AgencyCommManager::MANAGER->addEndpoint(endpoint);
1314-
}
1315-
stored.erase(
1316-
std::remove(stored.begin(), stored.end(), endpoint), stored.end());
1317-
}
1318-
1319-
for (const auto& i : stored) {
1320-
LOG_TOPIC(INFO, Logger::AGENCYCOMM)
1321-
<< "Removing endpoint " << i << " from agent pool";
1322-
AgencyCommManager::MANAGER->removeEndpoint(i);
1323-
}
1324-
}
1325-
1326-
13271327
AgencyCommResult AgencyComm::sendWithFailover(
13281328
arangodb::rest::RequestType method, double const timeout,
13291329
std::string const& initialUrl, VPackSlice inBody) {

arangod/Agency/AgencyComm.h

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -566,7 +566,8 @@ class AgencyCommManager {
566566
std::string& url);
567567

568568
void addEndpoint(std::string const&);
569-
void removeEndpoint(std::string const&);
569+
/// removes old endpoints, adds new ones
570+
void updateEndpoints(std::vector<std::string> const& endpoints);
570571
std::string endpointsString() const;
571572
std::vector<std::string> endpoints() const;
572573
std::shared_ptr<VPackBuilder> summery() const;
@@ -676,8 +677,6 @@ class AgencyComm {
676677
AgencyCommResult unregisterCallback(
677678
std::string const& key, std::string const& endpoint);
678679

679-
void updateEndpoints(arangodb::velocypack::Slice const&);
680-
681680
bool lockRead(std::string const&, double, double);
682681

683682
bool lockWrite(std::string const&, double, double);

arangod/Cluster/HeartbeatThread.cpp

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1280,7 +1280,17 @@ bool HeartbeatThread::sendServerState() {
12801280
void HeartbeatThread::updateAgentPool(VPackSlice const& agentPool) {
12811281
if (agentPool.isObject() && agentPool.get("pool").isObject() &&
12821282
agentPool.hasKey("size") && agentPool.get("size").getUInt() > 0) {
1283-
_agency.updateEndpoints(agentPool.get("pool"));
1283+
try {
1284+
std::vector<std::string> values;
1285+
for (auto pair : VPackObjectIterator(agentPool.get("pool"))) {
1286+
values.emplace_back(pair.value.copyString());
1287+
}
1288+
AgencyCommManager::MANAGER->updateEndpoints(values);
1289+
} catch(basics::Exception const& e) {
1290+
LOG_TOPIC(WARN, Logger::HEARTBEAT) << "Error updating agency pool: " << e.message();
1291+
} catch(std::exception const& e) {
1292+
LOG_TOPIC(WARN, Logger::HEARTBEAT) << "Error updating agency pool: " << e.what();
1293+
} catch(...) {}
12841294
} else {
12851295
LOG_TOPIC(ERR, Logger::AGENCYCOMM) << "Cannot find an agency persisted in RAFT 8|";
12861296
}

arangod/VocBase/Methods/Databases.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -345,7 +345,7 @@ namespace {
345345

346346
vocbase->release();
347347
// sleep
348-
std::this_thread::sleep_for(std::chrono::microseconds(10000));
348+
std::this_thread::sleep_for(std::chrono::milliseconds(10));
349349
}
350350
return TRI_ERROR_NO_ERROR;
351351
}

0 commit comments

Comments
 (0)
0