[CINFRA] Fix shutdown hanger (#20618) · 0ArtemBabaev/arangodb@0c95777 · GitHub
[go: up one dir, main page]

Skip to content

Commit 0c95777

Browse files
authored
[CINFRA] Fix shutdown hanger (arangodb#20618)
* Introducing replication callback
* Adapting unit tests
1 parent 0b85636 commit 0c95777

18 files changed

+91
-91
lines changed

arangod/ClusterEngine/ClusterCollection.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,8 @@ bool ClusterCollection::cacheEnabled() const noexcept {
207207

208208
futures::Future<std::shared_ptr<Index>> ClusterCollection::createIndex(
209209
velocypack::Slice info, bool restore, bool& created,
210-
std::shared_ptr<std::function<arangodb::Result(double)>> progress) {
210+
std::shared_ptr<std::function<arangodb::Result(double)>> progress,
211+
Replication2Callback replicationCb) {
211212
TRI_ASSERT(ServerState::instance()->isCoordinator());
212213

213214
// prevent concurrent dropping

arangod/ClusterEngine/ClusterCollection.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,8 +81,8 @@ class ClusterCollection final : public PhysicalCollection {
8181

8282
futures::Future<std::shared_ptr<Index>> createIndex(
8383
velocypack::Slice info, bool restore, bool& created,
84-
std::shared_ptr<std::function<arangodb::Result(double)>> =
85-
nullptr) override;
84+
std::shared_ptr<std::function<arangodb::Result(double)>> = nullptr,
85+
Replication2Callback replicationCb = nullptr) override;
8686

8787
std::unique_ptr<IndexIterator> getAllIterator(
8888
transaction::Methods* trx, ReadOwnWrites readOwnWrites) const override;

arangod/Replication2/StateMachines/Document/DocumentLeaderState.cpp

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -698,13 +698,6 @@ auto DocumentLeaderState::createIndex(
698698
std::shared_ptr<methods::Indexes::ProgressTracker> progress)
699699
-> futures::Future<Result> {
700700
auto sharedIndexInfo = VPackBuilder{indexInfo}.sharedSlice();
701-
// Note that this ReplicatedOperation will only be used to call
702-
// createIndex, but will not be replicated; instead, a new one will be created
703-
// during RocksDBCollection::createIndex. It will actually be different,
704-
// because it will work with a normalized `indexInfo` parameter; however, as
705-
// that one is used for the index creation, it should not create any problems.
706-
auto op = ReplicatedOperation::buildCreateIndexOperation(
707-
shard, sharedIndexInfo, std::move(progress));
708701

709702
// Why are we first creating the index locally, then replicating it?
710703
// 1. Unique Indexes
@@ -724,13 +717,26 @@ auto DocumentLeaderState::createIndex(
724717
// the server.
725718

726719
auto localIndexCreation = _guardedData.doUnderLock(
727-
[op = std::move(op), self = shared_from_this()](auto& data) mutable {
720+
[self = shared_from_this(), shard, sharedIndexInfo,
721+
progress = std::move(progress)](auto& data) mutable {
728722
if (data.didResign()) {
729723
return Result{TRI_ERROR_REPLICATION_REPLICATED_LOG_LEADER_RESIGNED,
730724
"Leader resigned prior to index creation"};
731725
}
732726

733-
return data.transactionHandler->applyEntry(std::move(op));
727+
// Callback used to replicate the operation during
728+
// RocksDBCollection::createIndex
729+
auto op = ReplicatedOperation::buildCreateIndexOperation(
730+
shard, sharedIndexInfo, progress);
731+
auto callback = [op = std::move(op), self]() mutable {
732+
return self->replicateOperation(
733+
std::move(op),
734+
replication2::replicated_state::document::ReplicationOptions{
735+
.waitForCommit = true, .waitForSync = true});
736+
};
737+
738+
return self->_shardHandler->ensureIndex(
739+
shard, sharedIndexInfo, std::move(progress), std::move(callback));
734740
});
735741
auto indexId = helpers::extractId(sharedIndexInfo.slice());
736742
TRI_ASSERT(indexId != IndexId::none() or localIndexCreation.fail());

arangod/Replication2/StateMachines/Document/DocumentStateErrorHandler.cpp

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -105,21 +105,6 @@ auto DocumentStateErrorHandler::handleOpResult(
105105
<< " failed because a TTL index already exists, ignoring: " << res;
106106
return TRI_ERROR_NO_ERROR;
107107
}
108-
if (res.is(TRI_ERROR_REPLICATION_REPLICATED_STATE_NOT_FOUND)) {
109-
// During index creation, the `RocksDBCollection::createIndex` method tries
110-
// to fetch the document state, in order to potentially replicate the
111-
// operation. However, if the document state is to be dropped, the fetch
112-
// operation will fail. In that case, it is alright to abandon the index
113-
// creation attempt, since the document state is about to be wiped
114-
// completely. We must prevent the server from crashing.
115-
LOG_CTX("1ea43", DEBUG, _loggerContext)
116-
<< "Index creation " << op.properties.toJson() << " on shard "
117-
<< op.shard
118-
<< " failed because the document state is no longer available, "
119-
"ignoring: "
120-
<< res;
121-
return TRI_ERROR_NO_ERROR;
122-
}
123108
return res;
124109
}
125110

arangod/Replication2/StateMachines/Document/DocumentStateShardHandler.cpp

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -128,16 +128,17 @@ auto DocumentStateShardHandler::getAvailableShards()
128128

129129
auto DocumentStateShardHandler::ensureIndex(
130130
ShardID shard, velocypack::SharedSlice properties,
131-
std::shared_ptr<methods::Indexes::ProgressTracker> progress) noexcept
132-
-> Result {
131+
std::shared_ptr<methods::Indexes::ProgressTracker> progress,
132+
methods::Indexes::Replication2Callback callback) noexcept -> Result {
133133
auto col = lookupShard(shard);
134134
if (col.fail()) {
135135
return {col.errorNumber(),
136136
fmt::format("Error while ensuring index: {}", col.errorMessage())};
137137
}
138138

139139
auto res = _maintenance->executeCreateIndex(std::move(col).get(), properties,
140-
std::move(progress));
140+
std::move(progress),
141+
std::move(callback));
141142
std::ignore = _maintenance->addDirty();
142143

143144
if (res.fail()) {
@@ -146,7 +147,7 @@ auto DocumentStateShardHandler::ensureIndex(
146147
fmt::format(
147148
"Error: {}! Replicated log {} failed to ensure index on shard {}! "
148149
"Index: {}",
149-
res.errorMessage(), _gid, std::move(shard), properties.toJson()));
150+
res.errorMessage(), _gid, shard, properties.toJson()));
150151
}
151152
return res;
152153
}

arangod/Replication2/StateMachines/Document/DocumentStateShardHandler.h

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,8 @@ struct IDocumentStateShardHandler {
6161

6262
virtual auto ensureIndex(
6363
ShardID shard, velocypack::SharedSlice properties,
64-
std::shared_ptr<methods::Indexes::ProgressTracker> progress) noexcept
64+
std::shared_ptr<methods::Indexes::ProgressTracker> progress,
65+
methods::Indexes::Replication2Callback callback = nullptr) noexcept
6566
-> Result = 0;
6667

6768
virtual auto dropIndex(ShardID shard, IndexId indexId) -> Result = 0;
@@ -97,10 +98,10 @@ class DocumentStateShardHandler : public IDocumentStateShardHandler {
9798
auto getAvailableShards()
9899
-> std::vector<std::shared_ptr<LogicalCollection>> override;
99100

100-
auto ensureIndex(
101-
ShardID shard, velocypack::SharedSlice properties,
102-
std::shared_ptr<methods::Indexes::ProgressTracker> progress) noexcept
103-
-> Result override;
101+
auto ensureIndex(ShardID shard, velocypack::SharedSlice properties,
102+
std::shared_ptr<methods::Indexes::ProgressTracker> progress,
103+
methods::Indexes::Replication2Callback callback =
104+
nullptr) noexcept -> Result override;
104105

105106
auto dropIndex(ShardID shard, IndexId indexId) noexcept -> Result override;
106107

arangod/Replication2/StateMachines/Document/MaintenanceActionExecutor.cpp

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -109,12 +109,13 @@ auto MaintenanceActionExecutor::executeModifyCollection(
109109

110110
auto MaintenanceActionExecutor::executeCreateIndex(
111111
std::shared_ptr<LogicalCollection> col, velocypack::SharedSlice properties,
112-
std::shared_ptr<methods::Indexes::ProgressTracker> progress) noexcept
113-
-> Result {
112+
std::shared_ptr<methods::Indexes::ProgressTracker> progress,
113+
methods::Indexes::Replication2Callback callback) noexcept -> Result {
114114
VPackBuilder output;
115115
auto res = basics::catchToResult([&]() {
116116
return methods::Indexes::ensureIndex(*col, properties.slice(), true, output,
117-
std::move(progress))
117+
std::move(progress),
118+
std::move(callback))
118119
.get();
119120
});
120121

arangod/Replication2/StateMachines/Document/MaintenanceActionExecutor.h

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,8 @@ struct IMaintenanceActionExecutor {
5252
virtual auto executeCreateIndex(
5353
std::shared_ptr<LogicalCollection> col,
5454
velocypack::SharedSlice properties,
55-
std::shared_ptr<methods::Indexes::ProgressTracker> progress) noexcept
56-
-> Result = 0;
55+
std::shared_ptr<methods::Indexes::ProgressTracker> progress,
56+
methods::Indexes::Replication2Callback callback) noexcept -> Result = 0;
5757

5858
virtual auto executeDropIndex(std::shared_ptr<LogicalCollection> col,
5959
IndexId indexId) noexcept -> Result = 0;
@@ -83,7 +83,8 @@ class MaintenanceActionExecutor : public IMaintenanceActionExecutor {
8383
auto executeCreateIndex(
8484
std::shared_ptr<LogicalCollection> col,
8585
velocypack::SharedSlice properties,
86-
std::shared_ptr<methods::Indexes::ProgressTracker> progress) noexcept
86+
std::shared_ptr<methods::Indexes::ProgressTracker> progress,
87+
methods::Indexes::Replication2Callback callback) noexcept
8788
-> Result override;
8889

8990
auto executeDropIndex(std::shared_ptr<LogicalCollection> col,

arangod/RocksDBEngine/RocksDBCollection.cpp

Lines changed: 10 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -449,7 +449,8 @@ void RocksDBCollection::duringAddIndex(std::shared_ptr<Index> idx) {
449449

450450
futures::Future<std::shared_ptr<Index>> RocksDBCollection::createIndex(
451451
VPackSlice info, bool restore, bool& created,
452-
std::shared_ptr<std::function<arangodb::Result(double)>> progress) {
452+
std::shared_ptr<std::function<arangodb::Result(double)>> progress,
453+
Replication2Callback replicationCb) {
453454
TRI_ASSERT(info.isObject());
454455

455456
// Step 0. Lock all the things
@@ -614,35 +615,14 @@ futures::Future<std::shared_ptr<Index>> RocksDBCollection::createIndex(
614615
inventoryLocker.lock();
615616

616617
// step 4½. replicate index creation
617-
if (vocbase.replicationVersion() == replication::Version::TWO) {
618-
// May throw TRI_ERROR_REPLICATED_STATE_NOT_FOUND in case the log manager
619-
// is dropping the replicated state
620-
auto documentStateLeader =
621-
_logicalCollection.getDocumentState()->getLeader();
622-
if (documentStateLeader != nullptr) {
623-
auto maybeShardID =
624-
ShardID::shardIdFromString(_logicalCollection.name());
625-
if (ADB_UNLIKELY(maybeShardID.fail())) {
626-
// This will only throw if we take a real collection here and not a
627-
// shard.
628-
TRI_ASSERT(false) << "Tried to ensure index on Collection "
629-
<< _logicalCollection.name()
630-
<< " which is not considered a shard.";
631-
THROW_ARANGO_EXCEPTION(maybeShardID.result());
632-
}
633-
auto const shardId = maybeShardID.get();
634-
635-
auto op = replication2::replicated_state::document::
636-
ReplicatedOperation::buildCreateIndexOperation(
637-
shardId, VPackBuilder{info}.sharedSlice(), std::move(progress));
638-
auto replicationFuture = documentStateLeader->replicateOperation(
639-
std::move(op),
640-
replication2::replicated_state::document::ReplicationOptions{
641-
.waitForCommit = true, .waitForSync = true});
642-
auto replicationResult = co_await std::move(replicationFuture);
643-
if (!replicationResult.ok()) {
644-
THROW_ARANGO_EXCEPTION(std::move(replicationResult).result());
645-
}
618+
if (vocbase.replicationVersion() == replication::Version::TWO &&
619+
replicationCb != nullptr) {
620+
// replicationCb is only set for leaders.
621+
// Its purpose is to replicate the CreateIndex operation to the followers.
622+
auto replicationFuture = replicationCb();
623+
auto replicationResult = co_await std::move(replicationFuture);
624+
if (!replicationResult.ok()) {
625+
THROW_ARANGO_EXCEPTION(std::move(replicationResult).result());
646626
}
647627
}
648628

arangod/RocksDBEngine/RocksDBCollection.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,8 @@ class RocksDBCollection final : public RocksDBMetaCollection {
7171

7272
futures::Future<std::shared_ptr<Index>> createIndex(
7373
velocypack::Slice info, bool restore, bool& created,
74-
std::shared_ptr<std::function<arangodb::Result(double)>> =
75-
nullptr) override;
74+
std::shared_ptr<std::function<arangodb::Result(double)>> = nullptr,
75+
Replication2Callback replicationCb = nullptr) override;
7676

7777
std::unique_ptr<IndexIterator> getAllIterator(
7878
transaction::Methods* trx, ReadOwnWrites readOwnWrites) const override;

0 commit comments

Comments
 (0)
0