devel: fixed the missed changes to plan after agency callback is regi… · mnemosdev/arangodb@2e2d947 · GitHub
[go: up one dir, main page]

Skip to content

Commit 2e2d947

Browse files
kvahedneunhoef
authored and committed
devel: fixed the missed changes to plan after agency callback is registered f… (arangodb#4775)
* fixed the missed changes to plan after agency callback is registered for create collection * Force check in timeout case. * Sort out RestAgencyHandler behaviour for inquire. * Take "ongoing" stuff out of AgencyComm.
1 parent 73b6975 commit 2e2d947

12 files changed

+191
-77
lines changed

CHANGELOG

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,7 @@ devel
152152

153153
* fixed a bug where supervision tried to deal with shards of virtual collections
154154

155+
* fixed a bug where clusterinfo missed changes to plan after agency callback is registered for create collection
155156

156157
v3.3.4 (XXXX-XX-XX)
157158
-------------------

CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -841,6 +841,7 @@ if (OPENSSL_VERSION)
841841
string(REPLACE "." ";" OPENSSL_VERSION_LIST ${OPENSSL_VERSION})
842842
list(GET OPENSSL_VERSION_LIST 0 OPENSSL_VERSION_MAJOR)
843843
list(GET OPENSSL_VERSION_LIST 1 OPENSSL_VERSION_MINOR)
844+
844845
if ("${OPENSSL_VERSION_MAJOR}" GREATER 0 AND "${OPENSSL_VERSION_MINOR}" GREATER 0)
845846
option(USE_OPENSSL_NO_SSL2
846847
"do not use OPENSSL_NO_SSL2"

arangod/Agency/AgencyComm.cpp

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1496,17 +1496,13 @@ AgencyCommResult AgencyComm::sendWithFailover(
14961496
result = send(
14971497
connection.get(), method, conTimeout, url, b.toJson());
14981498

1499-
// Inquire returns a body like write or if the write is still ongoing
1500-
// We check, if the operation is still ongoing then body is {"ongoing:true"}
1499+
// Inquire returns a body like write, if the transactions are not known,
1500+
// the list of results is empty.
15011501
// _statusCode can be 200 or 412
15021502
if (result.successful() || result._statusCode == 412) {
15031503
std::shared_ptr<VPackBuilder> resultBody
15041504
= VPackParser::fromJson(result._body);
15051505
VPackSlice outer = resultBody->slice();
1506-
// If the operation is still ongoing, simply ask again later:
1507-
if (outer.isObject() && outer.hasKey("ongoing")) {
1508-
continue;
1509-
}
15101506

15111507
// If we get an answer, and it contains a "results" key,
15121508
// we release the connection and break out of the loop letting the

arangod/Agency/Agent.cpp

Lines changed: 83 additions & 44 deletions
1017
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,21 @@ AgentInterface::raft_commit_t Agent::waitFor(index_t index, double timeout) {
255255
return Agent::raft_commit_t::UNKNOWN;
256256
}
257257

258+
// Check if log is committed up to index.
259+
bool Agent::isCommitted(index_t index) {
260+
261+
if (size() == 1) { // single host agency
262+
return true;
263+
}
264+
265+
CONDITION_LOCKER(guard, _waitForCV);
266+
if (leading()) {
267+
return _commitIndex >= index;
268+
} else {
269+
return false;
270+
}
271+
}
272+
258273
// AgentCallback reports id of follower and its highest processed index
259274
void Agent::reportIn(std::string const& peerId, index_t index, size_t toLog) {
260275

@@ -866,10 +881,19 @@ trans_ret_t Agent::transact(query_t const& queries) {
866881
// Apply to spearhead and get indices for log entries
867882
auto qs = queries->slice();
868883
addTrxsOngoing(qs); // remember that these are ongoing
884+
size_t failed;
869885
auto ret = std::make_shared<arangodb::velocypack::Builder>();
870-
size_t failed = 0;
871-
ret->openArray();
872886
{
887+
TRI_DEFER(removeTrxsOngoing(qs));
888+
// Note that once the transactions are in our log, we can remove them
889+
// from the list of ongoing ones, although they might not yet be committed.
890+
// This is because then, inquire will find them in the log and draw its
891+
// own conclusions. The map of ongoing trxs is only to cover the time
892+
// from when we receive the request until we have appended the trxs
893+
// ourselves.
894+
ret = std::make_shared<arangodb::velocypack::Builder>();
895+
failed = 0;
896+
ret->openArray();
873897
// Only leader else redirect
874898
if (challengeLeadership()) {
875899
resign();
@@ -895,11 +919,8 @@ trans_ret_t Agent::transact(query_t const& queries) {
895919
_spearhead.read(query, *ret);
896920
}
897921
}
898-
899-
removeTrxsOngoing(qs);
900-
922+
ret->close();
901923
}
902-
ret->close();
903924

904925
// Report that leader has persisted
905926
reportIn(id(), maxind);
@@ -975,20 +996,31 @@ write_ret_t Agent::inquire(query_t const& query) {
975996

976997
write_ret_t ret;
977998

999+
while (true) {
1000+
// Check ongoing ones:
1001+
bool found = false;
1002+
for (auto const& s : VPackArrayIterator(query->slice())) {
1003+
std::string ss = s.copyString();
1004+
if (isTrxOngoing(ss)) {
1005+
found = true;
1006+
break;
1007+
}
1008+
}
1009+
if (!found) {
1010+
break;
1011+
}
1012+
std::this_thread::sleep_for(std::chrono::duration<double>(0.1));
1013+
leader = _constituent.leaderID();
1014+
if (leader != id()) {
1015+
return write_ret_t(false, leader);
1016+
}
1017+
}
1018+
9781019
_tiLock.assertNotLockedByCurrentThread();
9791020
MUTEX_LOCKER(ioLocker, _ioLock);
9801021

9811022
ret.indices = _state.inquire(query);
9821023

983-
// Check ongoing ones:
984-
for (auto const& s : VPackArrayIterator(query->slice())) {
985-
std::string ss = s.copyString();
986-
if (isTrxOngoing(ss)) {
987-
ret.indices.clear();
988-
break;
989-
}
990-
}
991-
9921024
ret.accepted = true;
9931025

9941026
return ret;
@@ -1018,43 +1050,50 @@ write_ret_t Agent::write(query_t const& query, bool discardStartup) {
10181050
}
10191051
}
10201052

1021-
addTrxsOngoing(query->slice()); // remember that these are ongoing
1022-
1023-
auto slice = query->slice();
1024-
size_t ntrans = slice.length();
1025-
size_t npacks = ntrans/_config.maxAppendSize();
1026-
if (ntrans%_config.maxAppendSize()!=0) {
1027-
npacks++;
1028-
}
1053+
{
1054+
addTrxsOngoing(query->slice()); // remember that these are ongoing
1055+
TRI_DEFER(removeTrxsOngoing(query->slice()));
1056+
// Note that once the transactions are in our log, we can remove them
1057+
// from the list of ongoing ones, although they might not yet be committed.
1058+
// This is because then, inquire will find them in the log and draw its
1059+
// own conclusions. The map of ongoing trxs is only to cover the time
1060+
// from when we receive the request until we have appended the trxs
1061+
// ourselves.
1062+
1063+
auto slice = query->slice();
1064+
size_t ntrans = slice.length();
1065+
size_t npacks = ntrans/_config.maxAppendSize();
1066+
if (ntrans%_config.maxAppendSize()!=0) {
1067+
npacks++;
1068+
}
10291069

1030-
// Apply to spearhead and get indices for log entries
1031-
// Avoid keeping lock indefinitely
1032-
for (size_t i = 0, l = 0; i < npacks; ++i) {
1033-
query_t chunk = std::make_shared<Builder>();
1034-
{
1035-
VPackArrayBuilder b(chunk.get());
1036-
for (size_t j = 0; j < _config.maxAppendSize() && l < ntrans; ++j, ++l) {
1037-
chunk->add(slice.at(l));
1070+
// Apply to spearhead and get indices for log entries
1071+
// Avoid keeping lock indefinitely
1072+
for (size_t i = 0, l = 0; i < npacks; ++i) {
1073+
query_t chunk = std::make_shared<Builder>();
1074+
{
1075+
VPackArrayBuilder b(chunk.get());
1076+
for (size_t j = 0; j < _config.maxAppendSize() && l < ntrans; ++j, ++l) {
1077+
chunk->add(slice.at(l));
1078+
}
10381079
}
1039-
}
10401080

1041-
// Only leader else redirect
1042-
if (multihost && challengeLeadership()) {
1043-
resign();
1044-
return write_ret_t(false, NO_LEADER);
1045-
}
1081+
// Only leader else redirect
1082+
if (multihost && challengeLeadership()) {
1083+
resign();
1084+
return write_ret_t(false, NO_LEADER);
1085+
}
10461086

1047-
_tiLock.assertNotLockedByCurrentThread();
1048-
MUTEX_LOCKER(ioLocker, _ioLock);
1087+
_tiLock.assertNotLockedByCurrentThread();
1088+
MUTEX_LOCKER(ioLocker, _ioLock);
10491089

1050-
applied = _spearhead.applyTransactions(chunk);
1051-
auto tmp = _state.logLeaderMulti(chunk, applied, term());
1052-
indices.insert(indices.end(), tmp.begin(), tmp.end());
1090+
applied = _spearhead.applyTransactions(chunk);
1091+
auto tmp = _state.logLeaderMulti(chunk, applied, term());
1092+
indices.insert(indices.end(), tmp.begin(), tmp.end());
10531093

1094+
}
10541095
}
10551096

1056-
removeTrxsOngoing(query->slice());
1057-
10581097
// Maximum log index
10591098
index_t maxind = 0;
10601099
if (!indices.empty()) {

arangod/Agency/Agent.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,9 @@ class Agent : public arangodb::Thread,
183183
/// @brief Wait for slaves to confirm appended entries
184184
AgentInterface::raft_commit_t waitFor(index_t last_entry, double timeout = 10.0) override;
185185

186+
/// @brief Check if everything up to a given index has been committed:
187+
bool isCommitted(index_t last_entry) override;
188+
186189
/// @brief Convencience size of agency
187190
size_t size() const;
188191

arangod/Agency/AgentInterface.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,9 @@ class AgentInterface {
4545
/// @brief Wait for slaves to confirm appended entries
4646
virtual raft_commit_t waitFor(index_t last_entry, double timeout = 2.0) = 0;
4747

48+
/// @brief Check if everything up to a given index has been committed
49+
virtual bool isCommitted(index_t last_entry) = 0;
50+
4851
// Suffice warnings
4952
virtual ~AgentInterface() {};
5053
};

arangod/Agency/RestAgencyHandler.cpp

Lines changed: 59 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -287,9 +287,9 @@ RestStatus RestAgencyHandler::handleWrite() {
287287
body.close();
288288

289289
if (result == Agent::raft_commit_t::UNKNOWN) {
290-
generateResult(rest::ResponseCode::SERVICE_UNAVAILABLE, body.slice());
290+
generateError(rest::ResponseCode::SERVICE_UNAVAILABLE, TRI_ERROR_HTTP_SERVICE_UNAVAILABLE);
291291
} else if (result == Agent::raft_commit_t::TIMEOUT) {
292-
generateResult(rest::ResponseCode::REQUEST_TIMEOUT, body.slice());
292+
generateError(rest::ResponseCode::REQUEST_TIMEOUT, 408);
293293
} else {
294294
if (errors > 0) { // Some/all requests failed
295295
generateResult(rest::ResponseCode::PRECONDITION_FAILED, body.slice());
@@ -402,24 +402,70 @@ RestStatus RestAgencyHandler::handleInquire() {
402402
}
403403

404404
if (ret.accepted) { // I am leading
405+
406+
bool found;
407+
std::string call_mode = _request->header("x-arangodb-agency-mode", found);
408+
if (!found) {
409+
call_mode = "waitForCommitted";
410+
}
411+
412+
// First possibility: The answer is empty, we have never heard about
413+
// these transactions. In this case we say so, regardless what the
414+
// "agency-mode" is.
415+
// Second possibility: Non-empty answer, but agency-mode is "noWait",
416+
// then we simply report our findings, too.
417+
// Third possibility, we actually have a non-empty list of indices,
418+
// and we need to wait for commit to answer.
419+
420+
// Handle cases 2 and 3:
421+
Agent::raft_commit_t result = Agent::raft_commit_t::OK;
422+
bool allCommitted = true;
423+
if (!ret.indices.empty()) {
424+
arangodb::consensus::index_t max_index = 0;
425+
try {
426+
max_index =
427+
*std::max_element(ret.indices.begin(), ret.indices.end());
428+
} catch (std::exception const& ex) {
429+
LOG_TOPIC(WARN, Logger::AGENCY) << ex.what();
430+
}
431+
432+
if (max_index > 0) {
433+
if (call_mode == "waitForCommitted") {
434+
result = _agent->waitFor(max_index);
435+
} else {
436+
allCommitted = _agent->isCommitted(max_index);
437+
}
438+
}
439+
}
440+
441+
// We can now prepare the result:
405442
Builder body;
406443
bool failed = false;
407444
{ VPackObjectBuilder b(&body);
408-
if (ret.indices.empty()) {
445+
body.add(VPackValue("results"));
446+
{ VPackArrayBuilder bb(&body);
447+
for (auto const& index : ret.indices) {
448+
body.add(VPackValue(index));
449+
failed = (failed || index == 0);
450+
}
451+
}
452+
body.add("inquired", VPackValue(true));
453+
if (!allCommitted) { // can only happen in agency_mode "noWait"
409454
body.add("ongoing", VPackValue(true));
410-
} else {
411-
body.add(VPackValue("results"));
412-
{ VPackArrayBuilder bb(&body);
413-
for (auto const& index : ret.indices) {
414-
body.add(VPackValue(index));
415-
failed = (failed || index == 0);
416-
}}
455+
}
456+
}
417457

458+
if (result == Agent::raft_commit_t::UNKNOWN) {
459+
generateError(rest::ResponseCode::SERVICE_UNAVAILABLE, TRI_ERROR_HTTP_SERVICE_UNAVAILABLE);
460+
} else if (result == Agent::raft_commit_t::TIMEOUT) {
461+
generateError(rest::ResponseCode::REQUEST_TIMEOUT, 408);
462+
} else {
463+
if (failed > 0) { // Some/all requests failed
464+
generateResult(rest::ResponseCode::PRECONDITION_FAILED, body.slice());
465+
} else { // All good (or indeed unknown in case 1)
466+
generateResult(rest::ResponseCode::OK, body.slice());
418467
}
419-
body.add("inquired", VPackValue(true));
420468
}
421-
generateResult(failed ? rest::ResponseCode::PRECONDITION_FAILED :
422-
rest::ResponseCode::OK, body.slice());
423469
} else { // Redirect to leader
424470
if (_agent->leaderID() == NO_LEADER) {
425471
return reportMessage(rest::ResponseCode::SERVICE_UNAVAILABLE, "No leader");

arangod/Cluster/AgencyCallback.cpp

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -41,11 +41,11 @@ AgencyCallback::AgencyCallback(AgencyComm& agency, std::string const& key,
4141
bool needsValue, bool needsInitialValue)
4242
: key(key), _agency(agency), _cb(cb), _needsValue(needsValue) {
4343
if (_needsValue && needsInitialValue) {
44-
refetchAndUpdate(true);
44+
refetchAndUpdate(true, false);
4545
}
4646
}
4747

48-
void AgencyCallback::refetchAndUpdate(bool needToAcquireMutex) {
48+
void AgencyCallback::refetchAndUpdate(bool needToAcquireMutex, bool forceCheck) {
4949
if (!_needsValue) {
5050
// no need to pass any value to the callback
5151
if (needToAcquireMutex) {
@@ -74,19 +74,21 @@ void AgencyCallback::refetchAndUpdate(bool needToAcquireMutex) {
7474

7575
if (needToAcquireMutex) {
7676
CONDITION_LOCKER(locker, _cv);
77-
checkValue(newData);
77+
checkValue(newData, forceCheck);
7878
} else {
79-
checkValue(newData);
79+
checkValue(newData, forceCheck);
8080
}
8181
}
8282

83-
void AgencyCallback::checkValue(std::shared_ptr<VPackBuilder> newData) {
83+
void AgencyCallback::checkValue(std::shared_ptr<VPackBuilder> newData,
84+
bool forceCheck) {
8485
// Only called from refetchAndUpdate, we always have the mutex when
8586
// we get here!
86-
if (!_lastData || !_lastData->slice().equals(newData->slice())) {
87+
if (!_lastData || !_lastData->slice().equals(newData->slice()) || forceCheck) {
8788
LOG_TOPIC(DEBUG, Logger::CLUSTER) << "AgencyCallback: Got new value "
8889
<< newData->slice().typeName() << " "
89-
<< newData->toJson();
90+
<< newData->toJson()
91+
<< " forceCheck=" << forceCheck;
9092
if (execute(newData)) {
9193
_lastData = newData;
9294
} else {
@@ -125,6 +127,6 @@ void AgencyCallback::executeByCallbackOrTimeout(double maxTimeout) {
125127
LOG_TOPIC(DEBUG, Logger::CLUSTER)
126128
<< "Waiting done and nothing happended. Refetching to be sure";
127129
// mop: watches have not triggered during our sleep...recheck to be sure
128-
refetchAndUpdate(false);
130+
refetchAndUpdate(false, true); // Force a check
129131
}
130132
}

0 commit comments

Comments
 (0)
0