8000 [PREGEL] Split Graph.h, make PregelShard its own struct by markuspf · Pull Request #17963 · arangodb/arangodb · GitHub
[go: up one dir, main page]

Skip to content

[PREGEL] Split Graph.h, make PregelShard its own struct #17963

8000 New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Feb 6, 2023
Prev Previous commit
Next Next commit
Introduce PregelShard as its own struct
  • Loading branch information
Markus Pfeiffer committed Feb 6, 2023
commit 4e93ad7d45faeb763fac4cdbf1b1fc3dcc86e10f
8 changes: 5 additions & 3 deletions arangod/Pregel/Algos/DMID/VertexSumAggregator.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ struct VertexSumAggregator : public IAggregator {

void parseAggregate(VPackSlice const& slice) override {
for (auto const& pair : VPackObjectIterator(slice)) {
PregelShard shard = std::stoi(pair.key.copyString());
auto shardId = std::stoi(pair.key.copyString());
auto shard = PregelShard(static_cast<PregelShard::value_type>(shardId));
std::string key;
VPackValueLength i = 0;
for (VPackSlice const& val : VPackArrayIterator(pair.value)) {
Expand All @@ -75,7 +76,8 @@ struct VertexSumAggregator : public IAggregator {

void setAggregatedValue(VPackSlice const& slice) override {
for (auto const& pair : VPackObjectIterator(slice)) {
PregelShard shard = std::stoi(pair.key.copyString());
auto shardId = std::stoi(pair.key.copyString());
auto shard = PregelShard(static_cast<PregelShard::value_type>(shardId));
std::string key;
VPackValueLength i = 0;
for (VPackSlice const& val : VPackArrayIterator(pair.value)) {
Expand All @@ -92,7 +94,7 @@ struct VertexSumAggregator : public IAggregator {
void serialize(std::string const& key, VPackBuilder& builder) const override {
builder.add(key, VPackValue(VPackValueType::Object));
for (auto const& pair1 : _entries) {
builder.add(std::to_string(pair1.first),
builder.add(std::to_string(pair1.first.value),
VPackValue(VPackValueType::Array));
for (auto const& pair2 : pair1.second) {
builder.add(VPackValuePair(pair2.first.data(), pair2.first.size(),
Expand Down
2 changes: 1 addition & 1 deletion arangod/Pregel/Algos/EffectiveCloseness/HLLCounter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ inline uint8_t _get_leading_zero_count(uint32_t x, uint8_t b) {
static uint32_t hashPregelId(VertexID const& pregelId) {
uint32_t h1 =
fasthash32(pregelId.key.data(), pregelId.key.length(), 0xf007ba11UL);
uint64_t h2 = fasthash64_uint64(pregelId.shard, 0xdefec7edUL);
uint64_t h2 = fasthash64_uint64(pregelId.shard.value, 0xdefec7edUL);
uint32_t h3 = (uint32_t)(h2 - (h2 >> 32));
return h1 ^ (h3 << 1);
}
Expand Down
45 changes: 43 additions & 2 deletions arangod/Pregel/GraphStore/PregelShard.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,48 @@

#pragma once

#include <compare>

#include <velocypack/Value.h>

namespace arangodb::pregel {
typedef uint16_t PregelShard;
const PregelShard InvalidPregelShard = -1;
struct PregelShard {
using value_type = uint16_t;

constexpr PregelShard() : value(InvalidPregelShardMarker) {}
PregelShard(value_type value) : value(value) {}

static constexpr value_type InvalidPregelShardMarker =
std::numeric_limits<value_type>::max();

auto isValid() -> bool { return value != InvalidPregelShardMarker; }

auto operator<=>(PregelShard const& other) const = default;

// TODO: This is for backwards compatibility and easy transition; as soon as
// all uses of VPackValue conversions on PregelShard are removed, this
// operator can go, too
explicit operator arangodb::velocypack::Value() const {
return VPackValue(value);
}

value_t 8000 ype value{};
};
constexpr auto InvalidPregelShard = PregelShard();

template<typename Inspector>
auto inspect(Inspector& f, PregelShard& x) {
return f.object(x).fields(f.field("shardID", x.value));
}

} // namespace arangodb::pregel

namespace std {
template<>
struct hash<arangodb::pregel::PregelShard> {
std::size_t operator()(
const arangodb::pregel::PregelShard& k) const noexcept {
return std::hash<arangodb::pregel::PregelShard::value_type>()(k.value);
}
};
} // namespace std
10 changes: 5 additions & 5 deletions arangod/Pregel/GraphStore/VertexID.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,8 @@
namespace arangodb::pregel {

struct VertexID {
std::string key;
PregelShard shard;

VertexID() : shard(InvalidPregelShard) {}
VertexID(PregelShard s, std::string k) : key(std::move(k)), shard(s) {}
VertexID(PregelShard s, std::string k) : shard(s), key(std::move(k)) {}

bool operator==(const VertexID& rhs) const {
return shard == rhs.shard && key == rhs.key;
Expand All @@ -48,6 +45,9 @@ struct VertexID {
[[nodiscard]] bool isValid() const {
return shard != InvalidPregelShard && !key.empty();
}

PregelShard shard;
std::string key;
};

} // namespace arangodb::pregel
Expand All @@ -63,7 +63,7 @@ struct hash<arangodb::pregel::VertexID> {
// second and third and combine them using XOR
// and bit shifting:
size_t h1 = std::hash<std::string>()(k.key);
size_t h2 = std::hash<size_t>()(k.shard);
size_t h2 = std::hash<arangodb::pregel::PregelShard>()(k.shard);
return h2 ^ (h1 << 1);
}
};
Expand Down
4 changes: 2 additions & 2 deletions arangod/Pregel/OutgoingCache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ void ArrayOutCache<M>::flushMessages() {
buffer.append(serialized.get().slice().begin(),
serialized.get().slice().byteSize());
responses.emplace_back(network::sendRequest(
pool, "shard:" + this->_config->globalShardIDs()[shard],
pool, "shard:" + this->_config->globalShardID(shard),
fuerte::RestVerb::Post, this->_baseUrl + Utils::messagesPath,
std::move(buffer), reqOpts));

Expand Down Expand Up @@ -261,7 +261,7 @@ void CombiningOutCache<M>::flushMessages() {
buffer.append(serialized.get().slice().begin(),
serialized.get().slice().byteSize());
responses.emplace_back(network::sendRequest(
pool, "shard:" + this->_config->globalShardIDs()[shard],
pool, "shard:" + this->_config->globalShardID(shard),
fuerte::RestVerb::Post, this->_baseUrl + Utils::messagesPath,
std::move(buffer), reqOpts));

Expand Down
2 changes: 1 addition & 1 deletion arangod/Pregel/Worker/GraphStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -709,7 +709,7 @@ void GraphStore<V, E>::storeVertices(
commitTransaction();

currentShard = it->shard();
shard = globalShards[currentShard];
shard = globalShards[currentShard.value];

auto ctx =
transaction::StandaloneContext::Create(_vocbaseGuard.database());
Expand Down
2 changes: 1 addition & 1 deletion arangod/Pregel/Worker/Worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -567,7 +567,7 @@ auto Worker<V, E, M>::aqlResult(bool withId) const -> PregelResults {
Vertex<V, E> const* vertexEntry = *it;

TRI_ASSERT(vertexEntry->shard() < _config.globalShardIDs().size());
ShardID const& shardId = _config.globalShardIDs()[vertexEntry->shard()];
ShardID const& shardId = _config.globalShardID(vertexEntry->shard());

results.openObject(/*unindexed*/ true);

Expand Down
3 changes: 3 additions & 0 deletions arangod/Pregel/Worker/WorkerConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ class WorkerConfig {
inline std::vector<ShardID> const& globalShardIDs() const {
return _globalShardIDs;
}
[[nodiscard]] ShardID globalShardID(PregelShard shard) const {
return _globalShardIDs.at(shard.value);
}

// convenvience access without guaranteed order, same values as in
// vertexCollectionShards
Expand Down
1 change: 1 addition & 0 deletions tests/Pregel/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
# we intent to remove the TypedBuffer construct soon
add_library(arango_tests_pregel OBJECT
DurationTest.cpp
PregelShardTest.cpp
StatusTest.cpp)

target_include_directories(arango_tests_pregel
Expand Down
78 changes: 78 additions & 0 deletions tests/Pregel/PregelShardTest.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2021-2021 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Markus Pfeiffer
////////////////////////////////////////////////////////////////////////////////

#include <gtest/gtest.h>
#include <fmt/core.h>

#include <Inspection/VPackWithErrorT.h>
#include <Pregel/GraphStore/PregelShard.h>

#include <velocypack/Builder.h>

#include <functional>

using namespace arangodb::pregel;
using namespace arangodb::inspection;
using namespace arangodb::velocypack;

TEST(PregelShard, construction) {
auto s = PregelShard();
ASSERT_EQ(s, InvalidPregelShard);
ASSERT_FALSE(s.isValid());

auto t = PregelShard(5);
ASSERT_NE(t, InvalidPregelShard);
ASSERT_TRUE(t.isValid());
};

TEST(PregelShard, inspection_save) {
const auto expected =
fmt::format("{{\"shardID\":{}}}", PregelShard::InvalidPregelShardMarker);
auto s = PregelShard();

auto res = serializeWithErrorT(s);
ASSERT_TRUE(res.ok());
ASSERT_EQ(res.get().toJson(), expected);
}

TEST(PregelShard, inspection_load) {
auto input = VPackBuilder();

input.openObject();
input.add(VPackValue("shardID"));
input.add(VPackValue(5));
input.close();

auto res = deserializeWithErrorT<PregelShard>(input.sharedSlice());
ASSERT_TRUE(res.ok());
ASSERT_EQ(res.get().value, 5);
}

TEST(PregelShard, hashable) {
auto s = PregelShard();
auto h1 = std::hash<PregelShard>()(s);
(void)h1;

auto t = PregelShard(5);
auto h2 = std::hash<PregelShard>()(t);
(void)h2;
}
0