[zkd] Unique Constraints by maierlars · Pull Request #13691 · arangodb/arangodb · GitHub
[go: up one dir, main page]

Skip to content

[zkd] Unique Constraints #13691

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Mar 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 43 additions & 2 deletions arangod/RocksDBEngine/RocksDBIndexFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,48 @@ struct SecondaryIndexFactory : public DefaultIndexFactory {
}
};

/// @brief Index factory for zkd indexes. Chooses between the unique and the
/// non-unique index implementation based on the "unique" attribute of the
/// index definition, and rejects sparse definitions during normalization.
struct ZkdIndexFactory : public DefaultIndexFactory {
  /// @brief registers this factory for the zkd index type.
  /// `explicit` prevents an ApplicationServer from silently converting into a
  /// factory object.
  explicit ZkdIndexFactory(arangodb::application_features::ApplicationServer& server)
      : DefaultIndexFactory(server, Index::TRI_IDX_TYPE_ZKD_INDEX) {}

  /// @brief creates the index object for `definition`.
  /// @return a RocksDBUniqueZkdIndex when the definition's unique attribute is
  ///         true, a RocksDBZkdIndex otherwise.
  /// `isClusterConstructor` is not consulted by this factory.
  std::shared_ptr<arangodb::Index> instantiate(arangodb::LogicalCollection& collection,
                                               arangodb::velocypack::Slice const& definition,
                                               IndexId id,
                                               bool isClusterConstructor) const override {
    if (definition.get(StaticStrings::IndexUnique).isTrue()) {
      return std::make_shared<RocksDBUniqueZkdIndex>(id, collection, definition);
    }

    return std::make_shared<RocksDBZkdIndex>(id, collection, definition);
  }

  /// @brief writes the normalized form of `definition` into `normalized`.
  /// @throws an arango exception with TRI_ERROR_BAD_PARAMETER when the
  ///         definition asks for a sparse index.
  /// @return the result of IndexFactory::enhanceJsonIndexGeneric.
  arangodb::Result normalize(             // normalize definition
      arangodb::velocypack::Builder& normalized,  // normalized definition (out-param)
      arangodb::velocypack::Slice definition,     // source definition
      bool isCreation,                            // definition for index creation
      TRI_vocbase_t const& vocbase                // index vocbase
  ) const override {
    TRI_ASSERT(normalized.isOpenObject());

    // Validate first: an unsupported definition must neither leave partial
    // output in the out-param nor consume a server tick for an objectId.
    if (definition.get(StaticStrings::IndexSparse).isTrue()) {
      THROW_ARANGO_EXCEPTION_MESSAGE(
          TRI_ERROR_BAD_PARAMETER,
          "zkd index does not support sparse property");
    }

    normalized.add(arangodb::StaticStrings::IndexType,
                   arangodb::velocypack::Value(
                       arangodb::Index::oldtypeName(Index::TRI_IDX_TYPE_ZKD_INDEX)));

    // Only a non-coordinator creating the index assigns a fresh objectId, and
    // only when the definition does not already carry one.
    if (isCreation && !ServerState::instance()->isCoordinator() &&
        !definition.hasKey("objectId")) {
      normalized.add("objectId",
                     arangodb::velocypack::Value(std::to_string(TRI_NewTickServer())));
    }

    return IndexFactory::enhanceJsonIndexGeneric(definition, normalized, isCreation);
  }
};

