8000 Merge branch '3.5' of https://github.com/arangodb/arangodb into featu… · arangodb/arangodb@6ec3256 · GitHub
[go: up one dir, main page]

Skip to content

Commit 6ec3256

Browse files
committed
Merge branch '3.5' of https://github.com/arangodb/arangodb into feature-3.5/intermediate-commit-stats
* '3.5' of https://github.com/arangodb/arangodb: [3.5] clean up your crap, dbservers. alright, i'll do it. (#9722) Bug fix 3.5/internal issue #622 (#9787)
2 parents 0caf59e + 09d2745 commit 6ec3256

13 files changed

+277
-55
lines changed

CHANGELOG

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@ v3.5.1 (XXXX-XX-XX)
44
* Add TransactionStatistics to ServerStatistics (transactions started /
55
aborted / committed and number of intermediate commits).
66

7+
* Agents to remove callback entries when responded to with code 404.
8+
9+
* Fixed internal issue #622: Analyzer cache is now invalidated for dropped database.
10+
711
* Show query string length and cacheability information in query explain output.
812

913
* The AQL functions `FULLTEXT`, `NEAR`, `WITHIN` and `WITHIN_RECTANGLE` are now

arangod/Agency/AgencyComm.cpp

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -482,6 +482,44 @@ void AgencyCommResult::clear() {
482482

483483
VPackSlice AgencyCommResult::slice() const { return _vpack->slice(); }
484484

485+
void AgencyCommResult::toVelocyPack(VPackBuilder& builder) const {
486+
{ VPackObjectBuilder dump(&builder);
487+
builder.add("location", VPackValue(_location));
488+
builder.add("message", VPackValue(_message));
489+
builder.add("sent", VPackValue(_sent));
490+
builder.add("body", VPackValue(_body));
491+
if (_vpack != nullptr) {
492+
if (_vpack->isClosed()) {
493+
builder.add("vpack", _vpack->slice());
494+
}
495+
}
496+
builder.add("statusCode", VPackValue(_statusCode));
497+
builder.add(VPackValue("values"));
498+
{ VPackObjectBuilder v(&builder);
499+
for (auto const& value : _values) {
500+
builder.add(VPackValue(value.first));
501+
auto const& entry = value.second;
502+
{ VPackObjectBuilder vv(&builder);
503+
builder.add("index", VPackValue(entry._index));
504+
builder.add("isDir", VPackValue(entry._isDir));
505+
if (entry._vpack != nullptr && entry._vpack->isClosed()) {
506+
builder.add("vpack", entry._vpack->slice());
507+
}}}
508+
}}
509+
}
510+
511+
VPackBuilder AgencyCommResult::toVelocyPack() const {
512+
VPackBuilder builder;
513+
toVelocyPack(builder);
514+
return builder;
515+
}
516+
517+
namespace std {
518+
ostream& operator<< (ostream& out, AgencyCommResult const& a) {
519+
out << a.toVelocyPack().toJson();
520+
return out;
521+
}}
522+
485523
// -----------------------------------------------------------------------------
486524
// --SECTION-- AgencyCommManager
487525
// -----------------------------------------------------------------------------

arangod/Agency/AgencyComm.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,10 @@ class AgencyCommResult {
271271
_vpack = vpack;
272272
}
273273

274+
void toVelocyPack(VPackBuilder& builder) const;
275+
276+
VPackBuilder toVelocyPack() const;
277+
274278
public:
275279
std::string _location;
276280
std::string _message;
@@ -699,4 +703,8 @@ class AgencyComm {
699703
};
700704
} // namespace arangodb
701705

706+
namespace std {
707+
ostream& operator<<(ostream& o, arangodb::AgencyCommResult const& a);
708+
}
709+
702710
#endif

arangod/Agency/Agent.cpp

