8000 Feature/parallel aql phase one 2 by jsteemann · Pull Request #10408 · arangodb/arangodb · GitHub
[go: up one dir, main page]

Skip to content

Feature/parallel aql phase one 2 #10408

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Nov 12, 2019
Prev Previous commit
remove debug logging, added assertions
  • Loading branch information
jsteemann committed Nov 12, 2019
commit 95f207e53abd771d617ba6c42f2102b3cb28c054
2 changes: 0 additions & 2 deletions arangod/Aql/RemoteExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -514,10 +514,8 @@ Result ExecutionBlockImpl<RemoteExecutor>::sendAsyncRequest(fuerte::RestVerb typ
_lastResponse = std::move(res);
}
_requestInFlight = false;
// LOG_DEVEL << "notifying " << sqs.get();
return true;
}
// LOG_DEVEL << "skipping " << sqs.get();
return false;
});
});
Expand Down
14 changes: 1 addition & 13 deletions arangod/Aql/SharedQueryState.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@ void SharedQueryState::invalidate() {

/// this has to stay for a backwards-compatible AQL HTTP API (hasMore).
void SharedQueryState::waitForAsyncWakeup() {
// LOG_DEVEL << "waiting " << _numWakeups << " " << this;

std::unique_lock<std::mutex> guard(_mutex);
if (!_valid) {
return;
Expand All @@ -52,8 +50,6 @@ void SharedQueryState::waitForAsyncWakeup() {
_cv.wait(guard, [&] { return _numWakeups > 0 || !_valid; });
TRI_ASSERT(_numWakeups > 0 || !_valid);
_numWakeups--;

// LOG_DEVEL << "wakeup " << _numWakeups << " " << this;;
}

/// @brief setter for the continue handler:
Expand All @@ -63,14 +59,10 @@ void SharedQueryState::setWakeupHandler(std::function<bool()> const& cb) {
_wakeupCb = cb;
_numWakeups = 0;
_cbVersion++;
LOG_DEVEL << "setting wakeup handler " << this;
}

void SharedQueryState::resetWakeupHandler() {
std::lock_guard<std::mutex> guard(_mutex);
if (_wakeupCb) {
LOG_DEVEL << "resetting wakeup handler " << this;
}
_wakeupCb = nullptr;
_numWakeups = 0;
_cbVersion++;
Expand All @@ -82,13 +74,11 @@ void SharedQueryState::execute() {
uint32_t n = _numWakeups++;

if (!_wakeupCb) {
LOG_DEVEL << "notify_one() " << this;
_cv.notify_one();
return;
}

if (n > 0) {
LOG_DEVEL << "other handler already running " << this;
return;
}

Expand All @@ -98,8 +88,6 @@ void SharedQueryState::execute() {
void SharedQueryState::queueHandler() {

if (_numWakeups == 0 || !_wakeupCb || !_valid) {
bool c = !_wakeupCb;
LOG_DEVEL << "queueHandler leave, numWakeups" << _numWakeups << " wakeup cb: " << (!c) << " valid: " << _valid << " " << this;
return;
}

Expand Down Expand Up @@ -129,11 +117,11 @@ void SharedQueryState::queueHandler() {
lck.lock();
if (v == self->_cbVersion) {
uint32_t c = self->_numWakeups--;
TRI_ASSERT(c > 0);
if (c == 1 || !cntn || !self->_valid) {
break;
}
} else {
LOG_DEVEL << "outdated handler";
return;
}
lck.unlock();
Expand Down
0