8000 remove 404-ed callbacks from agency by kvahed · Pull Request #9709 · arangodb/arangodb · GitHub
[go: up one dir, main page]

Skip to content

remove 404-ed callbacks from agency #9709

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 15 commits into from
Aug 23, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
remove 404-ed callbacks from agency
  • Loading branch information
kvahed committed Aug 14, 2019
commit fdf19056f501dd4747318cee02051263c83a582e
430 changes: 216 additions & 214 deletions CHANGELOG

Large diffs are not rendered by default.

38 changes: 38 additions & 0 deletions arangod/Agency/AgencyComm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,44 @@ void AgencyCommResult::clear() {

VPackSlice AgencyCommResult::slice() const { return _vpack->slice(); }

void AgencyCommResult::toVelocyPack(VPackBuilder& builder) const {
{ VPackObjectBuilder dump(&builder);
builder.add("location", VPackValue(_location));
builder.add("message", VPackValue(_message));
builder.add("sent", VPackValue(_sent));
builder.add("body", VPackValue(_body));
if (_vpack != nullptr) {
if (_vpack->isClosed()) {
builder.add("vpack", _vpack->slice());
}
}
builder.add("statusCode", VPackValue(_statusCode));
builder.add(VPackValue("values"));
{ VPackObjectBuilder v(&builder);
for (auto const& value : _values) {
builder.add(VPackValue(value.first));
auto const& entry = value.second;
{ VPackObjectBuilder vv(&builder);
builder.add("index", VPackValue(entry._index));
builder.add("isDir", VPackValue(entry._isDir));
if (entry._vpack != nullptr && entry._vpack->isClosed()) {
builder.add("vpack", entry._vpack->slice());
}}}
}}
}

VPackBuilder AgencyCommResult::toVelocyPack() const {
VPackBuilder builder;
toVelocyPack(builder);
return builder;
}

namespace std {
ostream& operator<< (ostream& out, AgencyCommResult const& a) {
out << a.toVelocyPack().toJson();
return out;
}}

// -----------------------------------------------------------------------------
// --SECTION-- AgencyCommManager
// -----------------------------------------------------------------------------
Expand Down
8 changes: 8 additions & 0 deletions arangod/Agency/AgencyComm.h
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,10 @@ class AgencyCommResult {
return Result{errorCode(), errorMessage()};
}

void toVelocyPack(VPackBuilder& builder) const;

VPackBuilder toVelocyPack() const;

public:
std::string _location;
std::string _message;
Expand Down Expand Up @@ -731,4 +735,8 @@ class AgencyComm {
};
} // namespace arangodb

namespace std {
ostream& operator<<(ostream& o, arangodb::AgencyCommResult const& a);
}

