Rdb index background (preliminary) by graetzer · Pull Request #7644 · arangodb/arangodb · GitHub
Merged: 39 commits merged into devel from feature/rdb-index-background on Dec 21, 2018.
Changes from 1 commit (db46a40) of the 39 commits in this pull request.

Commits (39):
3f6967c  Initial commit (graetzer, Dec 3, 2018)
38b3500  make sure index is hidden (graetzer, Dec 3, 2018)
0ddb321  Merge branch 'devel' of github.com:arangodb/arangodb into feature/rdb… (graetzer, Dec 4, 2018)
9bcc737  last changes (graetzer, Dec 4, 2018)
c57de4f  fix a bug (graetzer, Dec 4, 2018)
c798357  reduce conflicts (graetzer, Dec 5, 2018)
4766900  fix background indexing (graetzer, Dec 6, 2018)
b525712  remove unused code (graetzer, Dec 6, 2018)
0515650  fix link creation (graetzer, Dec 6, 2018)
70ef2f1  fix unique constraint violations (graetzer, Dec 6, 2018)
e6d23c1  fixed arangosearch cluster reporting (graetzer, Dec 6, 2018)
13bcd44  added test (graetzer, Dec 6, 2018)
9e5f960  fix test (graetzer, Dec 6, 2018)
6bfd840  make noncluster for now (graetzer, Dec 6, 2018)
3d20521  fix jslint (graetzer, Dec 7, 2018)
38005e0  Some test adjustments. (Dec 10, 2018)
dc041fb  Merge branch 'devel' into feature/rdb-index-background (Dec 10, 2018)
a7ae28a  Fix merge error. (Dec 10, 2018)
db46a40  changes (graetzer, Dec 10, 2018)
9db5709  Merge branch 'feature/rdb-index-background' of github.com:arangodb/ar… (graetzer, Dec 10, 2018)
ef4bb05  adding inBackground flag (graetzer, Dec 11, 2018)
5546745  Merge branch 'devel' into feature/rdb-index-background (Dec 11, 2018)
2ebf591  Fix merge errors. (Dec 11, 2018)
2082f78  adding some docs (graetzer, Dec 12, 2018)
3efc632  Merge branch 'devel' into feature/rdb-index-background (Dec 12, 2018)
98ddd16  Some small changes. (Dec 12, 2018)
ee51fa5  Fixed removal bug and added test. (Dec 12, 2018)
422e0ce  Added update test. (Dec 17, 2018)
31d9d7b  Merge branch 'devel' of github.com:arangodb/arangodb into feature/rdb… (graetzer, Dec 18, 2018)
a34f928  forgot to comment out docs (graetzer, Dec 18, 2018)
021453c  fixing some code (graetzer, Dec 19, 2018)
f61b5e9  fix jslint (graetzer, Dec 19, 2018)
b013b15  remove some code (graetzer, Dec 19, 2018)
82f4b6e  fix reporting of unfinished indexes (graetzer, Dec 20, 2018)
b7d6a9d  Merge branch 'devel' of github.com:arangodb/arangodb into feature/rdb… (graetzer, Dec 20, 2018)
2f1cc80  fixing fillIndex for iresearch (graetzer, Dec 21, 2018)
371c738  revert a change (graetzer, Dec 21, 2018)
7f4e2e7  Merge branch 'devel' of github.com:arangodb/arangodb into feature/rdb… (graetzer, Dec 21, 2018)
3f46816  fixng a deadlock (graetzer, Dec 21, 2018)
Commit db46a40334f50b706f1510f21cd37230b96c7a4a: changes
graetzer committed Dec 10, 2018
7 changes: 2 additions & 5 deletions arangod/Cluster/ClusterInfo.cpp
@@ -2433,14 +2433,11 @@ int ClusterInfo::ensureIndexCoordinator(
 
   // check index id
   uint64_t iid = 0;
-
   VPackSlice const idSlice = slice.get(StaticStrings::IndexId);
-  if (idSlice.isString()) {
-    // use predefined index id
+  if (idSlice.isString()) { // use predefined index id
     iid = arangodb::basics::StringUtils::uint64(idSlice.copyString());
   }
-  if (iid == 0) {
-    // no id set, create a new one!
+  if (iid == 0) { // no id set, create a new one!
     iid = uniqid();
   }
   std::string const idString = arangodb::basics::StringUtils::itoa(iid);
14 changes: 6 additions & 8 deletions arangod/ClusterEngine/ClusterCollection.cpp
@@ -368,14 +368,12 @@ std::shared_ptr<Index> ClusterCollection::createIndex(
   WRITE_LOCKER(guard, _exclusiveLock);
   std::shared_ptr<Index> idx;
 
-  {
-    WRITE_LOCKER(guard, _indexesLock);
-    idx = findIndex(info, _indexes);
-    if (idx) {
-      created = false;
-      // We already have this index.
-      return idx;
-    }
+  WRITE_LOCKER(guard2, _indexesLock);
+  idx = lookupIndex(info);
+  if (idx) {
+    created = false;
+    // We already have this index.
+    return idx;
   }
 
   StorageEngine* engine = EngineSelectorFeature::ENGINE;
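For readers skimming the diff: the hunk above swaps the findIndex() helper for lookupIndex() but keeps the surrounding pattern intact, i.e. check for an existing matching index while holding the indexes lock and only create a new one when nothing matches. A minimal standalone sketch of that pattern with illustrative types (not the actual ArangoDB classes, which match indexes on their full definition rather than a plain int):

#include <memory>
#include <mutex>
#include <vector>

struct Index { int type; };

class CollectionLike {
 public:
  // Look up under the indexes lock first and only create when nothing matches,
  // mirroring the shape of ClusterCollection::createIndex().
  std::shared_ptr<Index> createIndex(int type, bool& created) {
    std::lock_guard<std::mutex> guard(_indexesLock);  // WRITE_LOCKER equivalent
    for (auto const& existing : _indexes) {
      if (existing->type == type) {
        created = false;  // we already have this index
        return existing;
      }
    }
    auto idx = std::make_shared<Index>(Index{type});
    _indexes.push_back(idx);
    created = true;
    return idx;
  }

 private:
  std::mutex _indexesLock;
  std::vector<std::shared_ptr<Index>> _indexes;
};

int main() {
  CollectionLike coll;
  bool created = false;
  auto a = coll.createIndex(1, created);  // created == true
  auto b = coll.createIndex(1, created);  // created == false, same index returned
  return (a == b && !created) ? 0 : 1;
}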
68 changes: 30 additions & 38 deletions arangod/RocksDBEngine/RocksDBBuilderIndex.cpp
@@ -71,30 +71,19 @@ Result RocksDBBuilderIndex::insertInternal(transaction::Methods* trx, RocksDBMethods* mthd,
                                            arangodb::velocypack::Slice const& slice,
                                            OperationMode mode) {
   Result r = _wrapped->insertInternal(trx, mthd, documentId, slice, mode);
-  if (!r.ok() && !_hasError.load(std::memory_order_acquire)) {
-    _errorResult = r;
-    _hasError.store(true, std::memory_order_release);
+  if (r.is(TRI_ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED) ||
+      r.is(TRI_ERROR_LOCK_TIMEOUT) ||
+      r.is(TRI_ERROR_DEADLOCK) ||
+      r.is(TRI_ERROR_ARANGO_CONFLICT)) {
+    bool expected = false;
+    if (!r.ok() && !_hasError.compare_exchange_strong(expected, true, std::memory_order_release)) {
+      std::lock_guard<std::mutex> guard(_errorMutex);
+      _errorResult = r;
+    }
   }
   return Result();
 }
 
-//Result RocksDBBuilderIndex::updateInternal(transaction::Methods* trx, RocksDBMethods* mthd,
-//                                           LocalDocumentId const& oldDocumentId,
-//                                           arangodb::velocypack::Slice const& oldDoc,
-//                                           LocalDocumentId const& newDocumentId,
-//                                           arangodb::velocypack::Slice const& newDoc,
-//                                           OperationMode mode) {
-//  // It is illegal to call this method on the primary index
-//  // RocksDBPrimaryIndex must override this method accordingly
-//  TRI_ASSERT(type() != TRI_IDX_TYPE_PRIMARY_INDEX);
-//
-//  Result res = removeInternal(trx, mthd, oldDocumentId, oldDoc, mode);
-//  if (!res.ok()) {
-//    return res;
-//  }
-//  return insertInternal(trx, mthd, newDocumentId, newDoc, mode);
-//}
-
 /// remove index elements and put it in the specified write batch.
 Result RocksDBBuilderIndex::removeInternal(transaction::Methods* trx, RocksDBMethods* mthd,
                                            LocalDocumentId const& documentId,
@@ -112,9 +101,15 @@ Result RocksDBBuilderIndex::removeInternal(transaction::Methods* trx, RocksDBMethods* mthd,
   }
 
   Result r = _wrapped->removeInternal(trx, mthd, documentId, slice, mode);
-  if (!r.ok() && !_hasError.load(std::memory_order_acquire)) {
-    _errorResult = r;
-    _hasError.store(true, std::memory_order_release);
+  if (r.is(TRI_ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED) ||
+      r.is(TRI_ERROR_LOCK_TIMEOUT) ||
+      r.is(TRI_ERROR_DEADLOCK) ||
+      r.is(TRI_ERROR_ARANGO_CONFLICT)) {
+    bool expected = false;
+    if (!r.ok() && !_hasError.compare_exchange_strong(expected, true, std::memory_order_release)) {
+      std::lock_guard<std::mutex> guard(_errorMutex);
+      _errorResult = r;
+    }
   }
   return Result();
 }
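The two hunks above apply the same error-forwarding idea to insertInternal() and removeInternal(): expected errors from concurrent writers (unique constraint violation, lock timeout, deadlock, write-write conflict) are swallowed, and only the first one is recorded so that the background fill loop can pick it up later via _hasError, _errorMutex and _errorResult. Below is a condensed, self-contained sketch of that first-error-wins pattern; the types are simplified stand-ins, not the real arangodb::Result or index classes, and the error is stored when the atomic flag flips, which is the intent of the pattern:

#include <atomic>
#include <mutex>
#include <string>

// Illustrative stand-in for arangodb::Result.
struct Result {
  int errorNumber = 0;
  std::string message;
  bool ok() const { return errorNumber == 0; }
};

// Many writer threads may fail concurrently; only the first failure is kept.
class ErrorSink {
 public:
  void reportFailure(Result const& r) {
    if (r.ok()) { return; }
    bool expected = false;
    // Only the thread that flips the flag from false to true stores its result.
    if (_hasError.compare_exchange_strong(expected, true, std::memory_order_release)) {
      std::lock_guard<std::mutex> guard(_errorMutex);
      _errorResult = r;
    }
  }

  bool hasError() const { return _hasError.load(std::memory_order_acquire); }

  Result firstError() {
    std::lock_guard<std::mutex> guard(_errorMutex);
    return _errorResult;  // the error reported by the first failing writer
  }

 private:
  std::atomic<bool> _hasError{false};
  std::mutex _errorMutex;
  Result _errorResult;
};

int main() {
  ErrorSink sink;
  Result first;  first.errorNumber = 1;  first.message = "unique constraint violated";
  Result later;  later.errorNumber = 2;  later.message = "write-write conflict";
  sink.reportFailure(first);
  sink.reportFailure(later);  // ignored, an error is already recorded
  return sink.hasError() ? sink.firstError().errorNumber : 0;  // returns 1
}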
@@ -128,6 +123,12 @@ arangodb::Result RocksDBBuilderIndex::fillIndexBackground(std::function<void()> const& unlock) {
   // 3. Avoid conflicts on unique index keys by using rocksdb::Transaction snapshot conflict checking
   // 4. Supress unique constraint violations / conflicts or client drivers
 
+  auto lockedDocsGuard = scopeGuard([&] { // clear all the processed documents
+    std::lock_guard<std::mutex> guard(_lockedDocsMutex);
+    _lockedDocs.clear();
+    _lockedDocsCond.notify_all();
+  });
+
   // fillindex can be non transactional, we just need to clean up
   RocksDBEngine* engine = rocksutils::globalRocksEngine();
   RocksDBCollection* rcoll = static_cast<RocksDBCollection*>(_collection.getPhysical());
@@ -177,7 +178,7 @@ arangodb::Result RocksDBBuilderIndex::fillIndexBackground(std::function<void()> const& unlock) {
   } else {
     rtrx->DisableIndexing(); // we never check for existing index keys
   }
-  RocksDBSubTrxMethods batched(state, rtrx.get());
+  RocksDBSideTrxMethods batched(state, rtrx.get());
 
   RocksDBIndex* internal = _wrapped.get();
   TRI_ASSERT(internal != nullptr);
@@ -186,6 +187,7 @@ arangodb::Result RocksDBBuilderIndex::fillIndexBackground(std::function<void()> const& unlock) {
   while (it->Valid() && it->key().compare(upper) < 0) {
 
     if (_hasError.load(std::memory_order_acquire)) {
+      std::lock_guard<std::mutex> guard(_errorMutex);
       res = _errorResult; // a Writer got an error
       break;
     }
@@ -197,7 +199,7 @@ arangodb::Result RocksDBBuilderIndex::fillIndexBackground(std::function<void()> const& unlock) {
         continue;
       }
       std::lock_guard<std::mutex> guard2(_lockedDocsMutex);
-      _lockedDocs.insert(docId.id()); // must be done under _removedDocsMutex
+      _lockedDocs.insert(docId.id());
     }
 
     res = internal->insertInternal(&trx, &batched, docId,

Review thread on the "continue;" line above:
Contributor: We should be able to remove it from the set here, right? We won't encounter the same revision again.
graetzer (Contributor, author): yes but lets avoid re-hashing it
@@ -215,7 +217,7 @@ arangodb::Result RocksDBBuilderIndex::fillIndexBackground(std::function<void()> const& unlock) {
       break;
     }
     { // clear all the processed documents
-      std::lock_guard<std::mutex> guard2(_lockedDocsMutex);
+      std::lock_guard<std::mutex> guard(_lockedDocsMutex);
       _lockedDocs.clear();
       _lockedDocsCond.notify_all();
     }
@@ -240,11 +242,6 @@ arangodb::Result RocksDBBuilderIndex::fillIndexBackground(std::function<void()> const& unlock) {
     res = trx.commit(); // required to commit selectivity estimates
   }
 
-  // clear all the processed documents
-  std::lock_guard<std::mutex> guard2(_lockedDocsMutex);
-  _lockedDocs.clear();
-  _lockedDocsCond.notify_all();
-
   return res;
 }
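Taken together, the fillIndexBackground() hunks above manage a _lockedDocs set guarded by _lockedDocsMutex and _lockedDocsCond: the documents of the batch currently being indexed are inserted into the set, and the set is cleared with a notify_all() once the batch is done, now on every exit path thanks to the scopeGuard. A hedged, illustrative sketch of that handshake between the builder thread and a concurrent writer follows; the waiting side is inferred from the condition variable and is not part of the hunks shown here, and the class below is not the actual ArangoDB code:

#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <unordered_set>
#include <vector>

class LockedDocs {
 public:
  // builder thread: remember which documents the current batch touches
  void lockBatch(std::vector<std::uint64_t> const& ids) {
    std::lock_guard<std::mutex> guard(_mutex);
    _locked.insert(ids.begin(), ids.end());
  }

  // builder thread: batch committed, release any waiting writers
  void clearBatch() {
    std::lock_guard<std::mutex> guard(_mutex);
    _locked.clear();
    _cond.notify_all();
  }

  // writer thread: block while `id` belongs to a batch that is still in flight
  void waitUntilUnlocked(std::uint64_t id) {
    std::unique_lock<std::mutex> lock(_mutex);
    _cond.wait(lock, [&] { return _locked.find(id) == _locked.end(); });
  }

 private:
  std::mutex _mutex;
  std::condition_variable _cond;
  std::unordered_set<std::uint64_t> _locked;
};

int main() {
  LockedDocs docs;
  docs.lockBatch({1, 2, 3});   // builder starts a batch
  docs.clearBatch();           // batch committed
  docs.waitUntilUnlocked(2);   // a writer would return immediately now
  return 0;
}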

@@ -331,15 +328,12 @@ static arangodb::Result fillIndexFast(transaction::Methods& trx,
 
 /// non-transactional: fill index with existing documents
 /// from this collection
-arangodb::Result RocksDBBuilderIndex::fillIndex(std::function<void()> const& unlock) {
-  // TRI_ASSERT(trx.state()->collection(_collection.id(), AccessMode::Type::WRITE));
+arangodb::Result RocksDBBuilderIndex::fillIndexFast() {
 
-#if 0
   RocksDBCollection* coll = static_cast<RocksDBCollection*>(_collection.getPhysical());
-  unlock(); // we do not need the outer lock
   SingleCollectionTransaction trx(transaction::StandaloneContext::Create(_collection.vocbase()),
                                   _collection, AccessMode::Type::EXCLUSIVE);
-  res = trx.begin();
+  Result res = trx.begin();
   if (!res.ok()) {
     THROW_ARANGO_EXCEPTION(res);
   }

@@ -355,6 +349,4 @@ arangodb::Result RocksDBBuilderIndex::fillIndex(std::function<void()> const& unlock) {
   rocksdb::WriteBatch batch(32 * 1024 * 1024);
   return ::fillIndexFast<rocksdb::WriteBatch, RocksDBBatchedMethods>(trx, this, coll, batch);
 }
-#endif
-  return fillIndexBackground(unlock);
 }
21 changes: 9 additions & 12 deletions arangod/RocksDBEngine/RocksDBBuilderIndex.h
@@ -30,6 +30,9 @@
 
 namespace arangodb {
 
+/// Dummy index class that contains the logic to build indexes
+/// without an exclusive lock. It wraps the actual index implementation
+/// and adds some required synchronization logic on top
 class RocksDBBuilderIndex final : public arangodb::RocksDBIndex {
 
  public:
@@ -89,13 +92,6 @@ class RocksDBBuilderIndex final : public arangodb::RocksDBIndex {
                         LocalDocumentId const& documentId,
                         arangodb::velocypack::Slice const&,
                         OperationMode mode) override;
-  //
-  // Result updateInternal(transaction::Methods* trx, RocksDBMethods*,
-  //                       LocalDocumentId const& oldDocumentId,
-  //                       arangodb::velocypack::Slice const& oldDoc,
-  //                       LocalDocumentId const& newDocumentId,
-  //                       velocypack::Slice const& newDoc,
-  //                       OperationMode mode) override;
 
   /// remove index elements and put it in the specified write batch.
   Result removeInternal(transaction::Methods* trx, RocksDBMethods*,
@@ -116,17 +112,18 @@ class RocksDBBuilderIndex final : public arangodb::RocksDBIndex {
     _wrapped->recalculateEstimates();
   }
 
-  /// @brief fill the index
-  /// @param unlock will be called when the index lock can be released
-  Result fillIndex(std::function<void()> const& unlock);
-
- private:
+  /// @brief fill index, will exclusively lock the collection
+  Result fillIndexFast();
+
   /// @brief fill the index, assume already locked exclusively
   /// @param unlock called when collection lock can be released
   Result fillIndexBackground(std::function<void()> const& unlock);
+
+ private:
   std::shared_ptr<arangodb::RocksDBIndex> _wrapped;
 
   std::atomic<bool> _hasError;
+  std::mutex _errorMutex;
   Result _errorResult;
 
   std::mutex _removedDocsMutex;
27 changes: 17 additions & 10 deletions arangod/RocksDBEngine/RocksDBCollection.cpp
@@ -312,8 +312,8 @@ void RocksDBCollection::prepareIndexes(
 }
 
 std::shared_ptr<Index> RocksDBCollection::createIndex(
-    arangodb::velocypack::Slice const& info, bool restore,
-    bool& created) {
+    arangodb::velocypack::Slice const& info,
+    bool restore, bool& created) {
   TRI_ASSERT(info.isObject());
   Result res;
 
@@ -345,7 +345,7 @@ std::shared_ptr<Index> RocksDBCollection::createIndex(
   }
 
   RocksDBEngine* engine = static_cast<RocksDBEngine*>(EngineSelectorFeature::ENGINE);
 
   // Step 2. We are sure that we do not have an index of this type.
   // We also hold the lock. Create it
   const bool generateKey = !restore;
@@ -367,11 +367,10 @@ std::shared_ptr<Index> RocksDBCollection::createIndex(
   }
 
   auto buildIdx = std::make_shared<RocksDBBuilderIndex>(std::static_pointer_cast<RocksDBIndex>(idx));
 
   // Step 3. add index to collection entry (for removal after a crash)
-  if (!engine->inRecovery()) {
-    // read collection info from database
-    RocksDBKey key;
+  if (!engine->inRecovery()) { // manually modify collection entry, other methods need lock
+    RocksDBKey key; // read collection info from database
     key.constructCollection(_logicalCollection.vocbase().id(), _logicalCollection.id());
     rocksdb::PinnableSlice value;
     rocksdb::Status s = engine->db()->Get(rocksdb::ReadOptions(),
@@ -399,13 +398,21 @@ std::shared_ptr<Index> RocksDBCollection::createIndex(
                                   RocksDBLogValue::Empty());
     }
   }
 
+  bool inBackground = basics::VelocyPackHelper::getBooleanValue(
+      info, StaticStrings::IndexInBackground, false);
+
   // Step 4. fill index
   if (res.ok()) {
-    _indexes.emplace_back(buildIdx); // add index to indexes list
-    res = buildIdx->fillIndex([&] {
-      unlockGuard.fire();
-    });
+    if (inBackground) { // allow concurrent inserts into index
+      _indexes.emplace_back(buildIdx);
+      res = buildIdx->fillIndexBackground([&] {
+        unlockGuard.fire();
+      });
+    } else {
+      unlockGuard.fire();
+      res = buildIdx->fillIndexFast(); // will lock again internally
+    }
   }
 
   // Step 5. cleanup
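The Step 4 hunk above is where the new inBackground flag (StaticStrings::IndexInBackground, see also the "adding inBackground flag" and "adding some docs" commits) selects the fill strategy: with the flag set, the builder index is registered first so concurrent inserts are captured and the collection lock is released while fillIndexBackground() runs; otherwise the old exclusive fillIndexFast() path is used. A simplified, hypothetical sketch of that dispatch, not the actual RocksDBCollection code and with illustrative names:

#include <functional>
#include <iostream>

struct BuilderIndexLike {
  int fillIndexBackground(std::function<void()> const& unlock) {
    unlock();  // give up the collection lock, writers may proceed concurrently
    // ... iterate a snapshot of the collection and index its documents ...
    return 0;
  }
  int fillIndexFast() {
    // ... take the collection lock exclusively and index everything in one pass ...
    return 0;
  }
};

int fillIndex(BuilderIndexLike& idx, bool inBackground,
              std::function<void()> const& releaseLock) {
  if (inBackground) {  // allow concurrent inserts into the index
    return idx.fillIndexBackground(releaseLock);
  }
  releaseLock();       // the fast path re-locks internally
  return idx.fillIndexFast();
}

int main() {
  BuilderIndexLike idx;
  bool released = false;
  int rc = fillIndex(idx, /*inBackground*/ true, [&] { released = true; });
  std::cout << "lock released early: " << std::boolalpha << released << "\n";
  return rc;
}

In client terms the flag is part of the index definition supplied when the index is created, and it defaults to false, so existing callers keep the exclusive behaviour.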
10 changes: 6 additions & 4 deletions arangod/RocksDBEngine/RocksDBIndex.cpp
@@ -53,8 +53,10 @@ using namespace arangodb::rocksutils;
 
 uint64_t const arangodb::RocksDBIndex::ESTIMATOR_SIZE = 4096;
 
-inline static uint64_t ensureObjectId(uint64_t oid) {
-  return (oid != 0) ? oid : TRI_NewTickServer();
+namespace {
+  inline uint64_t ensureObjectId(uint64_t oid) {
+    return (oid != 0) ? oid : TRI_NewTickServer();
+  }
 }
 
 RocksDBIndex::RocksDBIndex(
@@ -68,7 +70,7 @@ RocksDBIndex::RocksDBIndex(
     bool useCache
 )
     : Index(id, collection, attributes, unique, sparse),
-      _objectId(ensureObjectId(objectId)),
+      _objectId(::ensureObjectId(objectId)),
       _cf(cf),
       _cache(nullptr),
      _cachePresent(false),
Expand All @@ -94,7 +96,7 @@ RocksDBIndex::RocksDBIndex(
bool useCache
)
: Index(id, collection, info),
_objectId(ensureObjectId(basics::VelocyPackHelper::stringUInt64(info.get("objectId")))),
_objectId(::ensureObjectId(basics::VelocyPackHelper::stringUInt64(info.get("objectId")))),
_cf(cf),
_cache(nullptr),
_cachePresent(false),
Expand Down