abort write transactions (#10248) · arangodb/arangodb@b124113 · GitHub
[go: up one dir, main page]

Skip to content

Commit b124113

Browse files
authored
abort write transactions (#10248)
1 parent 17acc94 commit b124113

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

49 files changed

+725
-184
lines changed

arangod/Aql/DistributeExecutor.cpp

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
#include "Aql/ClusterNodes.h"
2626
#include "Aql/Collection.h"
2727
#include "Aql/ExecutionEngine.h"
28+
#include "Aql/Query.h"
2829
#include "Aql/RegisterPlan.h"
2930
#include "Basics/StaticStrings.h"
3031
#include "VocBase/LogicalCollection.h"
@@ -78,6 +79,9 @@ std::pair<ExecutionState, SharedAqlItemBlockPtr> ExecutionBlockImpl<DistributeEx
7879

7980
std::pair<ExecutionState, SharedAqlItemBlockPtr> ExecutionBlockImpl<DistributeExecutor>::getSomeForShardWithoutTrace(
8081
size_t atMost, std::string const& shardId) {
82+
if (getQuery().killed()) {
83+
THROW_ARANGO_EXCEPTION(TRI_ERROR_QUERY_KILLED);
84+
}
8185
// NOTE: We do not need to retain these, the getOrSkipSome is required to!
8286
size_t skipped = 0;
8387
SharedAqlItemBlockPtr result = nullptr;
@@ -101,6 +105,9 @@ std::pair<ExecutionState, size_t> ExecutionBlockImpl<DistributeExecutor>::skipSo
101105

102106
std::pair<ExecutionState, size_t> ExecutionBlockImpl<DistributeExecutor>::skipSomeForShardWithoutTrace(
103107
size_t atMost, std::string const& shardId) {
108+
if (getQuery().killed()) {
109+
THROW_ARANGO_EXCEPTION(TRI_ERROR_QUERY_KILLED);
110+
}
104111
// NOTE: We do not need to retain these, the getOrSkipSome is required to!
105112
size_t skipped = 0;
106113
SharedAqlItemBlockPtr result = nullptr;
@@ -214,6 +221,7 @@ bool ExecutionBlockImpl<DistributeExecutor>::hasMoreForClientId(size_t clientId)
214221
/// current one.
215222
std::pair<ExecutionState, bool> ExecutionBlockImpl<DistributeExecutor>::getBlockForClient(
216223
size_t atMost, size_t clientId) {
224+
217225
if (_buffer.empty()) {
218226
_index = 0; // position in _buffer
219227
_pos = 0; // position in _buffer.at(_index)
@@ -224,6 +232,9 @@ std::pair<ExecutionState, bool> ExecutionBlockImpl<DistributeExecutor>::getBlock
224232

225233
while (buf.size() < atMost) {
226234
if (_index == _buffer.size()) {
235+
if (getQuery().killed()) {
236+
THROW_ARANGO_EXCEPTION(TRI_ERROR_QUERY_KILLED);
237+
}
227238
auto res = getBlock(atMost);
228239
if (res.first == ExecutionState::WAITING) {
229240
return {res.first, false};
@@ -262,6 +273,10 @@ std::pair<ExecutionState, bool> ExecutionBlockImpl<DistributeExecutor>::getBlock
262273
/// attributes <shardKeys> of the Aql value <val> to determine to which shard
263274
/// the row should be sent and return its clientId
264275
size_t ExecutionBlockImpl<DistributeExecutor>::sendToClient(SharedAqlItemBlockPtr cur) {
276+
if (getQuery().killed()) {
277+
THROW_ARANGO_EXCEPTION(TRI_ERROR_QUERY_KILLED);
278+
}
279+
265280
// inspect cur in row _pos and check to which shard it should be sent . .
266281
AqlValue val = cur->getValueReference(_pos, _regId);
267282

@@ -363,6 +378,8 @@ size_t ExecutionBlockImpl<DistributeExecutor>::sendToClient(SharedAqlItemBlockPt
363378
return getClientId(shardId);
364379
}
365380

381+
Query const& ExecutionBlockImpl<DistributeExecutor>::getQuery() const noexcept { return _query; }
382+
366383
/// @brief create a new document key
367384
std::string ExecutionBlockImpl<DistributeExecutor>::createKey(VPackSlice input) const {
368385
return _logCol->createKey(input);

arangod/Aql/DistributeExecutor.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ class DistributeNode;
3636
// ExecutionBlockImpl, so this class only exists to identify the specialization.
3737
class DistributeExecutor {};
3838

39+
class Query;
40+
3941
/**
4042
* @brief See ExecutionBlockImpl.h for documentation.
4143
*/
@@ -97,6 +99,8 @@ class ExecutionBlockImpl<DistributeExecutor> : public BlocksWithClients {
9799
std::string createKey(arangodb::velocypack::Slice) const;
98100

99101
ExecutorInfos const& infos() const;
102+
103+
Query const& getQuery() const noexcept;
100104

101105
private:
102106
ExecutorInfos _infos;

arangod/Aql/EngineInfoContainerCoordinator.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -145,11 +145,11 @@ QueryId EngineInfoContainerCoordinator::closeSnippet() {
145145
ExecutionEngineResult EngineInfoContainerCoordinator::buildEngines(
146146
Query& query, QueryRegistry* registry, std::string const& dbname,
147147
std::unordered_set<std::string> const& restrictToShards,
148-
MapRemoteToSnippet const& dbServerQueryIds) const {
148+
MapRemoteToSnippet const& dbServerQueryIds,
149+
std::vector<uint64_t>& coordinatorQueryIds) const {
149150
TRI_ASSERT(_engineStack.size() == 1);
150151
TRI_ASSERT(_engineStack.top() == 0);
151152

152-
std::vector<uint64_t> coordinatorQueryIds{};
153153
// destroy all query snippets in case of error
154154
auto guard = scopeGuard([&dbname, &registry, &coordinatorQueryIds]() {
155155
for (auto const& it : coordinatorQueryIds) {

arangod/Aql/EngineInfoContainerCoordinator.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,8 @@ class EngineInfoContainerCoordinator {
100100
ExecutionEngineResult buildEngines(Query& query, QueryRegistry* registry,
101101
std::string const& dbname,
102102
std::unordered_set<std::string> const& restrictToShards,
103-
MapRemoteToSnippet const& dbServerQueryIds) const;
103+
MapRemoteToSnippet const& dbServerQueryIds,
104+
std::vector<uint64_t>& coordinatorQueryIds) const;
104105

105106
private:
106107
// @brief List of EngineInfos to distribute accross the cluster

arangod/Aql/EngineInfoContainerDBServerServerBased.cpp

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -193,22 +193,22 @@ EngineInfoContainerDBServerServerBased::EngineInfoContainerDBServerServerBased(Q
193193
// NOTE: We need to start with _lastSnippetID > 0. 0 is reserved for GraphNodes
194194
}
195195

196-
void EngineInfoContainerDBServerServerBased::injectVertexColletions(GraphNode* graphNode){
197-
auto const& vCols = graphNode->vertexColls();
198-
if (vCols.empty()) {
199-
std::map<std::string, Collection*> const* allCollections =
200-
_query.collections()->collections();
201-
auto& resolver = _query.resolver();
202-
for (auto const& it : *allCollections) {
203-
// If resolver cannot resolve this collection
204-
// it has to be a view.
205-
if (!resolver.getCollection(it.first)) {
206-
continue;
207-
}
208-
// All known edge collections will be ignored by this call!
209-
graphNode->injectVertexCollection(it.second);
196+
void EngineInfoContainerDBServerServerBased::injectVertexColletions(GraphNode* graphNode) {
197+
auto const& vCols = graphNode->vertexColls();
198+
if (vCols.empty()) {
199+
std::map<std::string, Collection*> const* allCollections =
200+
_query.collections()->collections();
201+
auto& resolver = _query.resolver();
202+
for (auto const& it : *allCollections) {
203+
// If resolver cannot resolve this collection
204+
// it has to be a view.
205+
if (!resolver.getCollection(it.first)) {
206+
continue;
210207
}
208+
// All known edge collections will be ignored by this call!
209+
graphNode->injectVertexCollection(it.second);
211210
}
211+
}
212212
}
213213

214214
// Insert a new node into the last engine on the stack

arangod/Aql/ExecutionEngine.cpp

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,13 @@
4040
#include "Aql/WalkerWorker.h"
4141
#include "Basics/ScopeGuard.h"
4242
#include "Cluster/ServerState.h"
43+
#include "Futures/Utilities.h"
4344
#include "Logger/Logger.h"
45+
#include "Logger/LogMacros.h"
46+
#include "Network/Methods.h"
4447
#include "Network/NetworkFeature.h"
48+
#include "Network/Utils.h"
49+
#include "RestServer/QueryRegistryFeature.h"
4550

4651
using namespace arangodb;
4752
using namespace arangodb::aql;
@@ -473,6 +478,7 @@ struct DistributedQueryInstanciator final : public WalkerWorker<ExecutionNode> {
473478
_dbserverParts.cleanupEngines(pool, TRI_ERROR_INTERNAL,
474479
_query.vocbase().name(), queryIds);
475480
});
481+
476482
std::unordered_map<size_t, size_t> nodeAliases;
477483
ExecutionEngineResult res = _dbserverParts.buildEngines(queryIds, nodeAliases);
478484
if (res.fail()) {
@@ -481,21 +487,69 @@ struct DistributedQueryInstanciator final : public WalkerWorker<ExecutionNode> {
481487

482488
// The coordinator engines cannot decide on lock issues later on,
483489
// however every engine gets injected the list of locked shards.
490+
std::vector<uint64_t> coordinatorQueryIds{};
484491
res = _coordinatorParts.buildEngines(_query, registry, _query.vocbase().name(),
485-
_query.queryOptions().shardIds, queryIds);
492+
_query.queryOptions().shardIds, queryIds,
493+
coordinatorQueryIds);
486494

487495
if (res.ok()) {
488496
TRI_ASSERT(_query.engine() != nullptr);
489497
_query.engine()->_stats.addAliases(std::move(nodeAliases));
490498
cleanupGuard.cancel();
491499
}
492500

501+
_query.engine()->snippetMapping(std::move(queryIds), std::move(coordinatorQueryIds));
502+
493503
return res;
494504
}
495505
};
496506

507+
void ExecutionEngine::kill() {
508+
// kill coordinator parts
509+
// TODO: this doesn't seem to be necessary and sometimes even show adverse effects
510+
// so leaving this deactivated for now
511+
// auto queryRegistry = QueryRegistryFeature::registry();
512+
// if (queryRegistry != nullptr) {
513+
// for (auto const& id : _coordinatorQueryIds) {
514+
// queryRegistry->kill(&(_query.vocbase()), id);
515+
// }
516+
// }
517+
518+
// kill DB server parts
519+
// RemoteNodeId -> DBServerId -> [snippetId]
520+
NetworkFeature const& nf = _query.vocbase().server().getFeature<NetworkFeature>();
521+
network::ConnectionPool* pool = nf.pool();
522+
if (pool == nullptr) {
523+
return;
524+
}
525+
526+
VPackBuffer<uint8_t> body;
527+
std::vector<network::FutureRes> futures;
528+
529+
for (auto const& it : _dbServerMapping) {
530+
for (auto const& it2 : it.second) {
531+
for (auto const& snippetId : it2.second) {
532+
network::Headers headers;
533+
TRI_ASSERT(it2.first.substr(0, 7) == "server:");
534+
auto future = network::sendRequest(pool, it2.first, fuerte::RestVerb::Delete,
535+
"/_api/aql/kill/" + snippetId, body, std::move(headers));
536+
futures.emplace_back(std::move(future));
537+
}
538+
}
539+
}
540+
541+
if (!futures.empty()) {
542+
// killing is best-effort
543+
// we are ignoring all errors intentionally here
544+
futures::collectAll(futures).get();
545+
}
546+
}
547+
497548
std::pair<ExecutionState, Result> ExecutionEngine::initializeCursor(SharedAqlItemBlockPtr&& items,
498549
size_t pos) {
550+
if (_query.killed()) {
551+
THROW_ARANGO_EXCEPTION(TRI_ERROR_QUERY_KILLED);
552+
}
499553
InputAqlItemRow inputRow{CreateInvalidInputRowHint{}};
500554
if (items != nullptr) {
501555
inputRow = InputAqlItemRow{std::move(items), pos};
@@ -509,6 +563,9 @@ std::pair<ExecutionState, Result> ExecutionEngine::initializeCursor(SharedAqlIte
509563
}
510564

511565
std::pair<ExecutionState, SharedAqlItemBlockPtr> ExecutionEngine::getSome(size_t atMost) {
566+
if (_query.killed()) {
567+
THROW_ARANGO_EXCEPTION(TRI_ERROR_QUERY_KILLED);
568+
}
512569
if (!_initializeCursorCalled) {
513570
auto res = initializeCursor(nullptr, 0);
514571
if (res.first == ExecutionState::WAITING) {
@@ -519,6 +576,9 @@ std::pair<ExecutionState, SharedAqlItemBlockPtr> ExecutionEngine::getSome(size_t
519576
}
520577

521578
std::pair<ExecutionState, size_t> ExecutionEngine::skipSome(size_t atMost) {
579+
if (_query.killed()) {
580+
THROW_ARANGO_EXCEPTION(TRI_ERROR_QUERY_KILLED);
581+
}
522582
if (!_initializeCursorCalled) {
523583
auto res = initializeCursor(nullptr, 0);
524584
if (res.first == ExecutionState::WAITING) {

arangod/Aql/ExecutionEngine.h

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,16 @@ class ExecutionEngine {
7272

7373
/// @brief get the query
7474
TEST_VIRTUAL Query* getQuery() const;
75+
76+
/// @brief server to snippet mapping
77+
void snippetMapping(MapRemoteToSnippet&& dbServerMapping,
78+
std::vector<uint64_t>&& coordinatorQueryIds) {
79+
_dbServerMapping = std::move(dbServerMapping);
80+
_coordinatorQueryIds = std::move(coordinatorQueryIds);
81+
}
82+
83+
/// @brief kill the query
84+
void kill();
7585

7686
/// @brief initializeCursor, could be called multiple times
7787
std::pair<ExecutionState, Result> initializeCursor(SharedAqlItemBlockPtr&& items, size_t pos);
@@ -132,6 +142,12 @@ class ExecutionEngine {
132142

133143
/// @brief whether or not shutdown() was executed
134144
bool _wasShutdown;
145+
146+
/// @brief server to snippet mapping
147+
MapRemoteToSnippet _dbServerMapping;
148+
149+
/// @brief ids of all coordinator query snippets
150+
std::vector<uint64_t> _coordinatorQueryIds;
135151
};
136152
} // namespace aql
137153
} // namespace arangodb

arangod/Aql/GraphNode.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -418,9 +418,9 @@ GraphNode::GraphNode(ExecutionPlan* plan, size_t id, TRI_vocbase_t* vocbase,
418418
GraphNode::~GraphNode() = default;
419419

420420
std::string const& GraphNode::collectionToShardName(std::string const& collName) const {
421-
if(_collectionToShard.empty()){
421+
if (_collectionToShard.empty()) {
422422
return collName;
423-
};
423+
}
424424

425425
auto found = _collectionToShard.find(collName);
426426
TRI_ASSERT(found != _collectionToShard.cend());

arangod/Aql/ModificationExecutor.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ ModificationExecutor<Modifier, FetcherType>::produceRows(OutputAqlItemRow& outpu
9494
TRI_IF_FAILURE("ModificationBlock::getSome") {
9595
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
9696
}
97-
97+
9898
TRI_ASSERT(_modifier._block != nullptr);
9999

100100
// prepares modifier for single row output

arangod/Aql/NoResultsExecutor.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ constexpr bool NoResultsExecutor::Properties::preservesOrder;
3434
constexpr BlockPassthrough NoResultsExecutor::Properties::allowsBlockPassthrough;
3535
constexpr bool NoResultsExecutor::Properties::inputSizeRestrictsOutputSize;
3636

37-
NoResultsExecutor::NoResultsExecutor(Fetcher& fetcher, ExecutorInfos& infos){};
37+
NoResultsExecutor::NoResultsExecutor(Fetcher& fetcher, ExecutorInfos& infos) {}
3838
NoResultsExecutor::~NoResultsExecutor() = default;
3939

4040
std::pair<ExecutionState, NoStats> NoResultsExecutor::produceRows(OutputAqlItemRow& output) {

arangod/Aql/Query.cpp

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,7 @@ Query::~Query() {
210210
<< " this: " << (uintptr_t)this;
211211
}
212212

213+
// this will reset _trx, so _trx is invalid after here
213214
cleanupPlanAndEngineSync(TRI_ERROR_INTERNAL);
214215

215216
exitContext();
@@ -275,7 +276,14 @@ Query* Query::clone(QueryPart part, bool withPlan) {
275276
}
276277

277278
/// @brief set the query to killed
278-
void Query::kill() { _killed = true; }
279+
void Query::kill() {
280+
_killed = true;
281+
if (_engine != nullptr) {
282+
// killing is best effort...
283+
// intentionally ignoring the result of this call here
284+
_engine->kill();
285+
}
286+
}
279287

280288
void Query::setExecutionTime() {
281289
if (_engine != nullptr) {
@@ -573,6 +581,10 @@ ExecutionState Query::execute(QueryRegistry* registry, QueryResult& queryResult)
573581
TRI_ASSERT(registry != nullptr);
574582

575583
try {
584+
if (_killed) {
585+
THROW_ARANGO_EXCEPTION(TRI_ERROR_QUERY_KILLED);
586+
}
587+
576588
bool useQueryCache = canUseQueryCache();
577589

578590
switch (_executionPhase) {
@@ -898,6 +910,10 @@ ExecutionState Query::executeV8(v8::Isolate* isolate, QueryRegistry* registry,
898910
}
899911
}
900912
}
913+
914+
if (_killed) {
915+
THROW_ARANGO_EXCEPTION(TRI_ERROR_QUERY_KILLED);
916+
}
901917
}
902918

903919
builder->close();
@@ -1428,6 +1444,13 @@ ExecutionState Query::cleanupPlanAndEngine(int errorCode, VPackBuilder* statsBui
14281444
_engine.reset();
14291445
}
14301446

1447+
// the following call removes the query from the list of currently
1448+
// running queries. so whoever fetches that list will not see a Query that
1449+
// is about to shut down/be destroyed
1450+
if (_profile != nullptr) {
1451+
_profile->unregisterFromQueryList();
1452+
}
1453+
14311454
// If the transaction was not committed, it is automatically aborted
14321455
_trx = nullptr;
14331456

arangod/Aql/Query.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,12 +104,14 @@ class Query {
104104
/// @brief clone a query
105105
/// note: as a side-effect, this will also create and start a transaction for
106106
/// the query
107-
TEST_VIRTUAL Query* clone(QueryPart, bool);
107+
TEST_VIRTUAL Query* clone(QueryPart, bool withPlan);
108108

109109
constexpr static uint64_t DontCache = 0;
110110

111111
/// @brief whether or not the query is killed
112112
inline bool killed() const { return _killed; }
113+
114+
void setKilled() { _killed = true; }
113115

114116
/// @brief set the query to killed
115117
void kill();

0 commit comments

Comments
 (0)
0