#endif
61 changes: 61 additions & 0 deletions arangod/Agency/Agent.cpp
8000
Original file line number Diff line number Diff line change
Expand Up @@ -1267,6 +1267,9 @@ void Agent::run() {
// Check whether we can advance _commitIndex
advanceCommitIndex();

// Empty store callback trash bin
emptyCbTrashBin();

bool commenceService = false;
{
READ_LOCKER(oLocker, _outputLock);
Expand Down Expand Up @@ -1882,6 +1885,64 @@ bool Agent::ready() const {
return _ready;
}



void Agent::trashStoreCallback(std::string const& url, query_t const& body) {

auto const& slice = body->slice();
TRI_ASSERT(slice.isObject());

// body consists of object holding keys index, term and the observed keys
// we'll remove observation on every key and according observer url
for (auto const& i : VPackObjectIterator(slice)) {
if (!i.key.isEqualString("term") && !i.key.isEqualString("index")) {
MUTEX_LOCKER(lock, _cbtLock);
_callbackTrashBin[i.key.copyString()].emplace(url);
}
}
}


void Agent::emptyCbTrashBin() {

using clock = std::chrono::steady_clock;

auto envelope = std::make_shared<VPackBuilder>();
{
_cbtLock.assertNotLockedByCurrentThread();
MUTEX_LOCKER(lock, _cbtLock);

auto early =
std::chrono::duration_cast<std::chrono::seconds>(
clock::now() - _callbackLastPurged).count() < 10;

if (early || _callbackTrashBin.empty()) {
return;
}

{ VPackArrayBuilder trxs(envelope.get());
for (auto const& i : _callbackTrashBin) {
{ VPackArrayBuilder trx(envelope.get());
{ VPackObjectBuilder ak(envelope.get());
envelope->add(VPackValue(i.first));
for (auto const& j : i.second) {
{ VPackObjectBuilder oper(envelope.get());
envelope->add("op", VPackValue("unobserve"));
envelope->add("url", VPackValue(j));}}}
}
}}
_callbackTrashBin.clear();
_callbackLastPurged = std::chrono::steady_clock::now();
}

LOG_DEVEL << envelope->toJson();

// Best effort. Will be retried anyway
auto wres = write(envelope);

}


query_t Agent::buildDB(arangodb::consensus::index_t index) {
Store store(this);
index_t oldIndex;
Expand Down
16 changes: 16 additions & 0 deletions arangod/Agency/Agent.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,14 @@ class Agent final : public arangodb::Thread, public AgentInterface {
/// @brief Resign leadership
void resign(term_t otherTerm = 0);

/// @brief collect store callbacks for removal
void trashStoreCallback(std::string const& url, query_t const& body);

private:

/// @brief empty callback trash bin
void emptyCbTrashBin();

/// @brief Invoked by leader to replicate log entries ($5.3);
/// also used as heartbeat ($5.2).
void sendAppendEntriesRPC();
Expand Down Expand Up @@ -402,6 +409,11 @@ class Agent final : public arangodb::Thread, public AgentInterface {
///
mutable arangodb::Mutex _ioLock;

/// @brief Callback trash bin lock
/// _callbackTrashBin
///
mutable arangodb::Mutex _cbtLock;

/// @brief RAFT consistency lock:
/// _readDB and _commitIndex
/// Allows reading from one or both if used alone.
Expand Down Expand Up @@ -455,6 +467,10 @@ class Agent final : public arangodb::Thread, public AgentInterface {
/// since the epoch of the steady clock.
std::atomic<int64_t> _leaderSince;

/// @brief Container for callbacks for removal
std::unordered_map<std::string, std::unordered_set<std::string>> _callbackTrashBin;
std::chrono::time_point<std::chrono::steady_clock> _callbackLastPurged;

/// @brief Ids of ongoing transactions, used for inquire:
std::unordered_set<std::string> _ongoingTrxs;

Expand Down
23 changes: 12 additions & 11 deletions arangod/Agency/Store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -327,26 +327,27 @@ std::vector<bool> Store::applyLogEntries(arangodb::velocypack::Builder const& qu
// Callback

for (auto const& url : urls) {
Builder body; // host

auto body = std::make_shared<VPackBuilder>(); // host
{
VPackObjectBuilder b(&body);
body.add("term", VPackValue(term));
body.add("index", VPackValue(index));
VPackObjectBuilder b(body.get());
body->add("term", VPackValue(term));
body->add("index", VPackValue(index));
auto ret = in.equal_range(url);
std::map<std::string,std::map<std::string, std::string>> result;
// key -> (modified -> op)
for (auto it = ret.first; it != ret.second; ++it) {
result[it->second->key][it->second->modified] = it->second->oper;
}
for (auto const& m : result) {
body.add(VPackValue(m.first));
body->add(VPackValue(m.first));
{
VPackObjectBuilder guard(&body);
VPackObjectBuilder guard(body.get());
for (auto const& m2 : m.second) {
body.add(VPackValue(m2.first));
body->add(VPackValue(m2.first));
{
VPackObjectBuilder guard2(&body);
body.add("op", VPackValue(m2.second));
VPackObjectBuilder guard2(body.get());
body->add("op", VPackValue(m2.second));
}
}
}
Expand All @@ -360,8 +361,8 @@ std::vector<bool> Store::applyLogEntries(arangodb::velocypack::Builder const& qu

arangodb::ClusterComm::instance()->asyncRequest(
coordinatorTransactionID, endpoint, rest::RequestType::POST, path,
std::make_shared<std::string>(body.toString()), hf,
std::make_shared<StoreCallback>(path, body.toJson()), 1.0, true, 0.01);
std::make_shared<std::string>(body->toString()), hf,
std::make_shared<StoreCallback>(url, body, _agent), 1.0, true, 0.01);

} else {
LOG_TOPIC("76aca", WARN, Logger::AGENCY) << "Malformed URL " << url;
Expand Down
20 changes: 15 additions & 5 deletions arangod/Agency/StoreCallback.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,28 @@
/// @author Kaveh Vahedipour
////////////////////////////////////////////////////////////////////////////////

#include "Agent.h"
#include "StoreCallback.h"

using namespace arangodb::consensus;
using namespace arangodb::velocypack;

StoreCallback::StoreCallback(std::string const& path, std::string const& body)
: _path(path), _body(body) {}
StoreCallback::StoreCallback(
std::string const& url, query_t const& body, Agent* agent)
: _url(url), _body(body), _agent(agent) {}

bool StoreCallback::operator()(arangodb::ClusterCommResult* res) {
if (res->status != CL_COMM_SENT) {
LOG_TOPIC("7c4cc", DEBUG, Logger::AGENCY) << res->endpoint + _path << "(" << res->status
<< ", " << res->errorMessage << "): " << _body;

if (res->status == CL_COMM_ERROR) {
LOG_TOPIC("9sdbf0", DEBUG, Logger::AGENCY)
<< _url << "(" << res->status << ", " << res->errorMessage
<< "): " << _body->toJson();

if (res->result->getHttpReturnCode() == 404 && _agent != nullptr) {
_agent->trashStoreCallback(_url, _body);
}
}

return true;
}

17 changes: 10 additions & 7 deletions arangod/Agency/StoreCallback.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,18 @@
namespace arangodb {
namespace consensus {

class StoreCallback : public arangodb::ClusterCommCallback {
public:
StoreCallback(std::string const&, std::string const&);
class Agent;

class StoreCallback : public arangodb::ClusterCommCallback {
public:
StoreCallback(std::string const&, query_t const&, Agent* agent);

bool operator()(arangodb::ClusterCommResult*) override final;

private:
std::string _path;
std::string _body;

private:
std::string _url;
query_t _body;
Agent* _agent;
};
} // namespace consensus
} // namespace arangodb
Expand Down
0