8000 fixed issue with leadership in minority · AthonyLi/arangodb@2550dd2 · GitHub
[go: up one dir, main page]

Skip to content

Commit 2550dd2

Browse files
committed
fixed issue with leadership in minority
1 parent cbe2d51 commit 2550dd2

File tree

9 files changed

+94
-20
lines changed

9 files changed

+94
-20
lines changed

arangod/Agency/AgencyCommon.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ typedef uint64_t term_t;
4444
/// @brief Agent roles
4545
enum role_t {FOLLOWER, CANDIDATE, LEADER};
4646

47+
static const std::string NO_LEADER = "";
48+
4749

4850
/// @brief Duration type
4951
typedef std::chrono::duration<long, std::ratio<1, 1000>> duration_t;

arangod/Agency/Agent.cpp

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,8 @@ void Agent::reportIn(std::string const& id, index_t index) {
200200

201201
MUTEX_LOCKER(mutexLocker, _ioLock);
202202

203+
_lastAcked[id] = std::chrono::system_clock::now();
204+
203205
if (index > _confirmed[id]) { // progress this follower?
204206
_confirmed[id] = index;
205207
}
@@ -343,7 +345,7 @@ priv_rpc_ret_t Agent::sendAppendEntriesRPC(std::string const& follower_id) {
343345
builder.close();
344346

345347
// Verbose output
346-
if (unconfirmed.size() > 1) {
348+
if (unconfirmed.size() > 1) {
347349
LOG_TOPIC(DEBUG, Logger::AGENCY) << "Appending " << unconfirmed.size() - 1
348350
<< " entries up to index " << highest
349351
<< " to follower " << follower_id;
@@ -477,11 +479,27 @@ write_ret_t Agent::write(query_t const& query) {
477479
/// Read from store
478480
read_ret_t Agent::read(query_t const& query) {
479481

482+
MUTEX_LOCKER(mutexLocker, _ioLock);
483+
480484
// Only leader else redirect
481485
if (!_constituent.leading()) {
482486
return read_ret_t(false, _constituent.leaderID());
483487
}
484488

489+
// Still leading?
490+
size_t good = 0;
491+
for (auto const& i : _lastAcked) {
492+
std::chrono::duration<double> m =
493+
std::chrono::system_clock::now() - i.second;
494+
if(0.9*_config.minPing() > m.count()) {
495+
++good;
496+
}
497+
}
498+
499+
if (good < size() / 2) {
500+
_constituent.candidate();
501+
}
502+
485503
// Retrieve data from readDB
486504
auto result = std::make_shared<arangodb::velocypack::Builder>();
487505
std::vector<bool> success = _readDB.read(query, result);
@@ -558,8 +576,11 @@ bool Agent::lead() {
558576
CONDITION_LOCKER(guard, _appendCV);
559577
guard.broadcast();
560578
}
561-
562579

580+
for (auto const& i : _config.active()) {
581+
_lastAcked[i] = std::chrono::system_clock::now();
582+
}
583+
563584
// Agency configuration
564585
auto agency = std::make_shared<Builder>();
565586
agency->openArray();

arangod/Agency/Agent.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,7 @@ class Agent : public arangodb::Thread {
224224
std::map<std::string, index_t> _confirmed;
225225
std::map<std::string, index_t> _lastHighest;
226226

227+
std::map<std::string, TimePoint> _lastAcked;
227228
std::map<std::string, TimePoint> _lastSent;
228229
arangodb::Mutex _ioLock; /**< @brief Read/Write lock */
229230

arangod/Agency/Constituent.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@ using namespace arangodb::rest;
4949
using namespace arangodb::velocypack;
5050
using namespace arangodb;
5151

52-
static const std::string NO_LEADER = "";
5352
// (std::numeric_limits<std::string>::max)();
5453

5554
/// Raft role names for display purposes

arangod/Agency/Inception.cpp

Lines changed: 59 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -51,18 +51,17 @@ bool Inception::start() { return Thread::start(); }
5151
/// - Get snapshot of gossip peers and agent pool
5252
/// - Create outgoing gossip.
5353
/// - Send to all peers
54-
void Inception::run() {
5554

56-
TRI_ASSERT(_agent != nullptr);
55+
void Inception::gossip() {
5756

5857
auto s = std::chrono::system_clock::now();
5958
std::chrono::seconds timeout(120);
6059
size_t i = 0;
61-
//bool cs = false;
60+
6261
while (!this->isStopping()) {
6362

6463
config_t config = _agent->config(); // get a copy of conf
65-
64+
6665
query_t out = std::make_shared<Builder>();
6766
out->openObject();
6867
out->add("endpoint", VPackValue(config.endpoint()));
@@ -73,7 +72,7 @@ void Inception::run() {
7372
}
7473
out->close();
7574
out->close();
76-
75+
7776
std::string path = "/_api/agency_priv/gossip";
7877

7978
for (auto const& p : config.gossipPeers()) { // gossip peers
@@ -99,9 +98,9 @@ void Inception::run() {
9998
std::make_shared<GossipCallback>(_agent), 1.0, true);
10099
}
101100
}
102-
101+
103102
std::this_thread::sleep_for(std::chrono::milliseconds(250));
104-
103+
105104
if ((std::chrono::system_clock::now()-s) > timeout) {
106105
if (config.poolComplete()) {
107106
LOG_TOPIC(DEBUG, Logger::AGENCY) << "Stopping active gossipping!";
@@ -111,17 +110,64 @@ void Inception::run() {
111110
}
112111
break;
113112
}
114-
113+
115114
if (config.poolComplete()) {
116-
//if(!cs) {
117-
_agent->startConstituent();
118-
break;
119-
//cs = true;
120-
//}
115+
_agent->startConstituent();
116+
break;
121117
}
118+
119+
}
120+
121+
}
122122

123+
void Inception::activeAgency() { // Do we have an active agency?
124+
/*
125+
config_t config = _agent->config(); // get a copy of conf
126+
size_t i = 0;
127+
std::string const path = "/_api/agency/activeAgents";
128+
129+
for (auto const& endpoint : config.gossipPeers()) { // gossip peers
130+
if (endpoint != config.endpoint()) {
131+
std::string clientid = config.id() + std::to_string(i++);
132+
auto hf = std::make_unique<std::unordered_map<std::string, std::string>>();
133+
arangodb::ClusterComm::instance()->asyncRequest(
134+
clientid, 1, endpoint, GeneralRequest::RequestType::POST, path,
135+
std::make_shared<std::string>(out->toJson()), hf,
136+
std::make_shared<GossipCallback>(_agent), 1.0, true);
137+
}
123138
}
139+
140+
for (auto const& pair : config.pool()) { // pool entries
141+
if (pair.second != config.endpoint()) {
142+
std::string clientid = config.id() + std::to_string(i++);
143+
auto hf = std::make_unique<std::unordered_map<std::string, std::string>>();
144+
arangodb::ClusterComm::instance()->asyncRequest(
145+
clientid, 1, pair.second, GeneralRequest::RequestType::POST, path,
146+
std::make_shared<std::string>(out->toJson()), hf,
147+
std::make_shared<GossipCallback>(_agent), 1.0, true);
148+
}
149+
}
150+
*/
151+
// start in pool/gossi peers start check if active agency
152+
153+
// if not if i have persisted agency
154+
// if member
155+
// contact other agents.
156+
// if agreement raft
124157

158+
// complete pool?
159+
160+
}
161+
162+
void Inception::run() {
163+
164+
//activeAgency();
165+
166+
config_t config = _agent->config();
167+
if (!config.poolComplete()) {
168+
gossip();
169+
}
170+
125171
}
126172

127173

arangod/Agency/Inception.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ class Inception : public Thread {
4545
Inception();
4646
explicit Inception(Agent*);
4747
virtual ~Inception();
48-
48+
4949
void run() override;
5050
bool start();
5151

@@ -54,6 +54,9 @@ class Inception : public Thread {
5454

5555
private:
5656

57+
void activeAgency();
58+
void gossip();
59+
5760
Agent* _agent;
5861

5962
};

arangod/Agency/RestAgencyPrivHandler.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,9 @@ RestHandler::status RestAgencyPrivHandler::execute() {
147147
if (_request->requestType() != GeneralRequest::RequestType::GET) {
148148
return reportMethodNotAllowed();
149149
}
150-
result.add("active", _agent->config().activeAgentsToBuilder()->slice());
150+
if (_agent->leaderID() != NO_LEADER) {
151+
result.add("active", _agent->config().activeAgentsToBuilder()->slice());
152+
}
151153
} else if (_request->suffix()[0] == "inform") {
152154
arangodb::velocypack::Options options;
153155
query_t query = _request->toVelocyPackBuilderPtr(&options);

arangod/Agency/Supervision.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -341,7 +341,7 @@ void Supervision::run() {
341341
// make sense at all without other ArangoDB servers, we wait pretty
342342
// long here before giving up:
343343
if (!updateAgencyPrefix(1000, 1)) {
344-
LOG_TOPIC(ERR, Logger::AGENCY)
344+
LOG_TOPIC(DEBUG, Logger::AGENCY)
345345
<< "Cannot get prefix from Agency. Stopping supervision for good.";
346346
return;
347347
}

scripts/startLocalCluster.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ for aid in `seq 0 $(( $NRAGENTS - 1 ))`; do
6969
--agency.pool-size $NRAGENTS \
7070
--agency.supervision true \
7171
--agency.supervision-frequency $SFRE \
72-
--agency.wait-for-sync true \
72+
--agency.wait-for-sync false \
7373
--agency.election-timeout-min $MINP \
7474
--agency.election-timeout-max $MAXP \
7575
--database.directory cluster/data$port \

0 commit comments

Comments
 (0)
0