Bug fix/fix remote executor races #10206
Changes from all commits
@@ -289,9 +289,7 @@ std::pair<ExecutionState, Result> ExecutionBlockImpl<RemoteExecutor>::shutdown(i
   if (!_hasTriggeredShutdown) {
     std::lock_guard<std::mutex> guard(_communicationMutex);
-    _lastTicket = 0;
-    _lastError.reset(TRI_ERROR_NO_ERROR);
-    _lastResponse.reset();
+    std::ignore = generateNewTicket();
     _hasTriggeredShutdown = true;
   }
   if (_lastError.fail()) {
@@ -351,17 +349,22 @@ std::pair<ExecutionState, Result> ExecutionBlockImpl<RemoteExecutor>::shutdown(i
     return {ExecutionState::DONE, TRI_ERROR_INTERNAL};
   }
 #ifdef ARANGODB_ENABLE_MAINTAINER_MODE
-  TRI_ASSERT(_didSendShutdownRequest == false);
+  // Already sent a shutdown request, but haven't got an answer yet.
+  if (_didSendShutdownRequest) {
+    return {ExecutionState::WAITING, TRI_ERROR_NO_ERROR};
+  }
+  _didSendShutdownRequest = true;
 #endif

   // For every call we simply forward via HTTP
   VPackBuffer<uint8_t> buffer;
   VPackBuilder builder(buffer);
   builder.openObject(/*unindexed*/ true);
   builder.add("code", VPackValue(errorCode));
   builder.close();

+  traceShutdownRequest(builder.slice(), errorCode);

   auto res = sendAsyncRequest(fuerte::RestVerb::Put, "/_api/aql/shutdown/",
                               std::move(buffer));
   if (!res.ok()) {
@@ -413,17 +416,18 @@ Result handleErrorResponse(network::EndpointSpec const& spec, fuerte::Error err,
 Result ExecutionBlockImpl<RemoteExecutor>::sendAsyncRequest(fuerte::RestVerb type,
                                                             std::string const& urlPart,
                                                             VPackBuffer<uint8_t> body) {
-  NetworkFeature const& nf = _engine->getQuery()->vocbase().server().getFeature<NetworkFeature>();
+  NetworkFeature const& nf =
+      _engine->getQuery()->vocbase().server().getFeature<NetworkFeature>();
   network::ConnectionPool* pool = nf.pool();
   if (!pool) {
     // nullptr only happens on controlled shutdown
     return {TRI_ERROR_SHUTTING_DOWN};
   }

-  std::string url = std::string("/_db/") +
-                    arangodb::basics::StringUtils::urlEncode(
-                        _engine->getQuery()->vocbase().name()) +
-                    urlPart + _queryId;
+  std::string url =
+      std::string("/_db/") +
+      arangodb::basics::StringUtils::urlEncode(_engine->getQuery()->vocbase().name()) +
+      urlPart + _queryId;

   arangodb::network::EndpointSpec spec;
   int res = network::resolveDestination(nf, _server, spec);
@@ -442,46 +446,64 @@ Result ExecutionBlockImpl<RemoteExecutor>::sendAsyncRequest(fuerte::RestVerb typ
   network::ConnectionPool::Ref ref = pool->leaseConnection(spec.endpoint);

-  std::lock_guard<std::mutex> guard(_communicationMutex);
-  unsigned ticket = ++_lastTicket;
+  auto ticket = generateNewTicket();
   std::shared_ptr<fuerte::Connection> conn = ref.connection();
   conn->sendRequest(std::move(req),
                     [=, ref(std::move(ref))](fuerte::Error err,
                                              std::unique_ptr<fuerte::Request>,
                                              std::unique_ptr<fuerte::Response> res) {
-                      _query.sharedState()->execute([&] {  // notifies outside
-                        std::lock_guard<std::mutex> guard(_communicationMutex);
-                        if (_lastTicket == ticket) {
-                          if (err != fuerte::Error::NoError || res->statusCode() >= 400) {
-                            _lastError = handleErrorResponse(spec, err, res.get());
-                          } else {
-                            _lastResponse = std::move(res);
-                          }
-                        }
-                      });
+                      std::lock_guard<std::mutex> guard(_communicationMutex);
Review comment (on the new std::lock_guard above):

  Actually, I do not get the need for the extra mutex here; an atomic _lastTicket should be sufficient to synchronize access to _lastError and _lastResponse?

Reply:

  The problem is that we do not only have to synchronize a request with its answer, but also protect an answer against another request; e.g., the RemoteExecutor sends a getSome/skipSome request, then a shutdown happens (e.g. due to a timeout) at the same time the answer arrives. Then this code in ::shutdown(),

    if (!_hasTriggeredShutdown) {
      std::lock_guard<std::mutex> guard(_communicationMutex);
      std::ignore = generateNewTicket();
      _hasTriggeredShutdown = true;
    }

  which resets _lastTicket, _lastError and _lastResponse, can run concurrently with the response callback,

    [=, ref(std::move(ref))](fuerte::Error err,
                             std::unique_ptr<fuerte::Request>,
                             std::unique_ptr<fuerte::Response> res) {
      std::lock_guard<std::mutex> guard(_communicationMutex);
      if (_lastTicket == ticket) {
        if (err != fuerte::Error::NoError || res->statusCode() >= 400) {
          _lastError = handleErrorResponse(spec, err, res.get());
        } else {
          _lastResponse = std::move(res);
        }
        _query.sharedState()->execute();
      }
    }

  which reads _lastTicket and writes _lastError and _lastResponse. Each of these compound accesses has to happen atomically with respect to the other, which an atomic ticket alone would not give us.
+                      if (_lastTicket == ticket) {
+                        if (err != fuerte::Error::NoError || res->statusCode() >= 400) {
+                          _lastError = handleErrorResponse(spec, err, res.get());
+                        } else {
+                          _lastResponse = std::move(res);
+                        }
+                        _query.sharedState()->execute();
+                      }
                     });

   ++_engine->_stats.requests;

   return {TRI_ERROR_NO_ERROR};
 }
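To make the locking discussion above concrete, here is a minimal standalone sketch of the ticket pattern (hypothetical names, not ArangoDB code): a response is only published if its ticket still matches the current one, and shutdown invalidates the ticket under the same mutex, so a late answer to an already-abandoned request is dropped.

```cpp
// Standalone sketch of the ticket pattern used above; all names are
// made up and heavily simplified, this is not ArangoDB code.
#include <iostream>
#include <mutex>
#include <optional>
#include <string>
#include <thread>

class RemoteCall {
 public:
  // Start a "request": remember the ticket that the answer must match.
  unsigned sendRequest() {
    std::lock_guard<std::mutex> guard(_mutex);
    return generateNewTicket();
  }

  // Response callback: only the holder of the current ticket may publish.
  void onResponse(unsigned ticket, std::string body) {
    std::lock_guard<std::mutex> guard(_mutex);
    if (_lastTicket == ticket) {
      _lastResponse = std::move(body);
    }
    // Otherwise the response is stale (a shutdown or a newer request
    // invalidated the ticket) and is silently dropped.
  }

  // Shutdown: invalidate any in-flight request so its late answer
  // cannot overwrite state that shutdown already reset.
  void shutdown() {
    std::lock_guard<std::mutex> guard(_mutex);
    generateNewTicket();
  }

  std::optional<std::string> lastResponse() {
    std::lock_guard<std::mutex> guard(_mutex);
    return _lastResponse;
  }

 private:
  // Assumes _mutex is held by the caller.
  unsigned generateNewTicket() {
    ++_lastTicket;
    _lastResponse.reset();
    return _lastTicket;
  }

  std::mutex _mutex;
  unsigned _lastTicket = 0;
  std::optional<std::string> _lastResponse;
};

int main() {
  RemoteCall call;
  unsigned ticket = call.sendRequest();

  // Shutdown races with the answer; in this run it happens to win.
  call.shutdown();

  std::thread network([&] { call.onResponse(ticket, "some rows"); });
  network.join();

  // The stale answer was discarded because its ticket no longer matches.
  std::cout << std::boolalpha
            << "got response: " << call.lastResponse().has_value() << "\n";
  return 0;
}
```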
-void ExecutionBlockImpl<RemoteExecutor>::traceGetSomeRequest(
-    VPackSlice slice, size_t const atMost) {
-  traceRequest("getSome", slice, atMost);
+void ExecutionBlockImpl<RemoteExecutor>::traceGetSomeRequest(VPackSlice const slice,
+                                                             size_t const atMost) {
+  using namespace std::string_literals;
+  traceRequest("getSome", slice, "atMost="s + std::to_string(atMost));
 }

-void ExecutionBlockImpl<RemoteExecutor>::traceSkipSomeRequest(
-    VPackSlice slice, size_t const atMost) {
-  traceRequest("skipSome", slice, atMost);
+void ExecutionBlockImpl<RemoteExecutor>::traceSkipSomeRequest(VPackSlice const slice,
+                                                              size_t const atMost) {
+  using namespace std::string_literals;
+  traceRequest("skipSome", slice, "atMost="s + std::to_string(atMost));
 }

-void ExecutionBlockImpl<RemoteExecutor>::traceRequest(
-    const char* rpc, VPackSlice slice, size_t atMost) {
+void ExecutionBlockImpl<RemoteExecutor>::traceShutdownRequest(VPackSlice const slice,
+                                                              int const errorCode) {
+  using namespace std::string_literals;
+  traceRequest("shutdown", slice, "errorCode="s + std::to_string(errorCode));
+}
+
+void ExecutionBlockImpl<RemoteExecutor>::traceRequest(char const* const rpc,
+                                                      VPackSlice const slice,
+                                                      std::string const& args) {
   if (_profile >= PROFILE_LEVEL_TRACE_1) {
     auto const queryId = this->_engine->getQuery()->id();
     auto const remoteQueryId = _queryId;
     LOG_TOPIC("92c71", INFO, Logger::QUERIES)
         << "[query#" << queryId << "] remote request sent: " << rpc
-        << " atMost=" << atMost << " registryId=" << remoteQueryId;
+        << (args.empty() ? "" : " ") << args << " registryId=" << remoteQueryId;
   }
 }
+unsigned ExecutionBlockImpl<RemoteExecutor>::generateNewTicket() {
+  // Assumes that _communicationMutex is locked in the caller
+  ++_lastTicket;
+  _lastError.reset(TRI_ERROR_NO_ERROR);
+  _lastResponse.reset();
+
+  return _lastTicket;
+}
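The "locked in the caller" precondition above is documented only as a comment. As a side note (not something this patch does), a common way to make such a precondition checkable is to require the lock object as a parameter; a hypothetical sketch, with made-up names:

```cpp
// Hypothetical sketch, not part of the patch: passing the lock object turns
// "caller must hold _communicationMutex" from a comment into a contract that
// cannot be forgotten at the call site and can be verified at runtime.
#include <mutex>
#include <stdexcept>

class TicketedState {
 public:
  unsigned resetForNewRequest() {
    std::unique_lock<std::mutex> guard(_communicationMutex);
    return generateNewTicket(guard);  // impossible to call without a lock object
  }

 private:
  unsigned generateNewTicket(std::unique_lock<std::mutex> const& guard) {
    // Verify the lock is held and actually covers our mutex.
    if (!guard.owns_lock() || guard.mutex() != &_communicationMutex) {
      throw std::logic_error("generateNewTicket() called without the lock");
    }
    return ++_lastTicket;
  }

  std::mutex _communicationMutex;
  unsigned _lastTicket = 0;
};

int main() {
  TicketedState state;
  return state.resetForNewRequest() == 1 ? 0 : 1;
}
```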
Review comment:

  Do we want to add traceShutdown here as well?

Reply:

  From the code I had locally, I just added the tracing of actual remote requests, not the tracing of ::shutdown() calls - I thought that'd be enough in most of the cases where shutdown is of interest at all, and it doesn't clutter the log as much.