Add drop-check for index creation in cluster (#9219) · reynoldsm88/arangodb@938e5fd · GitHub
[go: up one dir, main page]

Skip to content

Commit 938e5fd

Browse files
Dan Larkin-York authored and fceller committed
Add drop-check for index creation in cluster (arangodb#9219)
* Add drop-check for index creation in cluster.
* Move check from callback to regular read.
* Add changelog entry.
* Incorporate review suggestion
  Co-Authored-By: Simon <simon@graetzer.org>
* Convert to VPackArrayIterator.
1 parent 2d732be commit 938e5fd

File tree

3 files changed

+50
-11
lines changed

3 files changed

+50
-11
lines changed

CHANGELOG

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1,6 +1,8 @@
11
devel
22
-----
33

4+
* fix timeout-response in case of simultaneous index create/drop in cluster
5+
46
* allow pregel to select the shard key via `shardKeyAttribute` in pregel start parameters
57

68
* Added --server.jwt-secret-keyfile to arangosh.

arangod/Cluster/ClusterInfo.cpp

Lines changed: 40 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -2751,10 +2751,11 @@ Result ClusterInfo::ensureIndexCoordinatorInner( // create index
27512751
}
27522752

27532753
// will contain the error number and message
2754-
auto dbServerResult = std::make_shared<std::atomic<int>>(-1);
2754+
std::atomic<int> dbServerResult(-1);
27552755
std::shared_ptr<std::string> errMsg = std::make_shared<std::string>();
27562756

2757-
std::function<bool(VPackSlice const& result)> dbServerChanged = [=](VPackSlice const& result) {
2757+
std::function<bool(VPackSlice const& result)> dbServerChanged = [=, &dbServerResult](
2758+
VPackSlice const& result) {
27582759
if (!result.isObject() || result.length() != numberOfShards) {
27592760
return true;
27602761
}
@@ -2786,7 +2787,7 @@ Result ClusterInfo::ensureIndexCoordinatorInner( // create index
27862787
// error otherwise
27872788
int errNum = arangodb::basics::VelocyPackHelper::readNumericValue<int>(
27882789
v, StaticStrings::ErrorNum, TRI_ERROR_ARANGO_INDEX_CREATION_FAILED);
2789-
dbServerResult->store(errNum, std::memory_order_release);
2790+
dbServerResult.store(errNum, std::memory_order_release);
27902791
return true;
27912792
}
27922793

@@ -2797,7 +2798,7 @@ Result ClusterInfo::ensureIndexCoordinatorInner( // create index
27972798
}
27982799

27992800
if (found == (size_t)numberOfShards) {
2800-
dbServerResult->store(setErrormsg(TRI_ERROR_NO_ERROR, *errMsg), std::memory_order_release);
2801+
dbServerResult.store(setErrormsg(TRI_ERROR_NO_ERROR, *errMsg), std::memory_order_release);
28012802
}
28022803

28032804
return true;
@@ -2828,12 +2829,12 @@ Result ClusterInfo::ensureIndexCoordinatorInner( // create index
28282829
// by a mutex. We use the mutex of the condition variable in the
28292830
// AgencyCallback for this.
28302831
std::string where = "Current/Collections/" + databaseName + "/" + collectionID;
2831-
28322832
auto agencyCallback =
28332833
std::make_shared<AgencyCallback>(ac, where, dbServerChanged, true, false);
28342834
_agencyCallbackRegistry->registerCallback(agencyCallback);
28352835
auto cbGuard = scopeGuard(
28362836
[&] { _agencyCallbackRegistry->unregisterCallback(agencyCallback); });
2837+
28372838
AgencyOperation newValue(planIndexesKey, AgencyValueOperationType::PUSH,
28382839
newIndexBuilder.slice());
28392840
AgencyOperation incrementVersion("Plan/Version", AgencySimpleOperationType::INCREMENT_OP);
@@ -2878,7 +2879,40 @@ Result ClusterInfo::ensureIndexCoordinatorInner( // create index
28782879

28792880
{
28802881
while (!application_features::ApplicationServer::isStopping()) {
2881-
int tmpRes = dbServerResult->load(std::memory_order_acquire);
2882+
int tmpRes = dbServerResult.load(std::memory_order_acquire);
2883+
2884+
if (tmpRes < 0) {
2885+
// index has not shown up in Current yet, follow up check to
2886+
// ensure it is still in plan (not dropped between iterations)
2887+
2888+
AgencyCommResult result = _agency.sendTransactionWithFailover(
2889+
AgencyReadTransaction(AgencyCommManager::path(planIndexesKey)));
2890+
2891+
if (result.successful()) {
2892+
auto indexes = result.slice()[0].get(
2893+
std::vector<std::string>{AgencyCommManager::path(), "Plan",
2894+
"Collections", databaseName,
2895+
collectionID, "indexes"});
2896+
2897+
bool found = false;
2898+
if (indexes.isArray()) {
2899+
for (auto const& v : VPackArrayIterator(indexes)) {
2900+
VPackSlice const k = v.get(StaticStrings::IndexId);
2901+
if (k.isString() && k.isEqualString(idString)) {
2902+
// index is still here
2903+
found = true;
2904+
break;
2905+
}
2906+
}
2907+
}
2908+
2909+
if (!found) {
2910+
return Result(TRI_ERROR_ARANGO_INDEX_CREATION_FAILED,
2911+
"index was dropped during creation");
2912+
}
2913+
}
2914+
}
2915+
28822916
if (tmpRes == 0) {
28832917
// Finally, in case all is good, remove the `isBuilding` flag
28842918
// check that the index has appeared. Note that we have to have

tests/IResearch/AgencyMock.cpp

Lines changed: 8 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -35,6 +35,13 @@
3535
namespace arangodb {
3636
namespace consensus {
3737

38+
// FIXME TODO this implementation causes deadlock when unregistering a callback,
39+
// if there is still another callback registered; it's not obvious
40+
// how to fix this, as it seems the problem is that both "agents"
41+
// live on the same server and share an AgencyCallbackRegistry
42+
// instance; we could solve this if we could have two
43+
// ApplicationServers in the same instance, but too many things in
44+
// the feature stack are static still to make the changes right now
3845
// FIXME TODO for some reason the implementation of this function is missing in the arangodb code
3946
void Store::notifyObservers() const {
4047
auto* clusterFeature =
@@ -79,11 +86,7 @@ void Store::notifyObservers() const {
7986

8087
for (auto& id: callbackIds) {
8188
try {
82-
auto& condition = callbackRegistry->getCallback(id)->_cv;
83-
CONDITION_LOCKER(locker, condition);
84-
85-
callbackRegistry->getCallback(id)->refetchAndUpdate(false, true); // force a check
86-
condition.signal();
89+
callbackRegistry->getCallback(id)->refetchAndUpdate(true, true); // force a check
8790
} catch(...) {
8891
// ignore
8992
}

0 commit comments

Comments
 (0)
0