8000 more tests for replication consistency (#13507) · elfringham/arangodb@c97f299 · GitHub
[go: up one dir, main page]

Skip to content

Commit c97f299

Browse files
jsteemann and neunhoef
authored and committed
more tests for replication consistency (arangodb#13507)
Co-authored-by: Max Neunhöffer <max@arangodb.com> Co-authored-by: Max Neunhoeffer <max@arangodb.com>
1 parent eea6c49 commit c97f299

34 files changed

+1734
-262
lines changed

3rdParty/rocksdb/6.8/util/heap.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,9 @@ class BinaryHeap {
7272

7373
void pop() {
7474
assert(!empty());
75-
data_.front() = std::move(data_.back());
75+
if (data_.size() > 1) {
76+
data_.front() = std::move(data_.back());
77+
}
7678
data_.pop_back();
7779
if (!empty()) {
7880
downheap(get_root());

CHANGELOG

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,18 @@
11
devel
22
-----
33

4+
* Fixed replication bug in MerkleTree sync protocol, which could lead to
5+
data corruption. The visible effect was that shards could no longer get
6+
in sync since the counts would not match after sync, even after a recount.
7+
This corruption only happened if there were large amounts of differences
8+
(at least 65537) and the destination side had newer revisions for some
9+
keys than the source side.
10+
11+
* Fixed a RocksDB bug which could lead to an assertion failure when compiling
12+
with STL debug mode -D_GLIBCXX_DEBUG.
13+
14+
* Fixed a rare internal buffer overflow around ridBuffers.
15+
416
* Issue #13141: The `move-filters-into-enumerate` optimization, when applied to
517
an EnumerateCollectionNode (i.e. full collection scan), did not do regular
618
checks for the query being killed during the filtering of documents, resulting

arangod/Indexes/Index.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1015,11 +1015,11 @@ void Index::warmup(arangodb::transaction::Methods*, std::shared_ptr<basics::Loca
10151015

10161016
/// @brief generate error message
10171017
/// @param key the conflicting key
1018-
Result& Index::addErrorMsg(Result& r, std::string const& key) {
1018+
Result& Index::addErrorMsg(Result& r, std::string const& key) const {
10191019
return r.withError([this, &key](result::Error& err) { addErrorMsg(err, key); });
10201020
}
10211021

1022-
void Index::addErrorMsg(result::Error& r, std::string const& key) {
1022+
void Index::addErrorMsg(result::Error& r, std::string const& key) const {
10231023
// now provide more context based on index
10241024
r.appendErrorMessage(" - in index ");
10251025
r.appendErrorMessage(name());

arangod/Indexes/Index.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -443,7 +443,7 @@ class Index {
443443
/// @param code the error key
444444
/// @param key the conflicting key
445445
arangodb::Result& addErrorMsg(Result& r, ErrorCode code,
446-
std::string const& key = "") {
446+
std::string const& key = "") const {
447447
if (code != TRI_ERROR_NO_ERROR) {
448448
r.reset(code);
449449
return addErrorMsg(r, key);
@@ -453,8 +453,8 @@ class Index {
453453

454454
/// @brief generate error result
455455
/// @param key the conflicting key
456-
arangodb::Result& addErrorMsg(Result& r, std::string const& key = "");
457-
void addErrorMsg(result::Error& err, std::string const& key);
456+
arangodb::Result& addErrorMsg(Result& r, std::string const& key = "") const;
457+
void addErrorMsg(result::Error& err, std::string const& key) const;
458458

459459
/// @brief extracts a timestamp value from a document
460460
/// returns a negative value if the document does not contain the specified

arangod/Replication/DatabaseInitialSyncer.cpp

Lines changed: 56 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -170,9 +170,8 @@ arangodb::Result fetchRevisions(arangodb::transaction::Methods& trx,
170170
"&batchId=" + std::to_string(config.batch.id);
171171
auto headers = arangodb::replutils::createHeaders();
172172

173-
std::string msg = "fetching documents by revision for collection '" +
174-
collection.name() + "' from " + url;
175-
config.progress.set(msg);
173+
config.progress.set("fetching documents by revision for collection '" +
174+
collection.name() + "' from " + url);
176175

177176
auto removeConflict = [&](auto const& conflictingKey) -> Result {
178177
keyBuilder->clear();
@@ -187,10 +186,18 @@ arangodb::Result fetchRevisions(arangodb::transaction::Methods& trx,
187186
return res;
188187
};
189188

189+
std::size_t numUniqueIndexes = [&]() {
190+
std::size_t numUnique = 0;
191+
for (auto const& idx : collection.getIndexes()) {
192+
numUnique += idx->unique() ? 1 : 0;
193+
}
194+
return numUnique;
195+
}();
196+
190197
std::size_t current = 0;
191198
auto guard = arangodb::scopeGuard(
192199
[&current, &stats]() -> void { stats.numDocsRequested += current; });
193-
char ridBuffer[11];
200+
char ridBuffer[arangodb::basics::maxUInt64StringSize];
194201
std::unique_ptr<arangodb::httpclient::SimpleHttpResult> response;
195202
while (current < toFetch.size()) {
196203
arangodb::transaction::BuilderLeaser requestBuilder(&trx);
@@ -230,6 +237,8 @@ arangodb::Result fetchRevisions(arangodb::transaction::Methods& trx,
230237
config.leader.endpoint, url,
231238
": response is not an array"));
232239
}
240+
241+
config.progress.set("applying documents by revision for collection '" + collection.name() + "'");
233242

234243
for (VPackSlice leaderDoc : VPackArrayIterator(docs)) {
235244
if (!leaderDoc.isObject()) {
@@ -254,41 +263,50 @@ arangodb::Result fetchRevisions(arangodb::transaction::Methods& trx,
254263
": document revision is invalid");
255264
}
256265

257-
TRI_ASSERT(options.indexOperationMode == arangodb::IndexOperationMode::internal);
258-
259-
Result res = physical->insert(&trx, leaderDoc, mdr, options);
260-
261-
if (res.fail()) {
262-
if (res.is(TRI_ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED) &&
263-
res.errorMessage() > keySlice.stringView()) {
264-
arangodb::RevisionId rid = arangodb::RevisionId::fromSlice(leaderDoc);
265-
if (physical->readDocument(&trx, arangodb::LocalDocumentId(rid.id()), mdr)) {
266-
// already have exactly this revision no need to insert
267-
continue;
268-
}
269-
// remove conflict and retry
270-
// errorMessage() is this case contains the conflicting key
271-
auto inner = removeConflict(res.errorMessage());
272-
if (inner.fail()) {
273-
return res;
274-
}
266+
options.indexOperationMode = arangodb::IndexOperationMode::internal;
267+
268+
// we need a retry loop here for unique indexes (we will always have at least
269+
// one unique index, which is the primary index, but there can be more). as
270+
// documents can be presented in any state on the follower, simply inserting
271+
// them in leader order may trigger a unique constraint violation on the follower.
272+
// in this case we may need to remove the conflicting document. this can
273+
// happen multiple times if there are multiple unique indexes! we can only
274+
// stop trying once we have tried often enough, or if inserting succeeds.
275+
std::size_t tries = 1 + numUniqueIndexes;
276+
while (tries-- > 0) {
277+
if (tries == 0) {
275278
options.indexOperationMode = arangodb::IndexOperationMode::normal;
276-
res = physical->insert(&trx, leaderDoc, mdr, options);
279+
}
280+
281+
Result res = physical->insert(&trx, leaderDoc, mdr, options);
277282

278-
options.indexOperationMode = arangodb::IndexOperationMode::internal;
279-
if (res.fail()) {
280-
return res;
281-
}
282-
// fall-through
283-
} else {
283+
options.indexOperationMode = arangodb::IndexOperationMode::internal;
284+
285+
if (res.ok()) {
286+
++stats.numDocsInserted;
287+
break;
288+
}
289+
290+
if (!res.is(TRI_ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED)) {
284291
auto errorNumber = res.errorNumber();
285292
res.reset(errorNumber, concatT(TRI_errno_string(errorNumber), ": ",
286293
res.errorMessage()));
287294
return res;
288295
}
289-
}
290296

291-
++stats.numDocsInserted;
297+
arangodb::RevisionId rid = arangodb::RevisionId::fromSlice(leaderDoc);
298+
if (physical->readDocument(&trx, arangodb::LocalDocumentId(rid.id()), mdr)) {
299+
// already have exactly this revision no need to insert
300+
break;
301+
}
302+
303+
// remove conflict and retry
304+
// errorMessage() is this case contains the conflicting key
305+
auto inner = removeConflict(res.errorMessage());
306+
if (inner.fail()) {
307+
return res;
308+
}
309+
}
292310
}
293311
current += docs.length();
294312
}
@@ -1145,8 +1163,7 @@ Result DatabaseInitialSyncer::fetchCollectionSyncByKeys(arangodb::LogicalCollect
11451163

11461164
if (!res.ok()) {
11471165
return Result(res.errorNumber(),
1148-
concatT("unable to start transaction (", __FILE__, ":",
1149-
__LINE__, "): ", res.errorMessage()));
1166+
concatT("unable to start transaction: ", res.errorMessage()));
11501167
}
11511168

11521169
OperationOptions options;
@@ -1260,8 +1277,7 @@ Result DatabaseInitialSyncer::fetchCollectionSyncByRevisions(arangodb::LogicalCo
12601277

12611278
if (!res.ok()) {
12621279
return Result(res.errorNumber(),
1263-
concatT("unable to start transaction (", __FILE__, ":",
1264-
__LINE__, "): ", res.errorMessage()));
1280+
concatT("unable to start transaction: ", res.errorMessage()));
12651281
}
12661282

12671283
OperationOptions options;
@@ -1315,7 +1331,10 @@ Result DatabaseInitialSyncer::fetchCollectionSyncByRevisions(arangodb::LogicalCo
13151331
}
13161332
return Result(ex.code());
13171333
}
1318-
trx->addHint(Hints::Hint::NO_INDEXING);
1334+
1335+
// we must be able to read our own writes here - otherwise the end result
1336+
// can be wrong. do not enable NO_INDEXING here!
1337+
13191338
// turn on intermediate commits as the number of keys to delete can be huge
13201339
// here
13211340
trx->addHint(Hints::Hint::INTERMEDIATE_COMMITS);
@@ -1355,7 +1374,7 @@ Result DatabaseInitialSyncer::fetchCollectionSyncByRevisions(arangodb::LogicalCo
13551374
{
13561375
VPackBuilder requestBuilder;
13571376
{
1358-
char ridBuffer[11];
1377+
char ridBuffer[arangodb::basics::maxUInt64StringSize];
13591378
VPackArrayBuilder list(&requestBuilder);
13601379
for (auto& pair : ranges) {
13611380
VPackArrayBuilder range(&requestBuilder);

arangod/RestHandler/RestReplicationHandler.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1524,7 +1524,7 @@ Result RestReplicationHandler::parseBatch(transaction::Methods& trx,
15241524
if (generateNewRevisionIds &&
15251525
!isKey &&
15261526
arangodb::velocypack::StringRef(it.key) == StaticStrings::RevString) {
1527-
char ridBuffer[11];
1527+
char ridBuffer[arangodb::basics::maxUInt64StringSize];
15281528
RevisionId newRid = physical->newRevisionId();
15291529
documentsToInsert.add(newRid.toValuePair(ridBuffer));
15301530
} else {
@@ -3059,7 +3059,7 @@ void RestReplicationHandler::handleCommandRevisionRanges() {
30593059
};
30603060
setRange(current);
30613061

3062-
char ridBuffer[11];
3062+
char ridBuffer[arangodb::basics::maxUInt64StringSize];
30633063
{
30643064
TRI_ASSERT(response.isOpenObject());
30653065
VPackArrayBuilder rangesGuard(&response, StaticStrings::RevisionTreeRanges);

arangod/RocksDBEngine/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ set(ROCKSDB_SOURCES
7979
RocksDBEngine/RocksDBRestHandlers.cpp
8080
RocksDBEngine/RocksDBRestReplicationHandler.cpp
8181
RocksDBEngine/RocksDBRestWalHandler.cpp
82+
RocksDBEngine/RocksDBSavePoint.cpp
8283
RocksDBEngine/RocksDBSettingsManager.cpp
8384
RocksDBEngine/RocksDBSyncThread.cpp
8485
RocksDBEngine/RocksDBTransactionCollection.cpp

0 commit comments

Comments (0)