8000 RocksDB replication thread safety (#7088) · sita1999/arangodb@5b71dff · GitHub
[go: up one dir, main page]

Skip to content

Commit 5b71dff

Browse files
graetzer and jsteemann
authored and committed
RocksDB replication thread safety (arangodb#7088)
1 parent c72818a commit 5b71dff

19 files changed

+842
-752
lines changed

arangod/Cluster/SynchronizeShard.cpp

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -806,7 +806,7 @@ bool SynchronizeShard::first() {
806806
VPackBuilder config;
807807
{ VPackObjectBuilder o(&config);
808808
config.add(ENDPOINT, VPackValue(ep));
809-
config.add(INCREMENTAL, VPackValue(true));
809+
config.add(INCREMENTAL, VPackValue(docCount > 0)); // use dump if possible
810810
config.add(KEEP_BARRIER, VPackValue(true));
811811
config.add(LEADER_ID, VPackValue(leader));
812812
config.add(SKIP_CREATE_DROP, VPackValue(true));

arangod/Replication/DatabaseInitialSyncer.cpp

Lines changed: 10 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -193,8 +193,17 @@ Result DatabaseInitialSyncer::runWithInventory(bool incremental,
193193
if (r.fail()) {
194194
return r;
195195
}
196+
197+
// enable patching of collection count for ShardSynchronization Job
198+
std::string patchCount = StaticStrings::Empty;
199+
std::string const& engineName = EngineSelectorFeature::ENGINE->typeName();
200+
if (incremental && engineName == "rocksdb" && _config.applier._skipCreateDrop &&
201+
_config.applier._restrictType == ReplicationApplierConfiguration::RestrictType::Include &&
202+
_config.applier._restrictCollections.size() == 1) {
203+
patchCount = *_config.applier._restrictCollections.begin();
204+
}
196205

197-
r = _config.batch.start(_config.connection, _config.progress);
206+
r = _config.batch.start(_config.connection, _config.progress, patchCount);
198207
if (r.fail()) {
199208
return r;
200209
}

arangod/Replication/utilities.cpp

Lines changed: 17 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -368,20 +368,34 @@ Result BarrierInfo::remove(Connection& connection) noexcept {
368368

369369
constexpr double BatchInfo::DefaultTimeout;
370370

371+
/// @brief send a "start batch" command
372+
/// @param patchCount try to patch count of this collection
373+
/// only effective with the incremental sync (optional)
371374
Result BatchInfo::start(replutils::Connection& connection,
372-
replutils::ProgressInfo& progress) {
375+
replutils::ProgressInfo& progress,
376+
std::string const& patchCount) {
373377
// TODO make sure all callers verify not child syncer
374378
if (!connection.valid()) {
375379
return {TRI_ERROR_INTERNAL};
376380
}
377381

378382
double const now = TRI_microtime();
379383
id = 0;
384+
385+
// SimpleHttpClient automatically adds the database prefix
380386
std::string const url =
381387
ReplicationUrl + "/batch" + "?serverId=" + connection.localServerId();
382-
std::string const body = "{\"ttl\":" + basics::StringUtils::itoa(ttl) + "}";
383-
progress.set("sending batch start command to url " + url);
388+
VPackBuilder b;
389+
{
390+
VPackObjectBuilder guard(&b, true);
391+
guard->add("ttl", VPackValue(ttl));
392+
if (!patchCount.empty()) {
393+
guard->add("patchCount", VPackValue(patchCount));
394+
}
395+
}
396+
std::string const body = b.toJson();
384397

398+
progress.set("sending batch start command to url " + url);
385399
// send request
386400
std::unique_ptr<httpclient::SimpleHttpResult> response;
387401
connection.lease([&](httpclient::SimpleHttpClient* client) {

arangod/Replication/utilities.h

Lines changed: 4 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -147,7 +147,10 @@ struct BatchInfo {
147147
double updateTime{0};
148148

149149
/// @brief send a "start batch" command
150-
Result start(Connection& connection, ProgressInfo& progress);
150+
/// @param patchCount try to patch count of this collection
151+
/// only effective with the incremental sync
152+
Result start(Connection& connection, ProgressInfo& progress,
153+
std::string const& patchCount = "");
151154

152155
/// @brief send an "extend batch" command
153156
Result extend(Connection& connection, ProgressInfo& progress);

arangod/RocksDBEngine/RocksDBIterators.cpp

Lines changed: 2 additions & 22 deletions
Original file line number | Diff line number | Diff line change
@@ -44,6 +44,7 @@ RocksDBAllIndexIterator::RocksDBAllIndexIterator(
4444
: IndexIterator(col, trx),
4545
_bounds(RocksDBKeyBounds::CollectionDocuments(
4646
static_cast<RocksDBCollection*>(col->getPhysical())->objectId())),
47+
_upperBound(_bounds.end()),
4748
_cmp(RocksDBColumnFamily::documents()->GetComparator()) {
4849
// acquire rocksdb transaction
4950
auto* mthds = RocksDBTransactionState::toMethods(trx);
@@ -54,6 +55,7 @@ RocksDBAllIndexIterator::RocksDBAllIndexIterator(
5455
TRI_ASSERT(options.prefix_same_as_start);
5556
options.fill_cache = AllIteratorFillBlockCache;
5657
options.verify_checksums = false; // TODO evaluate
58+
options.iterate_upper_bound = &_upperBound;
5759
// options.readahead_size = 4 * 1024 * 1024;
5860
_iterator = mthds->NewIterator(options, cf);
5961
TRI_ASSERT(_iterator);
@@ -374,25 +376,3 @@ RocksDBGenericIterator arangodb::createPrimaryIndexIterator(transaction::Methods
374376
TRI_ASSERT(iterator.bounds().columnFamily() == RocksDBColumnFamily::primary());
375377
return iterator;
376378
}
377-
378-
RocksDBGenericIterator arangodb::createDocumentIterator(transaction::Methods* trx,
379-
LogicalCollection* col) {
380-
TRI_ASSERT(col != nullptr);
381-
TRI_ASSERT(trx != nullptr);
382-
383-
auto* mthds = RocksDBTransactionState::toMethods(trx);
384-
385-
rocksdb::ReadOptions options = mthds->iteratorReadOptions();
386-
TRI_ASSERT(options.snapshot != nullptr); // trx must contain a valid snapshot
387-
TRI_ASSERT(options.prefix_same_as_start);
388-
options.fill_cache = true;
389-
options.verify_checksums = false;
390-
391-
auto rocksColObjectId = static_cast<RocksDBCollection*>(col->getPhysical())->objectId();
392-
auto bounds(RocksDBKeyBounds::CollectionDocuments(rocksColObjectId));
393-
auto iterator = RocksDBGenericIterator(options, bounds);
394-
395-
TRI_ASSERT(iterator.bounds().objectId() == rocksColObjectId);
396-
TRI_ASSERT(iterator.bounds().columnFamily() == RocksDBColumnFamily::documents());
397-
return iterator;
398-
}

arangod/RocksDBEngine/RocksDBIterators.h

Lines changed: 4 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -64,7 +64,9 @@ class RocksDBAllIndexIterator final : public IndexIterator {
6464
private:
6565
bool outOfRange() const;
6666

67+
private:
6768
RocksDBKeyBounds const _bounds;
69+
rocksdb::Slice _upperBound; // used for iterate_upper_bound
6870
std::unique_ptr<rocksdb::Iterator> _iterator;
6971
rocksdb::Comparator const* _cmp;
7072
};
@@ -122,6 +124,8 @@ class RocksDBGenericIterator {
122124

123125
private:
124126
bool outOfRange() const;
127+
128+
private:
125129
bool _reverse;
126130
RocksDBKeyBounds const _bounds;
127131
rocksdb::ReadOptions const _options;
@@ -131,8 +135,5 @@ class RocksDBGenericIterator {
131135

132136
RocksDBGenericIterator createPrimaryIndexIterator(transaction::Methods* trx,
133137
LogicalCollection* col);
134-
135-
RocksDBGenericIterator createDocumentIterator(transaction::Methods* trx,
136-
LogicalCollection* col);
137138
} //namespace arangodb
138139
#endif

0 commit comments

Comments
 (0)
0