8000 moved compaction info into StorageEngine · lethalbrains/arangodb@680042b · GitHub
[go: up one dir, main page]

Skip to content

Commit 680042b

Browse files
committed
moved compaction info into StorageEngine
1 parent 9b0ba1e commit 680042b

File tree

11 files changed

+370
-271
lines changed

11 files changed

+370
-271
lines changed

arangod/RestHandler/RestReplicationHandler.cpp

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,13 +40,14 @@
4040
#include "Rest/Version.h"
4141
#include "RestServer/DatabaseFeature.h"
4242
#include "RestServer/ServerIdFeature.h"
43+
#include "StorageEngine/EngineSelectorFeature.h"
44+
#include "StorageEngine/StorageEngine.h"
4345
#include "Utils/CollectionGuard.h"
4446
#include "Utils/CollectionKeys.h"
4547
#include "Utils/CollectionKeysRepository.h"
4648
#include "Utils/CollectionNameResolver.h"
4749
#include "Utils/StandaloneTransactionContext.h"
4850
#include "Utils/TransactionContext.h"
49-
#include "VocBase/CompactorThread.h"
5051
#include "VocBase/replication-applier.h"
5152
#include "VocBase/replication-dump.h"
5253
#include "VocBase/ticks.h"
@@ -561,7 +562,8 @@ void RestReplicationHandler::handleCommandBatch() {
561562
VelocyPackHelper::getNumericValue<double>(input->slice(), "ttl", 0);
562563

563564
TRI_voc_tick_t id;
564-
int res = TRI_InsertBlockerCompactorVocBase(_vocbase, expires, &id);
565+
StorageEngine* engine = EngineSelectorFeature::ENGINE;
566+
int res = engine->insertCompactionBlocker(_vocbase, expires, id);
565567

566568
if (res != TRI_ERROR_NO_ERROR) {
567569
generateError(GeneralResponse::responseCode(res), res);
@@ -599,7 +601,8 @@ void RestReplicationHandler::handleCommandBatch() {
599601
VelocyPackHelper::getNumericValue<double>(input->slice(), "ttl", 0);
600602

601603
// now extend the blocker
602-
int res = TRI_TouchBlockerCompactorVocBase(_vocbase, id, expires);
604+
StorageEngine* engine = EngineSelectorFeature::ENGINE;
605+
int res = engine->extendCompactionBlocker(_vocbase, id, expires);
603606

604607
if (res == TRI_ERROR_NO_ERROR) {
605608
setResponseCode(GeneralResponse::ResponseCode::NO_CONTENT);
@@ -614,7 +617,8 @@ void RestReplicationHandler::handleCommandBatch() {
614617
TRI_voc_tick_t id =
615618
static_cast<TRI_voc_tick_t>(StringUtils::uint64(suffix[1]));
616619

617-
int res = TRI_RemoveBlockerCompactorVocBase(_vocbase, id);
620+
StorageEngine* engine = EngineSelectorFeature::ENGINE;
621+
int res = engine->removeCompactionBlocker(_vocbase, id);
618622

619623
if (res == TRI_ERROR_NO_ERROR) {
620624
setResponseCode(GeneralResponse::ResponseCode::NO_CONTENT);
@@ -2772,8 +2776,9 @@ void RestReplicationHandler::handleCommandCreateKeys() {
27722776
TRI_ASSERT(col != nullptr);
27732777

27742778
// turn off the compaction for the collection
2779+
StorageEngine* engine = EngineSelectorFeature::ENGINE;
27752780
TRI_voc_tick_t id;
2776-
res = TRI_InsertBlockerCompactorVocBase(_vocbase, 1200.0, &id);
2781+
res = engine->insertCompactionBlocker(_vocbase, 1200.0, id);
27772782

27782783
if (res != TRI_ERROR_NO_ERROR) {
27792784
THROW_ARANGO_EXCEPTION(res);

arangod/StorageEngine/MMFilesEngine.cpp

Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1298,3 +1298,159 @@ VocbaseCollectionInfo MMFilesEngine::loadCollectionInfo(TRI_vocbase_t* vocbase,
12981298
}
12991299
return info;
13001300
}
1301+
1302+
/// @brief remove data of expired compaction blockers
1303+
bool MMFilesEngine::cleanupCompactionBlockers(TRI_vocbase_t* vocbase) {
1304+
// check if we can instantly acquire the lock
1305+
TRY_WRITE_LOCKER(locker, _compactionBlockersLock);
1306+
1307+
if (!locker.isLocked()) {
1308+
// couldn't acquire lock
1309+
return false;
1310+
}
1311+
1312+
auto it = _compactionBlockers.find(vocbase->id());
1313+
1314+
if (it == _compactionBlockers.end()) {
1315+
// no entry for this database
1316+
return true;
1317+
}
1318+
1319+
// we are now holding the write lock
1320+
double now = TRI_microtime();
1321+
1322+
size_t n = (*it).second.size();
1323+
1324+
for (size_t i = 0; i < n; /* no hoisting */) {
1325+
auto& blocker = (*it).second[i];
1326+
1327+
if (blocker._expires < now) {
1328+
(*it).second.erase((*it).second.begin() + i);
1329+
n--;
1330+
} else {
1331+
i++;
1332+
}
1333+
}
1334+
1335+
if ((*it).second.empty()) {
1336+
// remove last element
1337+
_compactionBlockers.erase(it);
1338+
}
1339+
1340+
return true;
1341+
}
1342+
1343+
/// @brief insert a compaction blocker
1344+
int MMFilesEngine::insertCompactionBlocker(TRI_vocbase_t* vocbase, double ttl,
1345+
TRI_voc_tick_t& id) {
1346+
id = 0;
1347+
1348+
if (ttl <= 0.0) {
1349+
return TRI_ERROR_BAD_PARAMETER;
1350+
}
1351+
1352+
CompactionBlocker blocker(TRI_NewTickServer(), TRI_microtime() + ttl);
1353+
1354+
{
1355+
WRITE_LOCKER_EVENTUAL(locker, _compactionBlockersLock, 1000);
1356+
1357+
auto it = _compactionBlockers.find(vocbase->id());
1358+
1359+
if (it == _compactionBlockers.end()) {
1360+
it = _compactionBlockers.emplace(vocbase->id(), std::vector<CompactionBlocker>()).first;
1361+
}
1362+
1363+
(*it).second.emplace_back(blocker);
1364+
}
1365+
1366+
id = blocker._id;
1367+
1368+
return TRI_ERROR_NO_ERROR;
1369+
}
1370+
1371+
/// @brief touch an existing compaction blocker
1372+
int MMFilesEngine::extendCompactionBlocker(TRI_vocbase_t* vocbase, TRI_voc_tick_t id,
1373+
double ttl) {
1374+
if (ttl <= 0.0) {
1375+
return TRI_ERROR_BAD_PARAMETER;
1376+
}
1377+
1378+
WRITE_LOCKER_EVENTUAL(locker, _compactionBlockersLock, 1000);
1379+
1380+
auto it = _compactionBlockers.find(vocbase->id());
1381+
1382+
if (it == _compactionBlockers.end()) {
1383+
return TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND;
1384+
}
1385+
1386+
for (auto& blocker : (*it).second) {
1387+
if (blocker._id == id) {
1388+
blocker._expires = TRI_microtime() + ttl;
1389+
return TRI_ERROR_NO_ERROR;
1390+
}
1391+
}
1392+
1393+
return TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND;
1394+
}
1395+
1396+
/// @brief remove an existing compaction blocker
1397+
int MMFilesEngine::removeCompactionBlocker(TRI_vocbase_t* vocbase,
1398+
TRI_voc_tick_t id) {
1399+
WRITE_LOCKER_EVENTUAL(locker, _compactionBlockersLock, 1000);
1400+
1401+
auto it = _compactionBlockers.find(vocbase->id());
1402+
1403+
if (it == _compactionBlockers.end()) {
1404+
return TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND;
1405+
}
1406+
1407+
size_t const n = (*it).second.size();
1408+
1409+
for (size_t i = 0; i < n; ++i) {
1410+
auto& blocker = (*it).second[i];
1411+
if (blocker._id == id) {
1412+
(*it).second.erase((*it).second.begin() + i);
1413+
1414+
if ((*it).second.empty()) {
1415+
// remove last item
1416+
_compactionBlockers.erase(it);
1417+
}
1418+
return TRI_ERROR_NO_ERROR;
1419+
}
1420+
}
1421+
1422+
return TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND;
1423+
}
1424+
1425+
void MMFilesEngine::preventCompaction(TRI_vocbase_t* vocbase,
1426+
std::function<void(TRI_vocbase_t*)> const& callback) {
1427+
WRITE_LOCKER_EVENTUAL(locker, _compactionBlockersLock, 5000);
1428+
callback(vocbase);
1429+
}
1430+
1431+
bool MMFilesEngine::tryPreventCompaction(TRI_vocbase_t* vocbase,
1432+
std::function<void(TRI_vocbase_t*)> const& callback,
1433+
bool checkForActiveBlockers) {
1434+
TRY_WRITE_LOCKER(locker, _compactionBlockersLock);
1435+
1436+
if (locker.isLocked()) {
1437+
if (checkForActiveBlockers) {
1438+
double const now = TRI_microtime();
1439+
1440+
// check if we have a still-valid compaction blocker
1441+
auto it = _compactionBlockers.find(vocbase->id());
1442+
1443+
if (it != _compactionBlockers.end()) {
1444+
for (auto const& blocker : (*it).second) {
1445+
if (blocker._expires > now) {
1446+
// found a compaction blocker
1447+
return false;
1448+
}
1449+
}
1450+
}
1451+
}
1452+
callback(vocbase);
1453+
return true;
1454+
}
1455+
return false;
1456+
}

arangod/StorageEngine/MMFilesEngine.h

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,27 @@ class MMFilesEngine final : public StorageEngine {
220220

221221
/// @brief scans a collection and locates all files
222222
MMFilesEngineCollectionFiles scanCollectionDirectory(std::string const& path);
223+
224+
/// @brief remove data of expired compaction blockers
225+
bool cleanupCompactionBlockers(TRI_vocbase_t* vocbase) override;
226+
227+
/// @brief insert a compaction blocker
228+
int insertCompactionBlocker(TRI_vocbase_t* vocbase, double ttl, TRI_voc_tick_t& id) override;
229+
230+
/// @brief touch an existing compaction blocker
231+
int extendCompactionBlocker(TRI_vocbase_t* vocbase, TRI_voc_tick_t id, double ttl) override;
232+
233+
/// @brief remove an existing compaction blocker
234+
int removeCompactionBlocker(TRI_vocbase_t* vocbase, TRI_voc_tick_t id) override;
235+
236+
/// @brief a callback function that is run while it is guaranteed that there is no compaction ongoing
237+
void preventCompaction(TRI_vocbase_t* vocbase,
238+
std::function<void(TRI_vocbase_t*)> const& callback) override;
239+
240+
/// @brief a callback function that is run there is no compaction ongoing
241+
bool tryPreventCompaction(TRI_vocbase_t* vocbase,
242+
std::function<void(TRI_vocbase_t*)> const& callback,
243+
bool checkForActiveBlockers) override;
223244

224245
private:
225246
void verifyDirectories();
@@ -285,6 +306,19 @@ class MMFilesEngine final : public StorageEngine {
285306
std::vector<std::pair<std::string, std::string>> _deleted;
286307

287308
std::unordered_map<TRI_voc_tick_t, std::unordered_map<TRI_voc_cid_t, std::string>> _collectionPaths;
309+
310+
struct CompactionBlocker {
311+
CompactionBlocker(TRI_voc_tick_t id, double expires) : _id(id), _expires(expires) {}
312+
CompactionBlocker() = delete;
313+
314+
TRI_voc_tick_t _id;
315+
double _expires;
316+
};
317+
318+
// lock for compaction blockers
319+
arangodb::basics::ReadWriteLock _compactionBlockersLock;
320+
// cross-database map of compaction blockers
321+
std::unordered_map<TRI_voc_tick_t, std::vector<CompactionBlocker>> _compactionBlockers;
288322
};
289323

290324
}

arangod/StorageEngine/OtherEngine.h

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,37 @@ class OtherEngine final : public StorageEngine {
196196
// from the storage engine's realm
197197
void removeDocumentRevision(TRI_voc_tick_t databaseId, TRI_voc_cid_t collectionId,
198198
arangodb::velocypack::Slice const& document) override;
199+
200+
/// @brief remove data of expired compaction blockers
201+
bool cleanupCompactionBlockers(TRI_vocbase_t* vocbase) override { return false; }
202+
203+
/// @brief insert a compaction blocker
204+
int insertCompactionBlocker(TRI_vocbase_t* vocbase, double ttl, TRI_voc_tick_t& id) override {
205+
id = 0;
206+
return TRI_ERROR_NO_ERROR;
207+
}
208+
209+
/// @brief touch an existing compaction blocker
210+
int extendCompactionBlocker(TRI_vocbase_t* vocbase, TRI_voc_tick_t id, double ttl) override {
211+
return TRI_ERROR_NO_ERROR;
212+
}
213+
214+
/// @brief remove an existing compaction blocker
215+
int removeCompactionBlocker(TRI_vocbase_t* vocbase, TRI_voc_tick_t id) override {
216+
return TRI_ERROR_NO_ERROR;
217+
}
218+
219+
/// @brief a callback function that is run while it is guaranteed that there is no compaction ongoing
220+
void preventCompaction(TRI_vocbase_t* vocbase,
221+
std::function<void(TRI_vocbase_t*)> const& callback) override {
222+
}
223+
224+
/// @brief a callback function that is run there is no compaction ongoing
225+
bool tryPreventCompaction(TRI_vocbase_t* vocbase,
226+
std::function<void(TRI_vocbase_t*)> const& callback,
227+
bool checkForActiveBlockers) override {
228+
return true;
229+
}
199230

200231
public:
201232
static std::string const EngineName;

arangod/StorageEngine/StorageEngine.h

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,28 @@ class StorageEngine : public application_features::ApplicationFeature {
210210
// from the storage engine's realm
211211
virtual void removeDocumentRevision(TRI_voc_tick_t databaseId, TRI_voc_cid_t collectionId,
212212
arangodb::velocypack::Slice const& document) = 0;
213+
214+
/// @brief remove data of expired compaction blockers
215+
virtual bool cleanupCompactionBlockers(TRI_vocbase_t* vocbase) = 0;
216+
217+
/// @brief insert a compaction blocker
218+
virtual int insertCompactionBlocker(TRI_vocbase_t* vocbase, double ttl, TRI_voc_tick_t& id) = 0;
219+
220+
/// @brief touch an existing compaction blocker
221+
virtual int extendCompactionBlocker(TRI_vocbase_t* vocbase, TRI_voc_tick_t id, double ttl) = 0;
213222

223+
/// @brief remove an existing compaction blocker
224+
virtual int removeCompactionBlocker(TRI_vocbase_t* vocbase, TRI_voc_tick_t id) = 0;
225+
226+
/// @brief a callback function that is run while it is guaranteed that there is no compaction ongoing
227+
virtual void preventCompaction(TRI_vocbase_t* vocbase,
228+
std::function<void(TRI_vocbase_t*)> const& callback) = 0;
229+
230+
/// @brief a callback function that is run there is no compaction ongoing
231+
virtual bool tryPreventCompaction(TRI_vocbase_t* vocbase,
232+
std::function<void(TRI_vocbase_t*)> const& callback,
233+
bool checkForActiveBlockers) = 0;
234+
214235
protected:
215236
TRI_vocbase_col_t* registerCollection(bool doLock, TRI_vocbase_t* vocbase, TRI_col_type_e type, TRI_voc_cid_t cid,
216237
std::string const& name, TRI_voc_cid_t planId, std::string const& path) {

arangod/Utils/CollectionExport.cpp

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,11 @@
2424
#include "CollectionExport.h"
2525
#include "Basics/WriteLocker.h"
2626
#include "Indexes/PrimaryIndex.h"
27+
#include "StorageEngine/EngineSelectorFeature.h"
28+
#include "StorageEngine/StorageEngine.h"
2729
#include "Utils/CollectionGuard.h"
2830
#include "Utils/SingleCollectionTransaction.h"
2931
#include "Utils/StandaloneTransactionContext.h"
30-
#include "VocBase/CompactorThread.h"
3132
#include "VocBase/Ditch.h"
3233
#include "VocBase/collection.h"
3334
#include "VocBase/vocbase.h"
@@ -64,13 +65,13 @@ CollectionExport::~CollectionExport() {
6465
}
6566

6667
void CollectionExport::run(uint64_t maxWaitTime, size_t limit) {
67-
{
68-
// try to acquire the exclusive lock on the compaction
69-
WRITE_LOCKER_EVENTUAL(locker, _document->_vocbase->_compactionBlockers._lock, 5000);
68+
StorageEngine* engine = EngineSelectorFeature::ENGINE;
7069

70+
// try to acquire the exclusive lock on the compaction
71+
engine->preventCompaction(_document->_vocbase, [this](TRI_vocbase_t* vocbase) {
7172
// create a ditch under the compaction lock
7273
_ditch = _document->ditches()->createDocumentDitch(false, __FILE__, __LINE__);
73-
}
74+
});
7475

7576
// now we either have a ditch or not
7677
if (_ditch == nullptr) {

0 commit comments

Comments
 (0)
0