Lines changed: 72 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1289,6 +1289,9 @@ void Agent::run() {
12891289
// Check whether we can advance _commitIndex
12901290
advanceCommitIndex();
12911291

1292+
// Empty store callback trash bin
1293+
emptyCbTrashBin();
1294+
12921295
bool commenceService = false;
12931296
{
12941297
READ_LOCKER(oLocker, _outputLock);
@@ -1626,7 +1629,7 @@ arangodb::consensus::index_t Agent::readDB(VPackBuilder& builder) const {
16261629
TRI_ASSERT(builder.isOpenObject());
16271630

16281631
uint64_t commitIndex = 0;
1629-
1632+
16301633
{ READ_LOCKER(oLocker, _outputLock);
16311634

16321635
commitIndex = _commitIndex;
@@ -1637,10 +1640,10 @@ arangodb::consensus::index_t Agent::readDB(VPackBuilder& builder) const {
16371640
// key-value store {}
16381641
builder.add(VPackValue("agency"));
16391642
_readDB.get().toBuilder(builder, true); }
1640-
1643+
16411644
// replicated log []
16421645
_state.toVelocyPack(commitIndex, builder);
1643-
1646+
16441647
return commitIndex;
16451648
}
16461649

@@ -1904,6 +1907,72 @@ bool Agent::ready() const {
19041907
return _ready;
19051908
}
19061909

1910+
1911+
1912+
void Agent::trashStoreCallback(std::string const& url, query_t const& body) {
1913+
1914+
auto const& slice = body->slice();
1915+
TRI_ASSERT(slice.isObject());
1916+
1917+
// body consists of object holding keys index, term and the observed keys
1918+
// we'll remove observation on every key and according observer url
1919+
for (auto const& i : VPackObjectIterator(slice)) {
1920+
if (!i.key.isEqualString("term") && !i.key.isEqualString("index")) {
1921+
MUTEX_LOCKER(lock, _cbtLock);
1922+
_callbackTrashBin[i.key.copyString()].emplace(url);
1923+
}
1924+
}
1925+
}
1926+
1927+
1928+
void Agent::emptyCbTrashBin() {
1929+
1930+
using clock = std::chrono::steady_clock;
1931+
1932+
auto envelope = std::make_shared<VPackBuilder>();
1933+
{
1934+
_cbtLock.assertNotLockedByCurrentThread();
1935+
MUTEX_LOCKER(lock, _cbtLock);
1936+
1937+
auto early =
1938+
std::chrono::duration_cast<std::chrono::seconds>(
1939+
clock::now() - _callbackLastPurged).count() < 10;
1940+
1941+
if (early || _callbackTrashBin.empty()) {
1942+
return;
1943+
}
1944+
1945+
{
1946+
VPackArrayBuilder trxs(envelope.get());
1947+
for (auto const& i : _callbackTrashBin) {
1948+
for (auto const& j : i.second) {
1949+
{
1950+
VPackArrayBuilder trx(envelope.get());
1951+
{
1952+
VPackObjectBuilder ak(envelope.get());
1953+
envelope->add(VPackValue(i.first));
1954+
{
1955+
VPackObjectBuilder oper(envelope.get());
1956+
envelope->add("op", VPackValue("unobserve"));
1957+
envelope->add("url", VPackValue(j));
1958+
}
1959+
}
1960+
}
1961+
}
1962+
}
1963+
}
1964+
_callbackTrashBin.clear();
1965+
_callbackLastPurged = std::chrono::steady_clock::now();
1966+
}
1967+
1968+
LOG_TOPIC("12ad3", DEBUG, Logger::AGENCY) << "unobserving: " << envelope->toJson();
1969+
1970+
// Best effort. Will be retried anyway
1971+
auto wres = write(envelope);
1972+
1973+
}
1974+
1975+
19071976
query_t Agent::buildDB(arangodb::consensus::index_t index) {
19081977
Store store(this);
19091978
index_t oldIndex;

arangod/Agency/Agent.h

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,14 @@ class Agent final : public arangodb::Thread, public AgentInterface {
145145
/// @brief Resign leadership
146146
void resign(term_t otherTerm = 0);
147147

148+
/// @brief collect store callbacks for removal
149+
void trashStoreCallback(std::string const& url, query_t const& body);
150+
148151
private:
152+
153+
/// @brief empty callback trash bin
154+
void emptyCbTrashBin();
155+
149156
/// @brief Invoked by leader to replicate log entries ($5.3);
150157
/// also used as heartbeat ($5.2).
151158
void sendAppendEntriesRPC();
@@ -402,6 +409,11 @@ class Agent final : public arangodb::Thread, public AgentInterface {
402409
///
403410
mutable arangodb::Mutex _ioLock;
404411

412+
/// @brief Callback trash bin lock
413+
/// _callbackTrashBin
414+
///
415+
mutable arangodb::Mutex _cbtLock;
416+
405417
/// @brief RAFT consistency lock:
406418
/// _readDB and _commitIndex
407419
/// Allows reading from one or both if used alone.
@@ -455,6 +467,10 @@ class Agent final : public arangodb::Thread, public AgentInterface {
455467
/// since the epoch of the steady clock.
456468
std::atomic<int64_t> _leaderSince;
457469

470+
/// @brief Container for callbacks for removal
471+
std::unordered_map<std::string, std::unordered_set<std::string>> _callbackTrashBin;
472+
std::chrono::time_point<std::chrono::steady_clock> _callbackLastPurged;
473+
458474
/// @brief Ids of ongoing transactions, used for inquire:
459475
std::unordered_set<std::string> _ongoingTrxs;
460476

arangod/Agency/Store.cpp

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -326,26 +326,32 @@ std::vector<bool> Store::applyLogEntries(arangodb::velocypack::Builder const& qu
326326
// Callback
327327

328328
for (auto const& url : urls) {
329-
Builder body; // host
329+
330+
auto body = std::make_shared<VPackBuilder>(); // host
330331
{
331-
VPackObjectBuilder b(&body);
332-
body.add("term", VPackValue(term));
333-
body.add("index", VPackValue(index));
332+
VPackObjectBuilder b(body.get());
333+
body->add("term", VPackValue(term));
334+
body->add("index", VPackValue(index));
335+
334336
auto ret = in.equal_range(url);
335-
std::map<std::string,std::map<std::string, std::string>> result;
337+
336338
// key -> (modified -> op)
339+
// using the map to make sure no double key entries end up in document
340+
std::map<std::string,std::map<std::string, std::string>> result;
337341
for (auto it = ret.first; it != ret.second; ++it) {
338342
result[it->second->key][it->second->modified] = it->second->oper;
339343
}
344+
345+
// Work the map into JSON
340346
for (auto const& m : result) {
341-
body.add(VPackValue(m.first));
347+
body->add(VPackValue(m.first));
342348
{
343-
VPackObjectBuilder guard(&body);
349+
VPackObjectBuilder guard(body.get());
344350
for (auto const& m2 : m.second) {
345-
body.add(VPackValue(m2.first));
351+
body->add(VPackValue(m2.first));
346352
{
347-
VPackObjectBuilder guard2(&body);
348-
body.add("op", VPackValue(m2.second));
353+
VPackObjectBuilder guard2(body.get());
354+
body->add("op", VPackValue(m2.second));
349355
}
350356
}
351357
}
@@ -359,8 +365,8 @@ std::vector<bool> Store::applyLogEntries(arangodb::velocypack::Builder const& qu
359365

360366
arangodb::ClusterComm::instance()->asyncRequest(
361367
coordinatorTransactionID, endpoint, rest::RequestType::POST, path,
362-
std::make_shared<std::string>(body.toString()), hf,
363-
std::make_shared<StoreCallback>(path, body.toJson()), 1.0, true, 0.01);
368+
std::make_shared<std::string>(body->toString()), hf,
369+
std::make_shared<StoreCallback>(path, body, _agent), 1.0, true, 0.01);
364370

365371
} else {
366372
LOG_TOPIC("76aca", WARN, Logger::AGENCY) << "Malformed URL " << url;

arangod/Agency/StoreCallback.cpp

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,18 +21,28 @@
2121
/// @author Kaveh Vahedipour
2222
////////////////////////////////////////////////////////////////////////////////
2323

24+
#include "Agent.h"
2425
#include "StoreCallback.h"
2526

2627
using namespace arangodb::consensus;
2728
using namespace arangodb::velocypack;
2829

29-
StoreCallback::StoreCallback(std::string const& path, std::string const& body)
30-
: _path(path), _body(body) {}
30+
StoreCallback::StoreCallback(
31+
std::string const& url, query_t const& body, Agent* agent)
32+
: _url(url), _body(body), _agent(agent) {}
3133

3234
bool StoreCallback::operator()(arangodb::ClusterCommResult* res) {
33-
if (res->status != CL_COMM_SENT) {
34-
LOG_TOPIC("7c4cc", DEBUG, Logger::AGENCY) << res->endpoint + _path << "(" << res->status
35-
<< ", " << res->errorMessage << "): " << _body;
35+
36+
if (res->status == CL_COMM_ERROR) {
37+
LOG_TOPIC("9sdbf0", TRACE, Logger::AGENCY)
38+
<< _url << "(" << res->status << ", " << res->errorMessage
39+
<< "): " << _body->toJson();
40+
41+
if (res->result->getHttpReturnCode() == 404 && _agent != nullptr) {
42+
LOG_TOPIC("9sdbf0", DEBUG, Logger::AGENCY) << "dropping dead callback at " << _url;
43+
_agent->trashStoreCallback(_url, _body);
44+
}
3645
}
46+
3747
return true;
3848
}

arangod/Agency/StoreCallback.h

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,15 +29,18 @@
2929
namespace arangodb {
3030
namespace consensus {
3131

32+
class Agent;
33+
3234
class StoreCallback : public arangodb::ClusterCommCallback {
33-
public:
34-
StoreCallback(std::string const&, std::string const&);
35+
public:
36+
StoreCallback(std::string const&, query_t const&, Agent* agent);
3537

3638
bool operator()(arangodb::ClusterCommResult*) override final;
3739

38-
private:
39-
std::string _path;
40-
std::string _body;
40+
private:
41+
std::string _url;
42+
query_t _body;
43+
Agent* _agent;
4144
};
4245
} // namespace consensus
4346
} // namespace arangodb

0 commit comments

Comments
 (0)
0