Commit 83c1b08

Removed incorrect skipping of Batches in RocksDB Tailing syncer. (arangodb#7022)
* Removed incorrect skipping of batches in the RocksDB tailing syncer. This caused issues whenever one transaction was split across batches.
* Added a test for splitting a large transaction in RocksDB.
* Reactivated skipping in RocksDB WAL tailing (reverts the initial fix).
* Actually include lastScannedTick in CollectionFinalize. Proper fix, kudos to @jsteemann.
* Fixed the healFollower task in the split-large-transaction test.
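
The crux of the fix is how the follower advances its fromTick when a tailing batch yields no applicable markers because they all belong to a transaction that was split across multiple batches; see the sketch below. The function advanceFromTick and the sample tick values are illustrative only, not ArangoDB API; the authoritative change is the DatabaseTailingSyncer.cpp hunk further down.

// Minimal standalone sketch of the tick-advancement rule, assuming the
// semantics visible in the diff below: lastIncludedTick is the highest tick
// actually applied from a batch (0 if nothing was applied), lastScannedTick
// is the highest tick the server scanned while producing the batch.
#include <cstdint>
#include <iostream>

using tick_t = std::uint64_t;

tick_t advanceFromTick(tick_t fromTick, tick_t lastIncludedTick,
                       tick_t lastScannedTick) {
  if (lastIncludedTick > fromTick) {
    return lastIncludedTick;
  }
  if (lastIncludedTick == 0 && lastScannedTick > 0 &&
      lastScannedTick > fromTick) {
    // Nothing was included, but the server scanned further (e.g. all markers
    // belonged to a still-open split transaction): resume just before the
    // last scanned tick instead of re-reading the whole batch.
    return lastScannedTick - 1;
  }
  return fromTick;  // no progress; the caller treats this as "at the end"
}

int main() {
  // Batch 1: markers up to tick 150 were applied.
  tick_t from = advanceFromTick(100, /*lastIncluded*/ 150, /*lastScanned*/ 150);
  // Batch 2: only markers of a split, still-open transaction were scanned,
  // so nothing was included, but the scan advanced to tick 230.
  from = advanceFromTick(from, /*lastIncluded*/ 0, /*lastScanned*/ 230);
  std::cout << "next fromTick: " << from << std::endl;  // prints 229
  return 0;
}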
1 parent 221d036 commit 83c1b08

2 files changed: +46 -2 lines changed

arangod/Replication/DatabaseTailingSyncer.cpp

Lines changed: 12 additions & 2 deletions
@@ -116,6 +116,7 @@ Result DatabaseTailingSyncer::syncCollectionFinalize(
   _ignoreRenameCreateDrop = true;

   TRI_voc_tick_t fromTick = _initialTick;
+  TRI_voc_tick_t lastScannedTick = fromTick;
   LOG_TOPIC(DEBUG, Logger::REPLICATION)
       << "starting syncCollectionFinalize:" << collectionName << ", fromTick "
       << fromTick;
@@ -129,6 +130,7 @@ Result DatabaseTailingSyncer::syncCollectionFinalize(
         tailingBaseUrl("tail") +
         "chunkSize=" + StringUtils::itoa(_state.applier._chunkSize) +
         "&from=" + StringUtils::itoa(fromTick) +
+        "&lastScanned=" + StringUtils::itoa(lastScannedTick) +
         "&serverId=" + _state.localServerIdString +
         "&collection=" + StringUtils::urlEncode(collectionName);

@@ -155,6 +157,12 @@ Result DatabaseTailingSyncer::syncCollectionFinalize(
       checkMore = StringUtils::boolean(header);
     }

+    header = response->getHeaderField(
+        StaticStrings::ReplicationHeaderLastScanned, found);
+    if (found) {
+      lastScannedTick = StringUtils::uint64(header);
+    }
+
     header = response->getHeaderField(
         StaticStrings::ReplicationHeaderLastIncluded, found);
     if (!found) {
@@ -173,7 +181,7 @@ Result DatabaseTailingSyncer::syncCollectionFinalize(
     if (found) {
       fromIncluded = StringUtils::boolean(header);
     }
-    if (!fromIncluded && fromTick > 0) { // && _requireFromPresent
+    if (!fromIncluded && fromTick > 0) {
       return Result(
           TRI_ERROR_REPLICATION_START_TICK_NOT_PRESENT,
           std::string("required follow tick value '") +
@@ -196,6 +204,8 @@ Result DatabaseTailingSyncer::syncCollectionFinalize(
     // update the tick from which we will fetch in the next round
     if (lastIncludedTick > fromTick) {
       fromTick = lastIncludedTick;
+    } else if (lastIncludedTick == 0 && lastScannedTick > 0 && lastScannedTick > fromTick) {
+      fromTick = lastScannedTick - 1;
     } else if (checkMore) {
       // we got the same tick again, this indicates we're at the end
       checkMore = false;
@@ -208,7 +218,7 @@ Result DatabaseTailingSyncer::syncCollectionFinalize(
       return Result();
     }
     LOG_TOPIC(DEBUG, Logger::REPLICATION)
-        << "Fetching more data fromTick " << fromTick;
+        << "Fetching more data, fromTick: " << fromTick << ", lastScannedTick: " << lastScannedTick;
   }
 }
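For orientation, here is a minimal sketch of the request/response round trip the change relies on: the lastScanned value reported by the server in one response (read via StaticStrings::ReplicationHeaderLastScanned above) is echoed back as the &lastScanned= query parameter of the next tail request. buildTailUrl, the endpoint path, and the sample values below are assumptions for illustration, not the actual ArangoDB request code shown above.

// Hypothetical URL assembly for the next tailing round (sketch only).
#include <cstdint>
#include <iostream>
#include <string>

std::string buildTailUrl(std::string const& baseUrl, std::uint64_t chunkSize,
                         std::uint64_t fromTick, std::uint64_t lastScannedTick,
                         std::string const& serverId,
                         std::string const& collection) {
  // &lastScanned= carries the tick from the previous response's
  // "last scanned" header, so the server knows how far the follower has
  // already read even when nothing could be included yet.
  return baseUrl + "?chunkSize=" + std::to_string(chunkSize) +
         "&from=" + std::to_string(fromTick) +
         "&lastScanned=" + std::to_string(lastScannedTick) +
         "&serverId=" + serverId +
         "&collection=" + collection;
}

int main() {
  // Illustrative values: the endpoint path and ids are placeholders.
  std::cout << buildTailUrl("/_api/wal/tail", 16384, 229, 230,
                            "12345", "testCollection")
            << std::endl;
  return 0;
}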
tests/js/server/resilience/resilience-synchronous-repl-cluster.js

Lines changed: 34 additions & 0 deletions
@@ -99,6 +99,7 @@ function SynchronousReplicationSuite () {
     assertTrue(pos >= 0);
     assertTrue(suspendExternal(global.instanceInfo.arangods[pos].pid));
     console.info("Have failed follower", follower);
+    return pos;
   }

 ////////////////////////////////////////////////////////////////////////////////
@@ -804,6 +805,39 @@ function SynchronousReplicationSuite () {
       assertTrue(waitForSynchronousReplication("_system"));
     },

+    testLargeTransactionsSplitting : function () {
+      let docs = [];
+      // We try to create a massive write transaction.
+      // This one now is above 6MB
+      for (let i = 0; i < 10000; ++i) {
+        docs.push({"undderhund": "macht so wau wau wau!"});
+      }
+      for (let i = 0; i < 5; ++i) {
+        // We trigger 5 of these large transactions
+        c.insert(docs);
+      }
+      let referenceCounter = c.count();
+      assertTrue(waitForSynchronousReplication("_system"));
+
+      // Now we trigger failedFollower
+      const failedPos = failFollower();
+      // We now continuously add more large transaction to trigger tailing
+      for (let i = 0; i < 5; ++i) {
+        // We trigger 5 more of these large transactions
+        c.insert(docs);
+      }
+
+      // This should trigger a new follower to be added.
+      // This follower needs to sync up with at least one splitted transaction
+      // The collection will not get back into sync if this splitted transaction
+      // fails. Also assertions will be triggered.
+      // Wait for it:
+      assertTrue(waitForSynchronousReplication("_system"));
+
+      // Heal follower
+      assertTrue(continueExternal(global.instanceInfo.arangods[failedPos].pid));
+    },
+
 ////////////////////////////////////////////////////////////////////////////////
 /// @brief just to allow a trailing comma at the end of the last test
 ////////////////////////////////////////////////////////////////////////////////
