8000 Introduce PregelShard as its own struct · arangodb/arangodb@cd2c776 · GitHub
[go: up one dir, main page]

Skip to content 8000

Commit cd2c776

Browse files
author
Markus Pfeiffer
committed
Introduce PregelShard as its own struct
1 parent 9a5975b commit cd2c776

File tree

10 files changed

+140
-15
lines changed

10 files changed

+140
-15
lines changed

arangod/Pregel/Algos/DMID/VertexSumAggregator.h

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,8 @@ struct VertexSumAggregator : public IAggregator {
5757

5858
void parseAggregate(VPackSlice const& slice) override {
5959
for (auto const& pair : VPackObjectIterator(slice)) {
60-
PregelShard shard = std::stoi(pair.key.copyString());
60+
auto shardId = std::stoi(pair.key.copyString());
61+
auto shard = PregelShard(static_cast<PregelShard::value_type>(shardId));
6162
std::string key;
6263
VPackValueLength i = 0;
6364
for (VPackSlice const& val : VPackArrayIterator(pair.value)) {
@@ -75,7 +76,8 @@ struct VertexSumAggregator : public IAggregator {
7576

7677
void setAggregatedValue(VPackSlice const& slice) override {
7778
for (auto const& pair : VPackObjectIterator(slice)) {
78-
PregelShard shard = std::stoi(pair.key.copyString());
79+
auto shardId = std::stoi(pair.key.copyString());
80+
auto shard = PregelShard(static_cast<PregelShard::value_type>(shardId));
7981
std::string key;
8082
VPackValueLength i = 0;
8183
for (VPackSlice const& val : VPackArrayIterator(pair.value)) {
@@ -92,7 +94,7 @@ struct VertexSumAggregator : public IAggregator {
9294
void serialize(std::string const& key, VPackBuilder& builder) const override {
9395
builder.add(key, VPackValue(VPackValueType::Object));
9496
for (auto const& pair1 : _entries) {
95-
builder.add(std::to_string(pair1.first),
97+
builder.add(std::to_string(pair1.first.value),
9698
VPackValue(VPackValueType::Array));
9799
for (auto const& pair2 : pair1.second) {
98100
builder.add(VPackValuePair(pair2.first.data(), pair2.first.size(),

arangod/Pregel/Algos/EffectiveCloseness/HLLCounter.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ inline uint8_t _get_leading_zero_count(uint32_t x, uint8_t b) {
5757
static uint32_t hashPregelId(VertexID const& pregelId) {
5858
uint32_t h1 =
5959
fasthash32(pregelId.key.data(), pregelId.key.length(), 0xf007ba11UL);
60-
uint64_t h2 = fasthash64_uint64(pregelId.shard, 0xdefec7edUL);
60+
uint64_t h2 = fasthash64_uint64(pregelId.shard.value, 0xdefec7edUL);
6161
uint32_t h3 = (uint32_t)(h2 - (h2 >> 32));
6262
return h1 ^ (h3 << 1);
6363
}

arangod/Pregel/GraphStore/PregelShard.h

Lines changed: 43 additions & 2 deletions
< 628C tr class="diff-line-row">
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,48 @@
2424

2525
#pragma once
2626

27+
#include <compare>
28+
29+
#include <velocypack/Value.h>
30+
2731
namespace arangodb::pregel {
28-
typedef uint16_t PregelShard;
29-
const PregelShard InvalidPregelShard = -1;
32+
struct PregelShard {
33+
using value_type = uint16_t;
34+
35+
constexpr PregelShard() : value(InvalidPregelShardMarker) {}
36+
PregelShard(value_type value) : value(value) {}
37+
38+
static constexpr value_type InvalidPregelShardMarker =
39+
std::numeric_limits<value_type>::max();
40+
41+
auto isValid() -> bool { return value != InvalidPregelShardMarker; }
42+
43+
auto operator<=>(PregelShard const& other) const = default;
44+
45+
// TODO: This is for backwards compatibility and easy transition; as soon as
46+
// all uses of VPackValue conversions on PregelShard are removed, this
47+
// operator can go, too
48+
explicit operator arangodb::velocypack::Value() const {
49+
return VPackValue(value);
50+
}
51+
52+
value_type value{};
53+
};
54+
constexpr auto InvalidPregelShard = PregelShard();
55+
56+
template<typename Inspector>
57+
auto inspect(Inspector& f, PregelShard& x) {
58+
return f.object(x).fields(f.field("shardID", x.value));
59+
}
60+
3061
} // namespace arangodb::pregel
62+
63+
namespace std {
64+
template<>
65+
struct hash<arangodb::pregel::PregelShard> {
66+
std::size_t operator()(
67+
const arangodb::pregel::PregelShard& k) const noexcept {
68+
return std::hash<arangodb::pregel::PregelShard::value_type>()(k.value);
69+
}
70+
};
71+
} // namespace std

arangod/Pregel/GraphStore/VertexID.h

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,8 @@
2727
namespace arangodb::pregel {
2828

2929
struct VertexID {
30-
std::string key;
31-
PregelShard shard;
32-
3330
VertexID() : shard(InvalidPregelShard) {}
34-
VertexID(PregelShard s, std::string k) : key(std::move(k)), shard(s) {}
31+
VertexID(PregelShard s, std::string k) : shard(s), key(std::move(k)) {}
3532

3633
bool operator==(const VertexID& rhs) const {
3734
return shard == rhs.shard && key == rhs.key;
@@ -48,6 +45,9 @@ struct VertexID {
4845
[[nodiscard]] bool isValid() const {
4946
return shard != InvalidPregelShard && !key.empty();
5047
}
48+
49+
PregelShard shard;
50+
std::string key;
5151
};
5252

5353
} // namespace arangodb::pregel
@@ -63,7 +63,7 @@ struct hash<arangodb::pregel::VertexID> {
6363
// second and third and combine them using XOR
6464
// and bit shifting:
6565
size_t h1 = std::hash<std::string>()(k.key);
66-
size_t h2 = std::hash<size_t>()(k.shard);
66+
size_t h2 = std::hash<arangodb::pregel::PregelShard>()(k.shard);
6767
return h2 ^ (h1 << 1);
6868
}
6969
};

arangod/Pregel/OutgoingCache.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ void ArrayOutCache<M>::flushMessages() {
145145
buffer.append(serialized.get().slice().begin(),
146146
serialized.get().slice().byteSize());
147147
responses.emplace_back(network::sendRequest(
148-
pool, "shard:" + this->_config->globalShardIDs()[shard],
148+
pool, "shard:" + this->_config->globalShardID(shard),
149149
fuerte::RestVerb::Post, this->_baseUrl + Utils::messagesPath,
150150
std::move(buffer), reqOpts));
151151

@@ -261,7 +261,7 @@ void CombiningOutCache<M>::flushMessages() {
261261
buffer.append(serialized.get().slice().begin(),
262262
serialized.get().slice().byteSize());
263263
responses.emplace_back(network::sendRequest(
264-
pool, "shard:" + this->_config->globalShardIDs()[shard],
264+
pool, "shard:" + this->_config->globalShardID(shard),
265265
fuerte::RestVerb::Post, this->_baseUrl + Utils::messagesPath,
266266
std::move(buffer), reqOpts));
267267

arangod/Pregel/Worker/GraphStore.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -709,7 +709,7 @@ void GraphStore<V, E>::storeVertices(
709709
commitTransaction();
710710

711711
currentShard = it->shard();
712-
shard = globalShards[currentShard];
712+
shard = globalShards[currentShard.value];
713713

714714
auto ctx =
715715
transaction::StandaloneContext::Create(_vocbaseGuard.database());

arangod/Pregel/Worker/Worker.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -567,7 +567,7 @@ auto Worker<V, E, M>::aqlResult(bool withId) const -> PregelResults {
567567
Vertex<V, E> const* vertexEntry = *it;
568568

569569
TRI_ASSERT(vertexEntry->shard() < _config.globalShardIDs().size());
570-
ShardID const& shardId = _config.globalShardIDs()[vertexEntry->shard()];
570+
ShardID const& shardId = _config.globalShardID(vertexEntry->shard());
571571

572572
results.openObject(/*unindexed*/ true);
573573

arangod/Pregel/Worker/WorkerConfig.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,9 @@ class WorkerConfig {
100100
inline std::vector<ShardID> const& globalShardIDs() const {
101101
return _globalShardIDs;
102102
}
103+
[[nodiscard]] ShardID globalShardID(PregelShard shard) const {
104+
return _globalShardIDs.at(shard.value);
105+
}
103106

104107
// convenvience access without guaranteed order, same values as in
105108
// vertexCollectionShards

tests/Pregel/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
# we intent to remove the TypedBuffer construct soon
44
add_library(arango_tests_pregel OBJECT
55
DurationTest.cpp
6+
PregelShardTest.cpp
67
StatusTest.cpp)
78

89
target_include_directories(arango_tests_pregel

tests/Pregel/PregelShardTest.cpp

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
////////////////////////////////////////////////////////////////////////////////
2+
/// DISCLAIMER
3+
///
4+
/// Copyright 2021-2021 ArangoDB GmbH, Cologne, Germany
5+
///
6+
/// Licensed under the Apache License, Version 2.0 (the "License");
7+
/// you may not use this file except in compliance with the License.
8+
/// You may obtain a copy of the License at
9+
///
10+
/// http://www.apache.org/licenses/LICENSE-2.0
11+
///
12+
/// Unless required by applicable law or agreed to in writing, software
13+
/// distributed under the License is distributed on an "AS IS" BASIS,
14+
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
/// See the License for the specific language governing permissions and
16+
/// limitations under the License.
17+
///
18+
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
19+
///
20+
/// @author Markus Pfeiffer
21+
////////////////////////////////////////////////////////////////////////////////
22+
23+
#include <gtest/gtest.h>
24+
#include <fmt/core.h>
25+
26+
#include <Inspection/VPackWithErrorT.h>
27+
#include <Pregel/GraphStore/PregelShard.h< 1CF5 span class="pl-pds">>
28+
29+
#include <velocypack/Builder.h>
30+
31+
#include <functional>
32+
33+
using namespace arangodb::pregel;
34+
using namespace arangodb::inspection;
35+
using namespace arangodb::velocypack;
36+
37+
TEST(PregelShard, construction) {
38+
auto s = PregelShard();
39+
ASSERT_EQ(s, InvalidPregelShard);
40+
ASSERT_FALSE(s.isValid());
41+
42+
auto t = PregelShard(5);
43+
ASSERT_NE(t, InvalidPregelShard);
44+
ASSERT_TRUE(t.isValid());
45+
};
46+
47+
TEST(PregelShard, inspection_save) {
48+
const auto expected =
49+
fmt::format("{{\"shardID\":{}}}", PregelShard::InvalidPregelShardMarker);
50+
auto s = PregelShard();
51+
52+
auto res = serializeWithErrorT(s);
53+
ASSERT_TRUE(res.ok());
54+
ASSERT_EQ(res.get().toJson(), expected);
55+
}
56+
57+
TEST(PregelShard, inspection_load) {
58+
auto input = VPackBuilder();
59+
60+
input.openObject();
61+
input.add(VPackValue("shardID"));
62+
input.add(VPackValue(5));
63+
input.close();
64+
65+
auto res = deserializeWithErrorT<PregelShard>(input.sharedSlice());
66+
ASSERT_TRUE(res.ok());
67+
ASSERT_EQ(res.get().value, 5);
68+
}
69+
70+
TEST(PregelShard, hashable) {
71+
auto s = PregelShard();
72+
auto h1 = std::hash<PregelShard>()(s);
73+
(void)h1;
74+
75+
auto t = PregelShard(5);
76+
auto h2 = std::hash<PregelShard>()(t);
77+
(void)h2;
78+
}

0 commit comments

Comments
 (0)
0