Bug fix/fix remote executor races by goedderz · Pull Request #10206 · arangodb/arangodb · GitHub
[go: up one dir, main page]

Skip to content

Bug fix/fix remote executor races #10206

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Oct 10, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions arangod/Aql/ExecutionBlock.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -178,9 +178,9 @@ std::pair<ExecutionState, SharedAqlItemBlockPtr> ExecutionBlock::traceGetSomeEnd
auto const queryId = this->_engine->getQuery()->id();
LOG_TOPIC("07a60", INFO, Logger::QUERIES)
<< "[query#" << queryId << "] "
<< "getSome done type=" << node->getTypeString() << " this=" << (uintptr_t)this
<< " id=" << node->id() << " state=" << stateToString(state)
<< " items=" << items;
<< "getSome done type=" << node->getTypeString()
<< " this=" << (uintptr_t)this << " id=" << node->id()
<< " state=" << stateToString(state) << " items=" << items;

if (_profile >= PROFILE_LEVEL_TRACE_2) {
if (result == nullptr) {
Expand Down Expand Up @@ -248,9 +248,9 @@ std::pair<ExecutionState, size_t> ExecutionBlock::traceSkipSomeEnd(
auto const queryId = this->_engine->getQuery()->id();
LOG_TOPIC("d1950", INFO, Logger::QUERIES)
<< "[query#" << queryId << "] "
<< "skipSome done type=" << node->getTypeString() << " this=" << (uintptr_t)this
<< " id=" << node->id() << " state=" << stateToString(state)
<< " skipped=" << skipped;
<< "skipSome done type=" << node->getTypeString()
<< " this=" << (uintptr_t)this << " id=" << node->id()
<< " state=" << stateToString(state) << " skipped=" << skipped;
}
}
return res;
Expand Down
82 changes: 52 additions & 30 deletions arangod/Aql/RemoteExecutor.cpp
8000 < 57D8 /tr>
Original file line number Diff line number Diff line change
Expand Up @@ -289,9 +289,7 @@ std::pair<ExecutionState, Result> ExecutionBlockImpl<RemoteExecutor>::shutdown(i

if (!_hasTriggeredShutdown) {
std::lock_guard<std::mutex> guard(_communicationMutex);
_lastTicket = 0;
_lastError.reset(TRI_ERROR_NO_ERROR);
_lastResponse.reset();
std::ignore = generateNewTicket();
_hasTriggeredShutdown = true;
}
if (_lastError.fail()) {
Expand Down Expand Up @@ -351,17 +349,22 @@ std::pair<ExecutionState, Result> ExecutionBlockImpl<RemoteExecutor>::shutdown(i

return {ExecutionState::DONE, TRI_ERROR_INTERNAL};
}
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
TRI_ASSERT(_didSendShutdownRequest == false);

// Already sent a shutdown request, but haven't got an answer yet.
if (_didSendShutdownRequest) {
return {ExecutionState::WAITING, TRI_ERROR_NO_ERROR};
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to add `traceShutdown` here as well?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From the code I had locally, I just added the tracing of actual remote requests, not the tracing of ::shutdown() calls - I thought that'd be enough in most of the cases where shutdown is of interest at all, and doesn't clutter the log as much.

}
_didSendShutdownRequest = true;
#endif

// For every call we simply forward via HTTP
VPackBuffer<uint8_t> buffer;
VPackBuilder builder(buffer);
builder.openObject(/*unindexed*/ true);
builder.add("code", VPackValue(errorCode));
builder.close();

traceShutdownRequest(builder.slice(), errorCode);

auto res = sendAsyncRequest(fuerte::RestVerb::Put, "/_api/aql/shutdown/",
std::move(buffer));
if (!res.ok()) {
Expand Down Expand Up @@ -413,17 +416,18 @@ Result handleErrorResponse(network::EndpointSpec const& spec, fuerte::Error err,
Result ExecutionBlockImpl<RemoteExecutor>::sendAsyncRequest(fuerte::RestVerb type,
std::string const& urlPart,
VPackBuffer<uint8_t> body) {
NetworkFeature const& nf = _engine->getQuery()->vocbase().server().getFeature<NetworkFeature>();
NetworkFeature const& nf =
_engine->getQuery()->vocbase().server().getFeature<NetworkFeature>();
network::ConnectionPool* pool = nf.pool();
if (!pool) {
// nullptr only happens on controlled shutdown
return {TRI_ERROR_SHUTTING_DOWN};
}

std::string url = std::string("/_db/") +
arangodb::basics::StringUtils::urlEncode(
_engine->getQuery()->vocbase().name()) +
urlPart + _queryId;
std::string url =
std::string("/_db/") +
arangodb::basics::StringUtils::urlEncode(_engine->getQuery()->vocbase().name()) +
urlPart + _queryId;

arangodb::network::EndpointSpec spec;
int res = network::resolveDestination(nf, _server, spec);
Expand All @@ -442,46 +446,64 @@ Result ExecutionBlockImpl<RemoteExecutor>::sendAsyncRequest(fuerte::RestVerb typ
network::ConnectionPool::Ref ref = pool->leaseConnection(spec.endpoint);

std::lock_guard<std::mutex> guard(_communicationMutex);
unsigned ticket = ++_lastTicket;
auto ticket = generateNewTicket();
std::shared_ptr<fuerte::Connection> conn = ref.connection();
conn->sendRequest(std::move(req),
[=, ref(std::move(ref))](fuerte::Error err,
std::unique_ptr<fuerte::Request>,
std::unique_ptr<fuerte::Response> res) {
_query.sharedState()->execute([&] { // notifies outside
std::lock_guard<std::mutex> guard(_communicationMutex);
if (_lastTicket == ticket) {
if (err != fuerte::Error::NoError || res->statusCode() >= 400) {
_lastError = handleErrorResponse(spec, err, res.get());
} else {
_lastResponse = std::move(res);
}
std::lock_guard<std::mutex> guard(_communicationMutex);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually I do not get the need for the extra mutex here, an atomic _lastTicket should be sufficient to synchronize access to _lastError and _lastResponse ?

Copy link
Member Author
@goedderz goedderz Oct 9, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem is that we do not only have to synchronize a request with its answer, but also protect an answer with another request; e.g., RemoteExecutor sends a getSome/skipSome request, then a shutdown happens (e.g. due to a timeout) at the same time when the answer arrives. Then, this code in ExecutionBlockImpl<RemoteExecutor>::shutdown

if (!_hasTriggeredShutdown) {
  std::lock_guard<std::mutex> guard(_communicationMutex);
  std::ignore = generateNewTicket();
  _hasTriggeredShutdown = true;
}

which resets _lastTicket, _lastError and _lastResponse, races with the lambda that's called with the answer:

[=, ref(std::move(ref))](fuerte::Error err,
                         std::unique_ptr<fuerte::Request>,
                         std::unique_ptr<fuerte::Response> res) {
  std::lock_guard<std::mutex> guard(_communicationMutex);

  if (_lastTicket == ticket) {
    if (err != fuerte::Error::NoError || res->statusCode() >= 400) {
      _lastError = handleErrorResponse(spec, err, res.get());
    } else {
      _lastResponse = std::move(res);
    }
    _query.sharedState()->execute();
  }
}

which reads _lastTicket and then sets _lastError or _lastResponse. I do not see how that should be correctly synchronized with just an atomic _lastTicket.


if (_lastTicket == ticket) {
if (err != fuerte::Error::NoError || res->statusCode() >= 400) {
_lastError = handleErrorResponse(spec, err, res.get());
} else {
_lastResponse = std::move(res);
}
});
_query.sharedState()->execute();
}
});

++_engine->_stats.requests;

return {TRI_ERROR_NO_ERROR};
}

void ExecutionBlockImpl<RemoteExecutor>::traceGetSomeRequest(
VPackSlice slice, size_t const atMost) {
traceRequest("getSome", slice, atMost);
void ExecutionBlockImpl<RemoteExecutor>::traceGetSomeRequest(VPackSlice const slice,
size_t const atMost) {
using namespace std::string_literals;
traceRequest("getSome", slice, "atMost="s + std::to_string(atMost));
}

void ExecutionBlockImpl<RemoteExecutor>::traceSkipSomeRequest(
VPackSlice slice, size_t const atMost) {
traceRequest("skipSome", slice, atMost);
void ExecutionBlockImpl<RemoteExecutor>::traceSkipSomeRequest(VPackSlice const slice,
size_t const atMost) {
using namespace std::string_literals;
traceRequest("skipSome", slice, "atMost="s + std::to_string(atMost));
}

void ExecutionBlockImpl<RemoteExecutor>::traceRequest(
const char* rpc, VPackSlice slice, size_t atMost) {
void ExecutionBlockImpl<RemoteExecutor>::traceShutdownRequest(VPackSlice const slice,
int const errorCode) {
using namespace std::string_literals;
traceRequest("shutdown", slice, "errorCode="s + std::to_string(errorCode));
}

void ExecutionBlockImpl<RemoteExecutor>::traceRequest(char const* const rpc,
VPackSlice const slice,
std::string const& args) {
if (_profile >= PROFILE_LEVEL_TRACE_1) {
auto const queryId = this->_engine->getQuery()->id();
auto const remoteQueryId = _queryId;
LOG_TOPIC("92c71", INFO, Logger::QUERIES)
<< "[query#" << queryId << "] remote request sent: " << rpc
<< " atMost=" << atMost << " registryId=" << remoteQueryId;
<< (args.empty() ? "" : " ") << args << " registryId=" << remoteQueryId;
}
}

unsigned ExecutionBlockImpl<RemoteExecutor>::generateNewTicket() {
  // NOTE: the caller must hold _communicationMutex.
  // Drop any pending error/response belonging to an earlier request and hand
  // out a fresh ticket, so that late answers to superseded requests are
  // recognized (by ticket mismatch) and ignored.
  _lastError.reset(TRI_ERROR_NO_ERROR);
  _lastResponse.reset();

  return ++_lastTicket;
}
8 changes: 5 additions & 3 deletions arangod/Aql/RemoteExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,14 +115,16 @@ class ExecutionBlockImpl<RemoteExecutor> : public ExecutionBlock {
unsigned _lastTicket; /// used to check for canceled requests

bool _hasTriggeredShutdown;

// _communicationMutex *must* be locked for this!
unsigned generateNewTicket();

#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
bool _didSendShutdownRequest = false;
#endif

void traceGetSomeRequest(velocypack::Slice slice, size_t atMost);
void traceSkipSomeRequest(velocypack::Slice slice, size_t atMost);
void traceRequest(const char* rpc, velocypack::Slice slice, size_t atMost);
void traceShutdownRequest(velocypack::Slice slice, int errorCode);
void traceRequest(const char* rpc, velocypack::Slice slice, std::string const& args);
};

} // namespace aql
Expand Down
18 changes: 18 additions & 0 deletions arangod/Aql/SharedQueryState.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,3 +73,21 @@ bool SharedQueryState::executeContinueCallback() const {
// then backs up libcurl callbacks to other objects
return scheduler->queue(RequestLane::CLIENT_AQL, _continueCallback);
}

bool SharedQueryState::execute() {
  // Wake up whoever is waiting on this shared query state: either reschedule
  // the registered continuation handler on the scheduler, or signal the
  // condition variable a synchronous caller blocks on. Returns false when the
  // state has been invalidated or the callback could not be queued.
  std::lock_guard<std::mutex> guard(_mutex);
  if (!_valid) {
    return false;
  }

  if (!_hasHandler) {
    // No continuation handler registered: a thread is (or will be) waiting
    // on _condition.
    _wasNotified = true;
    // simon: bad experience on macOS when unlocking the guard before notify
    _condition.notify_one();
    return true;
  }

  if (ADB_UNLIKELY(!executeContinueCallback())) {
    return false;  // likely shutting down
  }
  return true;
}
20 changes: 1 addition & 19 deletions arangod/Aql/SharedQueryState.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,25 +56,7 @@ class SharedQueryState {
/// This will lead to the following: The original request that led to
/// the network communication will be rescheduled on the ioservice and
/// continues its execution where it left off.
template <typename F>
bool execute(F&& cb) {
std::lock_guard<std::mutex> guard(_mutex);
if (!_valid) {
return false;
}

std::forward<F>(cb)();
if (_hasHandler) {
if (ADB_UNLIKELY(!executeContinueCallback())) {
return false; // likely shutting down
}
} else {
_wasNotified = true;
// simon: bad experience on macOS guard.unlock();
_condition.notify_one();
}
return true;
}
bool execute();

/// this has to stay for a backwards-compatible AQL HTTP API (hasMore).
void waitForAsyncResponse();
Expand Down
0