8000 Fix cancel cursor via Job API (#12441) · adamjm/arangodb@2cf3536 · GitHub
[go: up one dir, main page]

Skip to content

Commit 2cf3536

Browse files
authored
Fix cancel cursor via Job API (arangodb#12441)
1 parent d3eafe2 commit 2cf3536

File tree

6 files changed

+150
-5
lines changed

6 files changed

+150
-5
lines changed

arangod/Aql/Query.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1290,8 +1290,8 @@ aql::ExecutionState Query::cleanupTrxAndEngines(int errorCode) {
12901290
TRI_ASSERT(_sharedState);
12911291

12921292
::finishDBServerParts(*this, errorCode).thenValue([ss = _sharedState, this](Result r) {
1293-
LOG_TOPIC_IF("fd31e", INFO, Logger::QUERIES, r.fail())
1294-
<< "received error from DBServer on query finalization: '" << r.errorMessage() << "'";
1293+
LOG_TOPIC_IF("fd31e", INFO, Logger::QUERIES, r.fail() && r.isNot(TRI_ERROR_HTTP_NOT_FOUND))
1294+
<< "received error from DBServer on query finalization: " << r.errorNumber() << ", '" << r.errorMessage() << "'";
12951295
_sharedState->executeAndWakeup([&] {
12961296
_shutdownState.store(ShutdownState::Done, std::memory_order_relaxed);
12971297
return true;

arangod/Aql/RestAqlHandler.cpp

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -252,8 +252,11 @@ void RestAqlHandler::setupClusterQuery() {
252252
// DELETE method for /_api/aql/kill/<queryId>, (internal)
253253
// simon: only used for <= 3.6
254254
bool RestAqlHandler::killQuery(std::string const& idString) {
255-
_qId = arangodb::basics::StringUtils::uint64(idString);
256-
return _queryRegistry->destroyEngine(_qId, TRI_ERROR_QUERY_KILLED);
255+
auto qid = arangodb::basics::StringUtils::uint64(idString);
256+
if (qid != 0) {
257+
return _queryRegistry->destroyEngine(qid, TRI_ERROR_QUERY_KILLED);
258+
}
259+
return false;
257260
}
258261

259262
// PUT method for /_api/aql/<operation>/<queryId>, (internal)

arangod/GeneralServer/AsyncJobManager.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,9 @@ Result AsyncJobManager::cancelJob(AsyncJobResult::IdType jobId) {
189189
if (handler != nullptr) {
190190
handler->cancel();
191191
}
192+
193+
// simon: handlers running async tasks use shared_ptr to keep alive
194+
it->second.second._handler = nullptr;
192195

193196
return rv;
194197
}

arangod/RestHandler/RestCursorHandler.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,11 @@ RestStatus RestCursorHandler::execute() {
8484
}
8585

8686
RestStatus RestCursorHandler::continueExecute() {
87+
if (wasCanceled()) {
88+
generateError(rest::ResponseCode::GONE, TRI_ERROR_QUERY_KILLED);
89+
return RestStatus::DONE;
90+
}
91+
8792
// extract the sub-request type
8893
rest::RequestType const type = _request->requestType();
8994

tests/js/client/communication/test-kill.js

Lines changed: 85 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,7 @@ function KillSuite () {
213213
db._drop("UnitTestsTemp");
214214
},
215215

216-
testWorkInParallel: function () {
216+
testKillInParallel: function () {
217217
let queryCode = `
218218
let ERRORS = require('@arangodb').errors;
219219
try {
@@ -257,6 +257,90 @@ queries.current().forEach(function(query) {
257257
runTests(tests, 90);
258258
},
259259

260+
testCancelInParallel: function () {
261+
let queryCode = `
262+
let result = arango.POST_RAW("/_api/cursor", {
263+
query: "/*test*/ FOR doc IN UnitTestsTemp RETURN doc.value",
264+
}, { "x-arango-async": "store" });
265+
266+
if (result.code !== 202) {
267+
throw "invalid query return code: " + result.code;
268+
}
269+
let jobId = result.headers["x-arango-async-id"];
270+
let id = null;
271+
while (id === null) {
272+
result = arango.PUT_RAW("/_api/job/" + encodeURIComponent(jobId), {});
273+
if (result.code === 410 || result.code === 404) {
274+
// killed
275+
break;
276+
} else if (result.code >= 200 && result.code <= 202) {
277+
id = result.parsedBody.id;
278+
break;
279+
}
280+
require("internal").sleep(0.1);
281+
}
282+
while (id !== null) {
283+
result = arango.PUT_RAW("/_api/cursor/" + encodeURIComponent(id), {});
284+
if (result.code === 410) {
285+
// killed
286+
break;
287+
} else if (result.code >= 200 && result.code <= 204) {
288+
if (!result.parsedBody.hasMore) {
289+
break;
290+
}
291+
id = result.parsedBody.id;
292+
} else {
293+
throw "peng! " + JSON.stringify(result.parsedBody);
294+
}
295+
}
296+
`;
297+
298+
let cancelCode = `
299+
let result = arango.GET("/_api/job/pending");
300+
result.forEach(function(jobId) {
301+
arango.DELETE("/_api/job/" + encodeURIComponent(jobId), {});
302+
});
303+
require("internal").sleep(0.01 + Math.random() * 0.09);
304+
`;
305+
let tests = [
306+
[ 'aql-1', queryCode ],
307+
[ 'aql-2', queryCode ],
308+
[ 'cancel-1', cancelCode ],
309+
[ 'cancel-2', cancelCode ],
310+
];
311+
312+
// run the suite for a while...
313+
runTests(tests, 30);
314+
315+
// finally kill off all remaining pending jobs
316+
let tries = 0;
317+
while (tries++ < 60) {
318+
let result = arango.GET("/_api/job/pending");
319+
if (result.length === 0) {
320+
break;
321+
}
322+
result.forEach(function(jobId) {
323+
arango.DELETE("/_api/job/" + encodeURIComponent(jobId), {});
324+
});
325+
require("internal").sleep(1.0);
326+
}
327+
328+
// sleep again to make sure every killed query had a chance to finish
329+
require("internal").sleep(3.0);
330+
let killed = 0;
331+
let queries = require("@arangodb/aql/queries").current().forEach(function(query) {
332+
if (!query.query.match(/\/\*test-\d+\*\//)) {
333+
return;
334+
}
335+
if (query.state.toLowerCase() === 'killed') {
336+
++killed;
337+
}
338+
});
339+
if (killed > 0) {
340+
throw "got " + killed + " queries in state killed, but expected none!";
341+
}
342+
},
343+
260344
};
261345
}
262346

tests/js/client/shell/shell-aql-kill.js

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,56 @@ function aqlKillSuite () {
132132
assertEqual(410, result.code);
133133
},
134134

135+
// test killing the query via the Job API
136+
testCancelWriteQuery: function () {
137+
let result = arango.POST_RAW("/_api/cursor", {
138+
query: "FOR i IN 1..10000000 INSERT {} INTO " + cn
139+
}, {
140+
"x-arango-async" : "store"
141+
});
142+
143+
let jobId = result.headers["x-arango-async-id"];
144+
145+
let queryId = 0;
146+
let tries = 0;
147+
while (++tries < 30) {
148+
let queries = require("@arangodb/aql/queries").current();
149+
queries.filter(function(data) {
150+
if (data.query.indexOf(cn) !== -1) {
151+
queryId = data.id;
152+
}
153+
});
154+
if (queryId > 0) {
155+
break;
156+
}
157+
158+
require("internal").wait(1, false);
159+
}
160+
161+
assertTrue(queryId > 0);
162+
163+
// cancel the async job
164+
165+
result = arango.PUT_RAW("/_api/job/" + jobId + "/cancel", {});
166+
assertEqual(result.code, 200);
167+
168+
// make sure the query is no longer in the list of running queries
169+
tries = 0;
170+
while (++tries < 30) {
171+
let queries = require("@arangodb/aql/queries").current();
172+
let stillThere = false;
173+
queries.filter(function(data) {
174+
if (data.id === queryId) {
175+
stillThere = true;
176+
}
177+
});
178+
if (!stillThere) {
179+
break;
180+
}
181+
require("internal").wait(1, false);
182+
}
183+
assertTrue(tries < 30);
184+
},
135185
};
136186
}
137187

0 commit comments

Comments
 (0)
0