struct TtlIndexFactory : public DefaultIndexFactory {
explicit TtlIndexFactory(arangodb::application_features::ApplicationServer& server,
arangodb::Index::IndexType type)
Expand Down Expand Up @@ -348,8 +390,7 @@ RocksDBIndexFactory::RocksDBIndexFactory(application_features::ApplicationServer
server);
static const TtlIndexFactory ttlIndexFactory(server, arangodb::Index::TRI_IDX_TYPE_TTL_INDEX);
static const PrimaryIndexFactory primaryIndexFactory(server);
static const SecondaryIndexFactory<arangodb::RocksDBZkdIndex, arangodb::Index::TRI_IDX_TYPE_ZKD_INDEX> zkdIndexFactory(
server);
static const ZkdIndexFactory zkdIndexFactory(server);

emplace("edge", edgeIndexFactory);
emplace("fulltext", fulltextIndexFactory);
Expand Down
1 change: 1 addition & 0 deletions arangod/RocksDBEngine/RocksDBKeyBounds.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ rocksdb::ColumnFamilyHandle* RocksDBKeyBounds::columnFamily() const {
case RocksDBEntryType::LegacyGeoIndexValue:
case RocksDBEntryType::GeoIndexValue:
case RocksDBEntryType::ZkdIndexValue:
case RocksDBEntryType::UniqueZkdIndexValue:
return RocksDBColumnFamilyManager::get(RocksDBColumnFamilyManager::Family::GeoIndex);
case RocksDBEntryType::Database:
case RocksDBEntryType::Collection:
Expand Down
8 changes: 8 additions & 0 deletions arangod/RocksDBEngine/RocksDBTypes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,10 @@ static rocksdb::Slice RevisionTreeValue(
// Backing storage plus a 1-byte rocksdb::Slice viewing it, for the
// ZkdIndexValue entry type (RocksDBEntryType has `char` as underlying type).
// NOTE(review): the slice identifier is spelled "Zdk", not "Zkd"; it is
// referenced under that spelling by rocksDBSlice().
static RocksDBEntryType zkdIndexValue = RocksDBEntryType::ZkdIndexValue;
static rocksdb::Slice ZdkIndexValue(
reinterpret_cast<std::underlying_type<RocksDBEntryType>::type*>(&zkdIndexValue), 1);

// Same pairing for the UniqueZkdIndexValue entry type: a variable holding the
// type tag and a 1-byte slice pointing at it.
static RocksDBEntryType uniqueZkdIndexValue = RocksDBEntryType::UniqueZkdIndexValue;
static rocksdb::Slice UniqueZdkIndexValue(
reinterpret_cast<std::underlying_type<RocksDBEntryType>::type*>(&uniqueZkdIndexValue), 1);
} // namespace

char const* arangodb::rocksDBEntryTypeName(arangodb::RocksDBEntryType type) {
Expand Down Expand Up @@ -147,6 +151,8 @@ char const* arangodb::rocksDBEntryTypeName(arangodb::RocksDBEntryType type) {
return "RevisionTreeValue";
case arangodb::RocksDBEntryType::ZkdIndexValue:
return "ZkdIndexValue";
case arangodb::RocksDBEntryType::UniqueZkdIndexValue:
return "UniqueZkdIndexValue";
}
return "Invalid";
}
Expand Down Expand Up @@ -247,6 +253,8 @@ rocksdb::Slice const& arangodb::rocksDBSlice(RocksDBEntryType const& type) {
return RevisionTreeValue;
case RocksDBEntryType::ZkdIndexValue:
return ZdkIndexValue;
case RocksDBEntryType::UniqueZkdIndexValue:
return UniqueZdkIndexValue;
}

return Placeholder; // avoids warning - errorslice instead ?!
Expand Down
3 changes: 2 additions & 1 deletion arangod/RocksDBEngine/RocksDBTypes.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ enum class RocksDBEntryType : char {
View = '>',
GeoIndexValue = '?',
RevisionTreeValue = '@',
ZkdIndexValue = 'Z'
ZkdIndexValue = 'z',
UniqueZkdIndexValue = 'Z'
};

char const* rocksDBEntryTypeName(RocksDBEntryType);
Expand Down
5 changes: 5 additions & 0 deletions arangod/RocksDBEngine/RocksDBValue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ RocksDBValue RocksDBValue::ZkdIndexValue() {
return RocksDBValue(RocksDBEntryType::ZkdIndexValue);
}

// Creates the value for a UniqueZkdIndexValue entry from the given document
// id. Delegates to the (type, docId, revision) constructor and passes
// RevisionId::none() as the revision.
RocksDBValue RocksDBValue::UniqueZkdIndexValue(LocalDocumentId const& docId) {
return RocksDBValue(RocksDBEntryType::UniqueZkdIndexValue, docId, RevisionId::none());
}

// Creates the value for a UniqueVPackIndexValue entry from the given document
// id. Delegates to the (type, docId, revision) constructor and passes
// RevisionId::none() as the revision.
RocksDBValue RocksDBValue::UniqueVPackIndexValue(LocalDocumentId const& docId) {
return RocksDBValue(RocksDBEntryType::UniqueVPackIndexValue, docId, RevisionId::none());
}
Expand Down Expand Up @@ -152,6 +156,7 @@ RocksDBValue::RocksDBValue(RocksDBEntryType type, LocalDocumentId const& docId,
: _type(type), _buffer() {
switch (_type) {
case RocksDBEntryType::UniqueVPackIndexValue:
case RocksDBEntryType::UniqueZkdIndexValue:
case RocksDBEntryType::PrimaryIndexValue: {
if (!revision) {
_buffer.reserve(sizeof(uint64_t));
Expand Down
1 change: 1 addition & 0 deletions arangod/RocksDBEngine/RocksDBValue.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class RocksDBValue {
static RocksDBValue EdgeIndexValue(arangodb::velocypack::StringRef const& vertexId);
static RocksDBValue VPackIndexValue();
static RocksDBValue ZkdIndexValue();
static RocksDBValue UniqueZkdIndexValue(LocalDocumentId const& docId);
static RocksDBValue UniqueVPackIndexValue(LocalDocumentId const& docId);
static RocksDBValue View(VPackSlice const& data);
static RocksDBValue ReplicationApplierConfig(VPackSlice const& data);
Expand Down
156 changes: 119 additions & 37 deletions arangod/RocksDBEngine/RocksDBZkdIndex.cpp
F438
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

#include <Aql/Variable.h>
#include <Containers/Enumerate.h>
#include <Transaction/Helpers.h>
#include "RocksDBColumnFamilyManager.h"
#include "RocksDBMethods.h"
#include "RocksDBZkdIndex.h"
Expand All @@ -52,9 +53,11 @@ auto coordsToVector(zkd::byte_string_view bs, size_t dim) -> std::vector<double>
}*/

namespace arangodb {

template<bool isUnique = false>
class RocksDBZkdIndexIterator final : public IndexIterator {
public:
RocksDBZkdIndexIterator(LogicalCollection* collection, RocksDBZkdIndex* index,
RocksDBZkdIndexIterator(LogicalCollection* collection, RocksDBZkdIndexBase* index,
transaction::Methods* trx, zkd::byte_string min,
zkd::byte_string max, std::size_t dim)
: IndexIterator(collection, trx),
Expand Down Expand Up @@ -109,7 +112,13 @@ class RocksDBZkdIndexIterator final : public IndexIterator {
_iterState = IterState::SEEK_ITER_TO_CUR;
}
} else {
auto const documentId = RocksDBKey::indexDocumentId(rocksKey);
auto const documentId = std::invoke([&]{
if constexpr(isUnique) {
return RocksDBValue::documentId(_iter->value());
} else {
return RocksDBKey::indexDocumentId(rocksKey);
}
});
std::ignore = callback(documentId);
++i;
_iter->Next();
Expand Down Expand Up @@ -146,7 +155,7 @@ class RocksDBZkdIndexIterator final : public IndexIterator {
IterState _iterState = IterState::SEEK_ITER_TO_CUR;

std::unique_ptr<rocksdb::Iterator> _iter;
RocksDBZkdIndex* _index = nullptr;
RocksDBZkdIndexBase* _index = nullptr;
};

} // namespace arangodb
Expand Down Expand Up @@ -193,14 +202,51 @@ auto readDocumentKey(VPackSlice doc,
}
auto dv = value.getNumericValue<double>();
if (std::isnan(dv)) {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_BAD_PARAMETER, "NaN is not allowed");
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_QUERY_INVALID_ARITHMETIC_VALUE, "NaN is not allowed");
}
v.emplace_back(convertDouble(dv));
}

return zkd::interleave(v);
}

/// @brief Computes the pair (min, max) of interleaved byte strings that an
/// index iterator should scan for the given n-ary AND condition.
///
/// Per-field bounds are extracted from `node`; a field without a bound in the
/// condition (or whose bound yields no value) falls back to the byte string
/// {0x00} as lower and {0x80} as upper limit. `opts` is not consulted.
auto boundsForIterator(arangodb::Index const* index, const arangodb::aql::AstNode* node,
                       const arangodb::aql::Variable* reference,
                       const arangodb::IndexIteratorOptions& opts)
    -> std::pair<zkd::byte_string, zkd::byte_string> {
  TRI_ASSERT(node->type == arangodb::aql::NODE_TYPE_OPERATOR_NARY_AND);

  std::unordered_map<size_t, zkd::ExpressionBounds> boundsByField;
  std::unordered_set<aql::AstNode const*> leftoverExpressions;
  extractBoundsFromCondition(index, node, reference, boundsByField, leftoverExpressions);

  // every part of the condition must have been consumed by the extraction
  TRI_ASSERT(leftoverExpressions.empty());

  static const auto ByteStringPosInfinity = zkd::byte_string{std::byte{0x80}};
  static const auto ByteStringNegInfinity = zkd::byte_string{std::byte{0}};

  // start with every dimension unbounded, then narrow where a bound exists
  const size_t dim = index->fields().size();
  std::vector<zkd::byte_string> lower(dim, ByteStringNegInfinity);
  std::vector<zkd::byte_string> upper(dim, ByteStringPosInfinity);

  for (size_t idx = 0; idx < dim; ++idx) {
    auto const found = boundsByField.find(idx);
    if (found == boundsByField.end()) {
      continue;
    }
    auto const& fieldBounds = found->second;
    lower[idx] = nodeExtractDouble(fieldBounds.lower.bound_value).value_or(ByteStringNegInfinity);
    upper[idx] = nodeExtractDouble(fieldBounds.upper.bound_value).value_or(ByteStringPosInfinity);
  }

  TRI_ASSERT(lower.size() == dim);
  TRI_ASSERT(upper.size() == dim);

  return std::make_pair(zkd::interleave(lower), zkd::interleave(upper));
}
} // namespace


Expand Down Expand Up @@ -368,7 +414,7 @@ auto zkd::specializeCondition(arangodb::Index const* index, arangodb::aql::AstNo

}

arangodb::Result arangodb::RocksDBZkdIndex::insert(
arangodb::Result arangodb::RocksDBZkdIndexBase::insert(
arangodb::transaction::Methods& trx, arangodb::RocksDBMethods* methods,
const arangodb::LocalDocumentId& documentId,
arangodb::velocypack::Slice doc, const arangodb::OperationOptions& options) {
Expand All @@ -389,7 +435,7 @@ arangodb::Result arangodb::RocksDBZkdIndex::insert(
return Result();
}

arangodb::Result arangodb::RocksDBZkdIndex::remove(arangodb::transaction::Methods& trx,
arangodb::Result arangodb::RocksDBZkdIndexBase::remove(arangodb::transaction::Methods& trx,
arangodb::RocksDBMethods* methods,
const arangodb::LocalDocumentId& documentId,
arangodb::velocypack::Slice doc) {
Expand All @@ -410,7 +456,7 @@ arangodb::Result arangodb::RocksDBZkdIndex::remove(arangodb::transaction::Method
return Result();
}

arangodb::RocksDBZkdIndex::RocksDBZkdIndex(arangodb::IndexId iid,
arangodb::RocksDBZkdIndexBase::RocksDBZkdIndexBase(arangodb::IndexId iid,
arangodb::LogicalCollection& coll,
const arangodb::velocypack::Slice& info)
: RocksDBIndex(iid, coll, info,
Expand All @@ -420,62 +466,98 @@ arangodb::RocksDBZkdIndex::RocksDBZkdIndex(arangodb::IndexId iid,
RocksDBColumnFamilyManager::get(RocksDBColumnFamilyManager::Family::GeoIndex),
false) {}

void arangodb::RocksDBZkdIndex::toVelocyPack(
// Serializes this index into `builder`: the attributes written by
// RocksDBIndex::toVelocyPack followed by a "dimension" attribute holding the
// number of indexed fields. `type` is forwarded unchanged to the base class.
void arangodb::RocksDBZkdIndexBase::toVelocyPack(
arangodb::velocypack::Builder& builder,
std::underlying_type<arangodb::Index::Serialize>::type type) const {
// NOTE(review): presumably opens the enclosing object here and closes it when
// `ob` goes out of scope, so it must precede the writes below - confirm.
VPackObjectBuilder ob(&builder);
RocksDBIndex::toVelocyPack(builder, type);
builder.add("dimension", VPackValue(_fields.size()));
}

arangodb::Index::FilterCosts arangodb::RocksDBZkdIndex::supportsFilterCondition(
// Cost estimate for using this index on the filter condition `node`.
// Pure delegation: forwards this index and all arguments to the free function
// zkd::supportsFilterCondition and returns its result unchanged.
arangodb::Index::FilterCosts arangodb::RocksDBZkdIndexBase::supportsFilterCondition(
const std::vector<std::shared_ptr<arangodb::Index>>& allIndexes,
const arangodb::aql::AstNode* node,
const arangodb::aql::Variable* reference, size_t itemsInIndex) const {

return zkd::supportsFilterCondition(this, allIndexes, node, reference, itemsInIndex);
}

arangodb::aql::AstNode* arangodb::RocksDBZkdIndex::specializeCondition(
// Specializes `condition` for this index.
// Pure delegation: forwards this index, the condition and the reference
// variable to zkd::specializeCondition and returns the node it yields.
arangodb::aql::AstNode* arangodb::RocksDBZkdIndexBase::specializeCondition(
arangodb::aql::AstNode* condition, const arangodb::aql::Variable* reference) const {
return zkd::specializeCondition(this, condition, reference);
}

std::unique_ptr<IndexIterator> arangodb::RocksDBZkdIndex::iteratorForCondition(
/// @brief Creates the iterator for an n-ary AND condition on a non-unique zkd
/// index.
///
/// The scan range is computed by boundsForIterator; the iterator is
/// instantiated with isUnique = false, i.e. it reads document ids from the
/// index key. The previously duplicated bound extraction into unused locals
/// was removed, since boundsForIterator already performs it.
std::unique_ptr<IndexIterator> arangodb::RocksDBZkdIndexBase::iteratorForCondition(
    arangodb::transaction::Methods* trx, const arangodb::aql::AstNode* node,
    const arangodb::aql::Variable* reference, const arangodb::IndexIteratorOptions& opts) {
  TRI_ASSERT(node->type == arangodb::aql::NODE_TYPE_OPERATOR_NARY_AND);

  auto&& [min, max] = boundsForIterator(this, node, reference, opts);

  return std::make_unique<RocksDBZkdIndexIterator<false>>(&_collection, this, trx,
                                                          std::move(min), std::move(max),
                                                          fields().size());
}

TRI_ASSERT(unusedExpressions.empty());

const size_t dim = _fields.size();
std::vector<zkd::byte_string> min;
min.resize(dim);
std::vector<zkd::byte_string> max;
max.resize(dim);
/// @brief Creates the iterator for an n-ary AND condition on a unique zkd
/// index.
///
/// The scan range is computed by boundsForIterator; the iterator is
/// instantiated with isUnique = true, i.e. it reads document ids from the
/// stored value instead of the key. Stale lines of the former inline bound
/// computation (referencing undeclared names and leaving braces unbalanced)
/// were removed.
std::unique_ptr<IndexIterator> arangodb::RocksDBUniqueZkdIndex::iteratorForCondition(
    arangodb::transaction::Methods* trx, const arangodb::aql::AstNode* node,
    const arangodb::aql::Variable* reference, const arangodb::IndexIteratorOptions& opts) {
  auto&& [min, max] = boundsForIterator(this, node, reference, opts);

  return std::make_unique<RocksDBZkdIndexIterator<true>>(&_collection, this, trx,
                                                         std::move(min), std::move(max),
                                                         fields().size());
}


/// @brief Inserts the index entry for `doc` into a unique zkd index.
///
/// The key is built from the indexed fields of `doc`; the value stores
/// `documentId`. Unless options.checkUniqueConstraintsInPreflight is set, the
/// key is first looked up with GetForUpdate.
///
/// @return TRI_ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED if an entry with the
///         same key already exists, the converted RocksDB status if the lookup
///         or the write fails, and an ok Result otherwise.
/// Stale iterator-construction lines that had been interleaved into this
/// function (undeclared min/max/dim, wrong return type) were removed.
arangodb::Result arangodb::RocksDBUniqueZkdIndex::insert(
    arangodb::transaction::Methods& trx, arangodb::RocksDBMethods* methods,
    const arangodb::LocalDocumentId& documentId,
    arangodb::velocypack::Slice doc, const arangodb::OperationOptions& options) {
  TRI_ASSERT(_unique == true);
  TRI_ASSERT(_sparse == false);

  auto key_value = readDocumentKey(doc, _fields);

  RocksDBKey rocks_key;
  rocks_key.constructZkdIndexValue(objectId(), key_value);

  // NOTE(review): presumably the uniqueness check has already been done by the
  // caller when checkUniqueConstraintsInPreflight is set - confirm.
  if (!options.checkUniqueConstraintsInPreflight) {
    transaction::StringLeaser leased(&trx);
    rocksdb::PinnableSlice existing(leased.get());
    if (auto s = methods->GetForUpdate(_cf, rocks_key.string(), &existing); s.ok()) {  // detected conflicting index entry
      return Result(TRI_ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED);
    } else if (!s.IsNotFound()) {
      // any status other than "found" / "not found" is a genuine error
      return Result(rocksutils::convertStatus(s));
    }
  }

  auto value = RocksDBValue::UniqueZkdIndexValue(documentId);

  if (auto s = methods->PutUntracked(_cf, rocks_key, value.string()); !s.ok()) {
    return rocksutils::convertStatus(s);
  }

  return Result();
}

/// @brief Removes the index entry for `doc` from a unique zkd index.
///
/// The key is rebuilt from the indexed fields of `doc` and deleted with
/// SingleDelete. `trx` and `documentId` are not consulted: the key alone
/// identifies the entry. The previously constructed but unused RocksDBValue
/// local was removed.
///
/// @return the converted RocksDB status on failure, an ok Result otherwise.
arangodb::Result arangodb::RocksDBUniqueZkdIndex::remove(arangodb::transaction::Methods& trx,
                                                         arangodb::RocksDBMethods* methods,
                                                         const arangodb::LocalDocumentId& documentId,
                                                         arangodb::velocypack::Slice doc) {
  TRI_ASSERT(_unique == true);
  TRI_ASSERT(_sparse == false);

  auto key_value = readDocumentKey(doc, _fields);

  RocksDBKey rocks_key;
  rocks_key.constructZkdIndexValue(objectId(), key_value);

  auto s = methods->SingleDelete(_cf, rocks_key);
  if (!s.ok()) {
    return rocksutils::convertStatus(s);
  }

  return Result();
}
Loading
0