8000 less RocksDB background thread activity in case there is nothing to d… · soualid/arangodb@024a9d7 · GitHub
[go: up one dir, main page]

Skip to content

Commit 024a9d7

Browse files
authored
less RocksDB background thread activity in case there is nothing to do (arangodb#6577)
1 parent b16af5a commit 024a9d7

File tree

6 files changed

+148
-49
lines changed

6 files changed

+148
-49
lines changed

arangod/RocksDBEngine/RocksDBCollection.cpp

Lines changed: 7 additions & 0 deletions

Original file line numberDiff line numberDiff line change
@@ -1847,6 +1847,7 @@ RocksDBCollection::serializeIndexEstimates(
18471847
output.clear();
18481848
RocksDBIndex* cindex = static_cast<RocksDBIndex*>(index.get());
18491849
TRI_ASSERT(cindex != nullptr);
1850+
18501851
if (cindex->needToPersistEstimate()) {
18511852
LOG_TOPIC(TRACE, Logger::ENGINES)
18521853
<< "beginning estimate serialization for index '"
@@ -1922,6 +1923,12 @@ void RocksDBCollection::recalculateIndexEstimates(
19221923
19231924
arangodb::Result RocksDBCollection::serializeKeyGenerator(
19241925
rocksdb::Transaction* rtrx) const {
1926+
if (!_logicalCollection.keyGenerator()->hasDynamicState()) {
1927+
// the key generator will not produce any dynamic data,
1928+
// so it does not need to be serialized (nor recovered)
1929+
return Result();
1930+
}
1931+
19251932
VPackBuilder builder;
19261933

19271934
builder.openObject();

arangod/RocksDBEngine/RocksDBSettingsManager.cpp

Lines changed: 110 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ arangodb::Result writeCounterValue(
6767

6868
// Skip values which we did not change
6969
auto const& it = syncedSeqNums.find(pair.first);
70+
7071
if (it != syncedSeqNums.end() && it->second == pair.second._sequenceNum) {
7172
return Result();
7273
}
@@ -164,6 +165,7 @@ writeIndexEstimatorsAndKeyGenerator(
164165
// or start fresh.
165166
return std::make_pair(Result(), returnSeq);
166167
}
168+
167169
TRI_DEFER(vocbase->release());
168170

169171
auto collection = vocbase->lookupCollection(dbColPair.second);
@@ -227,7 +229,35 @@ void RocksDBSettingsManager::CMValue::serialize(VPackBuilder& b) const {
227229
/// Constructor needs to be called synchrunously,
228230
/// will load counts from the db and scan the WAL
229231
RocksDBSettingsManager::RocksDBSettingsManager(rocksdb::TransactionDB* db)
230-
: _lastSync(0), _syncing(false), _db(db), _initialReleasedTick(0) {}
232+
: _lastSync(0),
233+
_syncing(false),
234+
_db(db),
235+
_initialReleasedTick(0),
236+
_maxUpdateSeqNo(1),
237+
_lastSyncedSeqNo(0) {}
238+
239+
/// bump up the value of the last rocksdb::SequenceNumber we have seen
240+
/// and that is pending a sync update
241+
void RocksDBSettingsManager::setMaxUpdateSequenceNumber(rocksdb::SequenceNumber seqNo) {
242+
if (seqNo == 0) {
243+
// we don't care about this
244+
return;
245+
}
246+
247+
auto current = _maxUpdateSeqNo.load(std::memory_order_acquire);
248+
249+
if (current >= seqNo) {
250+
// current sequence number is already higher than the one we got
251+
return;
252+
}
253+
254+
bool res = _maxUpdateSeqNo.compare_exchange_strong(current, seqNo, std::memory_order_release);
255+
256+
if (!res) {
257+
// someone else has updated the max sequence number
258+
TRI_ASSERT(current > seqNo);
259+
}
260+
}
231261

232262
/// retrieve initial values from the database
233263
void RocksDBSettingsManager::retrieveInitialValues() {
@@ -244,11 +274,13 @@ RocksDBSettingsManager::CounterAdjustment RocksDBSettingsManager::loadCounter(
244274
TRI_ASSERT(objectId != 0); // TODO fix this
245275

246276
READ_LOCKER(guard, _rwLock);
277+
247278
auto const& it = _counters.find(objectId);
248279
if (it != _counters.end()) {
249280
return CounterAdjustment(it->second._sequenceNum, it->second._count, 0,
250281
it->second._revisionId);
251282
}
283+
252284
return CounterAdjustment(); // do not create
253285
}
254286

@@ -258,6 +290,7 @@ RocksDBSettingsManager::CounterAdjustment RocksDBSettingsManager::loadCounter(
258290
void RocksDBSettingsManager::updateCounter(uint64_t objectId,
259291
CounterAdjustment const& update) {
260292
bool needsSync = false;
293+
auto seqNo = update.sequenceNumber();
261294
{
262295
WRITE_LOCKER(guard, _rwLock);
263296

@@ -266,19 +299,22 @@ void RocksDBSettingsManager::updateCounter(uint64_t objectId,
266299
it->second._count += update.added();
267300
it->second._count -= update.removed();
268301
// just use the latest trx info
269-
if (update.sequenceNumber() > it->second._sequenceNum) {
270-
it->second._sequenceNum = update.sequenceNumber();
302+
if (seqNo > it->second._sequenceNum) {
303+
it->second._sequenceNum = seqNo;
271304
it->second._revisionId = update.revisionId();
272305
}
273306
} else {
274307
// insert new counter
275308
_counters.emplace(std::make_pair(
276309
objectId,
277-
CMValue(update.sequenceNumber(), update.added() - update.removed(),
310+
CMValue(seqNo, update.added() - update.removed(),
278311
update.revisionId())));
279312
needsSync = true; // only count values from WAL if they are in the DB
280313
}
281314
}
315+
316+
setMaxUpdateSequenceNumber(seqNo);
317+
282318
if (needsSync) {
283319
sync(true);
284320
}
@@ -287,24 +323,36 @@ void RocksDBSettingsManager::updateCounter(uint64_t objectId,
287323
arangodb::Result RocksDBSettingsManager::setAbsoluteCounter(uint64_t objectId,
288324
uint64_t value) {
289325
arangodb::Result res;
290-
WRITE_LOCKER(guard, _rwLock);
291-
auto it = _counters.find(objectId);
292-
if (it != _counters.end()) {
293-
rocksdb::TransactionDB* db = rocksutils::globalRocksDB();
294-
it->second._sequenceNum = db->GetLatestSequenceNumber();
295-
it->second._count = value;
296-
} else {
297-
// nothing to do as the counter has never been written it can not be set to
298-
// a value that would require correction. but we use the return value to
299-
// signal that no sync is rquired
300-
res.reset(TRI_ERROR_INTERNAL, "counter value not found - no sync required");
326+
rocksdb::SequenceNumber seqNo = 0;
327+
328+
{
329+
WRITE_LOCKER(guard, _rwLock);
330+
331+
auto it = _counters.find(objectId);
332+
333+
if (it != _counters.end()) {
334+
rocksdb::TransactionDB* db = rocksutils::globalRocksDB();
335+
seqNo = db->GetLatestSequenceNumber();
336+
it->second._sequenceNum = seqNo;
337+
it->second._count = value;
338+
} else {
339+
// nothing to do as the counter has never been written it can not be set to
340+
// a value that would require correction. but we use the return value to
341+
// signal that no sync is rquired
342+
res.reset(TRI_ERROR_INTERNAL, "counter value not found - no sync required");
343+
}
301344
}
345+
346+
setMaxUpdateSequenceNumber(seqNo);
347+
302348
return res;
303349
}
304350

305351
void RocksDBSettingsManager::removeCounter(uint64_t objectId) {
306352
WRITE_LOCKER(guard, _rwLock);
307-
auto const& it = _counters.find(objectId);
353+
354+
auto it = _counters.find(objectId);
355+
308356
if (it != _counters.end()) {
309357
RocksDBKey key;
310358
key.constructCounterValue(it->first);
@@ -361,24 +409,41 @@ Result RocksDBSettingsManager::sync(bool force) {
361409
if (!lockForSync(force)) {
362410
return Result();
363411
}
364-
TRI_DEFER(_syncing = false);
412+
413+
// only one thread can enter here at a time
414+
415+
// make sure we give up our lock when we exit this function
416+
auto guard = scopeGuard([this]() { _syncing = false; });
417+
418+
auto maxUpdateSeqNo = _maxUpdateSeqNo.load(std::memory_order_acquire);
419+
420+
if (!force && maxUpdateSeqNo <= _lastSyncedSeqNo) {
421+
// if noone has updated any counters etc. since we were here last,
422+
// there is no need to do anything!
423+
return Result();
424+
}
425+
426+
// ok, when we are here, we will write out something back to the database
365427

366428
std::unordered_map<uint64_t, CMValue> copy;
367429
{ // block all updates
368430
WRITE_LOCKER(guard, _rwLock);
369431
copy = _counters;
370432
}
371433

372-
rocksdb::WriteOptions writeOptions;
373434
// fetch the seq number prior to any writes; this guarantees that we save
374435
// any subsequent updates in the WAL to replay if we crash in the middle
375436
auto seqNumber = _db->GetLatestSequenceNumber();
437+
438+
rocksdb::WriteOptions writeOptions;
376439
std::unique_ptr<rocksdb::Transaction> rtrx(
377440
_db->BeginTransaction(writeOptions));
378441

379-
VPackBuilder b;
442+
// recycle our builder
443+
_builder.clear();
444+
380445
for (std::pair<uint64_t, CMValue> const& pair : copy) {
381-
Result res = writeCounterValue(_syncedSeqNums, rtrx.get(), b, pair);
446+
Result res = writeCounterValue(_syncedSeqNums, rtrx.get(), _builder, pair);
382447
if (res.fail()) {
383448
return res;
384449
}
@@ -391,14 +456,18 @@ Result RocksDBSettingsManager::sync(bool force) {
391456
seqNumber = std::min(seqNumber, writeResult.second);
392457
}
393458

394-
Result res = writeSettings(rtrx.get(), b, seqNumber);
459+
Result res = writeSettings(rtrx.get(), _builder, seqNumber);
460+
395461
if (res.fail()) {
396462
return res;
397463
}
398464

399465
// we have to commit all counters in one batch
400466
auto s = rtrx->Commit();
467+
401468
if (s.ok()) {
469+
_lastSyncedSeqNo = maxUpdateSeqNo;
470+
402471
{
403472
WRITE_LOCKER(guard, _rwLock);
404473
_lastSync = seqNumber;
@@ -467,6 +536,7 @@ void RocksDBSettingsManager::loadSettings() {
467536

468537
void RocksDBSettingsManager::loadIndexEstimates() {
469538
WRITE_LOCKER(guard, _rwLock);
539+
470540
RocksDBKeyBounds bounds = RocksDBKeyBounds::IndexEstimateValues();
471541

472542
auto cf = RocksDBColumnFamily::definitions();
@@ -481,16 +551,16 @@ void RocksDBSettingsManager::loadIndexEstimates() {
481551
uint64_t lastSeqNumber =
482552
rocksutils::uint64FromPersistent(iter->value().data());
483553

484-
StringRef estimateSerialisation(iter->value().data() + sizeof(uint64_t),
554+
StringRef estimateSerialization(iter->value().data() + sizeof(uint64_t),
485555
iter->value().size() - sizeof(uint64_t));
486556
// If this hits we have two estimates for the same index
487557
TRI_ASSERT(_estimators.find(objectId) == _estimators.end());
488558
try {
489559
if (RocksDBCuckooIndexEstimator<uint64_t>::isFormatSupported(
490-
estimateSerialisation)) {
560+
estimateSerialization)) {
491561
auto it = _estimators.emplace(
492562
objectId, std::make_unique<RocksDBCuckooIndexEstimator<uint64_t>>(
493-
lastSeqNumber, estimateSerialisation));
563+
lastSeqNumber, estimateSerialization));
494564
if (it.second) {
495565
auto estimator = it.first->second.get();
496566
LOG_TOPIC(TRACE, Logger::ENGINES)
@@ -543,26 +613,35 @@ RocksDBSettingsManager::stealIndexEstimator(uint64_t objectId) {
543613
std::unique_ptr<RocksDBCuckooIndexEstimator<uint64_t>> res;
544614

545615
WRITE_LOCKER(guard, _rwLock);
616+
546617
auto it = _estimators.find(objectId);
618+
547619
if (it != _estimators.end()) {
548620
// We swap out the stored estimate in order to move it to the caller
549621
res.swap(it->second);
550622
// Drop the now empty estimator
551623
_estimators.erase(objectId);
552624
}
625+
553626
return res;
554627
}
555628

556629
uint64_t RocksDBSettingsManager::stealKeyGenerator(uint64_t objectId) {
557-
WRITE_LOCKER(guard, _rwLock);
558630
uint64_t res = 0;
559-
auto it = _generators.find(objectId);
560-
if (it != _generators.end()) {
561-
// We swap out the stored estimate in order to move it to the caller
562-
res = it->second;
563-
// Drop the now empty estimator
564-
_generators.erase(objectId);
631+
632+
{
633+
WRITE_LOCKER(guard, _rwLock);
634+
635+
auto it = _generators.find(objectId);
636+
637+
if (it != _generators.end()) {
638+
// We swap out the stored generator state in order to move it to the caller
639+
res = it->second;
640+
// we are now not responsible for the generator anymore
641+
_generators.erase(objectId);
642+
}
565643
}
644+
566645
return res;
567646
}
568647

arangod/RocksDBEngine/RocksDBSettingsManager.h

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -143,58 +143,59 @@ class RocksDBSettingsManager {
143143
rocksdb::SequenceNumber earliestSeqNeeded() const;
144144

145145
private:
146+
/// bump up the value of the last rocksdb::SequenceNumber we have seen
147+
/// and that is pending a sync update
148+
void setMaxUpdateSequenceNumber(rocksdb::SequenceNumber seqNo);
149+
146150
void loadCounterValues();
147151
void loadSettings();
148152
void loadIndexEstimates();
149153
void loadKeyGenerators();
150154

151155
bool lockForSync(bool force);
152156

153-
//////////////////////////////////////////////////////////////////////////////
157+
/// @brief a reusable builder, used inside sync() to serialize objects
158+
arangodb::velocypack::Builder _builder;
159+
154160
/// @brief counter values
155-
//////////////////////////////////////////////////////////////////////////////
156161
std::unordered_map<uint64_t, CMValue> _counters;
157162

158-
//////////////////////////////////////////////////////////////////////////////
159163
/// @brief Key generator container
160-
//////////////////////////////////////////////////////////////////////////////
161164
std::unordered_map<uint64_t, uint64_t> _generators;
162165

163-
//////////////////////////////////////////////////////////////////////////////
164166
/// @brief Index Estimator contianer.
165167
/// Note the elements in this container will be moved into the
166168
/// index classes and are only temporarily stored here during recovery.
167-
//////////////////////////////////////////////////////////////////////////////
168169
std::unordered_map<uint64_t,
169170
std::unique_ptr<RocksDBCuckooIndexEstimator<uint64_t>>>
170171
_estimators;
171172

172-
//////////////////////////////////////////////////////////////////////////////
173173
/// @brief synced sequence numbers
174-
//////////////////////////////////////////////////////////////////////////////
175174
std::unordered_map<uint64_t, rocksdb::SequenceNumber> _syncedSeqNums;
176175

177-
//////////////////////////////////////////////////////////////////////////////
178176
/// @brief last sync sequence number
179-
//////////////////////////////////////////////////////////////////////////////
180177
rocksdb::SequenceNumber _lastSync;
181-
182-
//////////////////////////////////////////////////////////////////////////////
178+
183179
/// @brief currently syncing
184-
//////////////////////////////////////////////////////////////////////////////
185180
std::atomic<bool> _syncing;
186181

187-
//////////////////////////////////////////////////////////////////////////////
188182
/// @brief rocksdb instance
189-
//////////////////////////////////////////////////////////////////////////////
190183
rocksdb::TransactionDB* _db;
191184

192-
//////////////////////////////////////////////////////////////////////////////
193185
/// @brief protect _syncing and _counters
194-
//////////////////////////////////////////////////////////////////////////////
195186
mutable basics::ReadWriteLock _rwLock;
196187

1 80EB 97188
TRI_voc_tick_t _initialReleasedTick;
189+
190+
/// @brief the maximum sequence number that we have encountered
191+
/// when updating a counter value
192+
std::atomic<rocksdb::SequenceNumber> _maxUpdateSeqNo;
193+
194+
/// @brief the last maximum sequence number we stored when we last synced
195+
/// all counters back to persistent storage
196+
/// if this is identical to _maxUpdateSeqNo, we do not need to write
197+
/// back any counter values to disk and can save I/O
198+
rocksdb::SequenceNumber _lastSyncedSeqNo;
198199
};
199200
} // namespace arangodb
200201

0 commit comments

Comments
 (0)
0