10000 Feature/single server smart graph (#14821) · open-bigdata/arangodb@2e7fe93 · GitHub
[go: up one dir, main page]

Skip to content

Commit 2e7fe93

Browse files
authored
Feature/single server smart graph (arangodb#14821)
1 parent 02a72fb commit 2e7fe93

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

52 files changed

+2539
-930
lines changed

CHANGELOG

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,23 @@ devel
2222
also contains the current followers. This means that from now on also active
2323
follower servers can be nominated as the leading server for that specific
2424
shard.
25-
25+
2626
* Fix issues during rolling upgrades from 3.8.0 to 3.8.x (x >= 1) and from
2727
3.7.x (x <= 12) to 3.8.3. The problem was that older versions did not handle
2828
following term ids that are sent from newer versions during synchronous
2929
replication operations.
3030

31+
* Added Enterprise Sharded Graphs Simulation: Now it is possible to test
32+
SmartGraphs and SatelliteGraphs on a single server instance and then to port
33+
them to a cluster with multiple servers. All existing types of SmartGraphs
34+
are eligible to this procedure: SmartGraphs themselves, Disjoint SmartGraphs,
35+
Hybrid SmartGraphs and Hybrid Disjoint SmartGraphs. One can create a graph of
36+
any of those types in the usual way, e.g., using `arangosh`, but on a single
37+
server, then dump it, start a cluster (with multiple servers) and restore the
38+
graph in the cluster. The graph and the collections will keep all properties
39+
that are kept when the graph is already created in a cluster. This feature is
40+
only available in the Enterprise Edition.
41+
3142
* Close a potential gap during shard synchronization when moving from the
3243
initial sync step to the WAL tailing step. In this small gap the leader
3344
could purge some of the WAL files that would be required by the following

arangod/Aql/Functions.cpp

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,10 @@
8686
#include "utils/math_utils.hpp"
8787
#include "utils/ngram_match_utils.hpp"
8888

89+
#ifdef USE_ENTERPRISE
90+
#include "Enterprise/VocBase/SmartVertexCollection.h"
91+
#endif
92+
8993
#include <boost/uuid/uuid.hpp>
9094
#include <boost/uuid/uuid_generators.hpp>
9195
#include <boost/uuid/uuid_io.hpp>
@@ -8793,6 +8797,21 @@ AqlValue Functions::MakeDistributeInputWithKeyCreation(
87938797
THROW_ARANGO_EXCEPTION(TRI_ERROR_CLUSTER_MUST_NOT_SPECIFY_KEY);
87948798
}
87958799

8800+
#ifdef USE_ENTERPRISE
8801+
// TODO: Remove me as soon SmartVertex Schema Validation is in place (!)
8802+
if (logicalCollection->isSmart() && logicalCollection->type() == TRI_COL_TYPE_DOCUMENT) {
8803+
transaction::BuilderLeaser sBuilder(&trx);
8804+
// smart vertex collection
8805+
auto svecol =
8806+
dynamic_cast<arangodb::SmartVertexCollection*>(logicalCollection.get());
8807+
auto sveRes = svecol->rewriteVertexOnInsert(input, *sBuilder.get(), false);
8808+
if (sveRes.fail()) {
8809+
THROW_ARANGO_EXCEPTION(sveRes.errorNumber());
8810+
}
8811+
return AqlValue{sBuilder->slice()};
8812+
}
8813+
#endif
8814+
87968815
if (buildNewObject) {
87978816
transaction::BuilderLeaser builder(&trx);
87988817
buildKeyObject(*builder.get(),

arangod/Cluster/ClusterHelpers.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ class ClusterHelpers {
3939
static bool compareServerLists(arangodb::velocypack::Slice plan,
4040
arangodb::velocypack::Slice current);
4141

42+
// @brief Returns true if both vectors are not empty, the first elements are equal
43+
// and the vectors are eqal as multisets (the same number of the same elements)
4244
// values are passed by value intentionally, as they will be sorted inside the
4345
// function
4446
static bool compareServerLists(std::vector<std::string>, std::vector<std::string>);

arangod/Cluster/ClusterInfo.cpp

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@
7272

7373
#ifdef USE_ENTERPRISE
7474
#include "Enterprise/VocBase/SmartVertexCollection.h"
75-
#include "Enterprise/VocBase/VirtualCollection.h"
75+
#include "Enterprise/VocBase/VirtualClusterSmartEdgeCollection.h"
7676
#endif
7777

7878
#include <velocypack/Builder.h>
@@ -775,7 +775,7 @@ ClusterInfo::CollectionWithHash ClusterInfo::buildCollection(
775775
auto type = data.get(StaticStrings::DataSourceType);
776776

777777
if (type.isInteger() && type.getUInt() == TRI_COL_TYPE_EDGE) {
778-
return std::make_shared<VirtualSmartEdgeCollection>(vocbase, data);
778+
return std::make_shared<VirtualClusterSmartEdgeCollection>(vocbase, data);
779779
}
780780
return std::make_shared<SmartVertexCollection>(vocbase, data);
781781
}
@@ -2682,11 +2682,9 @@ Result ClusterInfo::createCollectionCoordinator( // create collection
26822682
}
26832683

26842684
/// @brief this method does an atomic check of the preconditions for the
2685-
/// collections to be created, using the currently loaded plan. it populates the
2686-
/// plan version used for the checks
2685+
/// collections to be created, using the currently loaded plan.
26872686
Result ClusterInfo::checkCollectionPreconditions(std::string const& databaseName,
2688-
std::vector<ClusterCollectionCreationInfo> const& infos,
2689-
uint64_t& planVersion) {
2687+
std::vector<ClusterCollectionCreationInfo> const& infos) {
26902688
for (auto const& info : infos) {
26912689
// Check if name exists.
26922690
if (info.name.empty() || !info.json.isObject() || !info.json.get("shards").isObject()) {
@@ -2738,7 +2736,7 @@ Result ClusterInfo::checkCollectionPreconditions(std::string const& databaseName
27382736
Result ClusterInfo::createCollectionsCoordinator(
27392737
std::string const& databaseName, std::vector<ClusterCollectionCreationInfo>& infos,
27402738
double endTime, bool isNewDatabase,
2741-
std::shared_ptr<LogicalCollection> const& colToDistributeShardsLike) {
2739+
std::shared_ptr<const LogicalCollection> const& colToDistributeShardsLike) {
27422740
TRI_ASSERT(ServerState::instance()->isCoordinator());
27432741
using arangodb::velocypack::Slice;
27442742

@@ -2747,8 +2745,6 @@ Result ClusterInfo::createCollectionsCoordinator(
27472745
<< " collections in database " << databaseName << " isNewDatabase: " << isNewDatabase
27482746
<< " first collection name: " << infos[0].name;
27492747

2750-
double const interval = getPollInterval();
2751-
27522748
// The following three are used for synchronization between the callback
27532749
// closure and the main thread executing this function. Note that it can
27542750
// happen that the callback is called only after we return from this
@@ -2815,7 +2811,11 @@ Result ClusterInfo::createCollectionsCoordinator(
28152811
shardServers.try_emplace(shardID, serverIds);
28162812
}
28172813

2818-
// The AgencyCallback will copy the closure will take responsibilty of it.
2814+
// Counts the elements of result in nrDone and checks that they match shardServers.
2815+
// Also checks that result matches info.
2816+
// Errors are stored in the database via dbServerResult, in errMsg and in info.state.
2817+
//
2818+
// The AgencyCallback will copy the closure and take responsibility of it.
28192819
auto closure = [cacheMutex, cacheMutexOwner, &info, dbServerResult, errMsg,
28202820
nrDone, isCleaned, shardServers, this](VPackSlice const& result) {
28212821
// NOTE: This ordering here is important to cover against a race in cleanup.
@@ -2838,6 +2838,7 @@ Result ClusterInfo::createCollectionsCoordinator(
28382838
std::string tmpError = "";
28392839

28402840
for (auto const& p : VPackObjectIterator(result)) {
2841+
// if p contains an error number, add it to tmpError as a string
28412842
if (arangodb::basics::VelocyPackHelper::getBooleanValue(p.value,
28422843
StaticStrings::Error, false)) {
28432844
F41A tmpError += " shardID:" + p.key.copyString() + ":";
@@ -2857,6 +2858,7 @@ Result ClusterInfo::createCollectionsCoordinator(
28572858
// wait that all followers have created our new collection
28582859
if (tmpError.empty() && info.waitForReplication) {
28592860
std::vector<ServerID> plannedServers;
2861+
// copy all servers which are in p from shardServers to plannedServers
28602862
{
28612863
READ_LOCKER(readLocker, _planProt.lock);
28622864
auto it = shardServers.find(p.key.copyString());
@@ -2926,10 +2928,11 @@ Result ClusterInfo::createCollectionsCoordinator(
29262928
}
29272929
}
29282930
return true;
2929-
};
2931+
}; // closure
2932+
29302933
// ATTENTION: The following callback calls the above closure in a
29312934
// different thread. Nevertheless, the closure accesses some of our
2932-
// local variables. Therefore we have to protect all accesses to them
2935+
// local variables. Therefore, we have to protect all accesses to them
29332936
// by a mutex. We use the mutex of the condition variable in the
29342937
// AgencyCallback for this.
29352938

@@ -2988,7 +2991,7 @@ Result ClusterInfo::createCollectionsCoordinator(
29882991
AgencyPrecondition::Type::EMPTY, true));
29892992
}
29902993

2991-
// We need to make sure our plan is up to date.
2994+
// We need to make sure our plan is up-to-date.
29922995
LOG_TOPIC("f4b14", DEBUG, Logger::CLUSTER)
29932996
<< "createCollectionCoordinator, loading Plan from agency...";
29942997

@@ -2997,7 +3000,7 @@ Result ClusterInfo::createCollectionsCoordinator(
29973000
READ_LOCKER(readLocker, _planProt.lock);
29983001
planVersion = _planVersion;
29993002
if (!isNewDatabase) {
3000-
Result res = checkCollectionPreconditions(databaseName, infos, planVersion);
3003+
Result res = checkCollectionPreconditions(databaseName, infos);
30013004
if (res.fail()) {
30023005
LOG_TOPIC("98762", DEBUG, Logger::CLUSTER)
30033006
<< "Failed createCollectionsCoordinator for " << infos.size()
@@ -3287,7 +3290,7 @@ Result ClusterInfo::createCollectionsCoordinator(
32873290
{
32883291
// This one has not responded, wait for it.
32893292
CONDITION_LOCKER(locker, agencyCallbacks[i]->_cv);
3290-
gotTimeout = agencyCallbacks[i]->executeByCallbackOrTimeout(interval);
3293+
gotTimeout = agencyCallbacks[i]->executeByCallbackOrTimeout(getPollInterval());
32913294
}
32923295
if (gotTimeout) {
32933296
++i;

arangod/Cluster/ClusterInfo.h

Lines changed: 17 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -424,7 +424,7 @@ class ClusterInfo final {
424424
//////////////////////////////////////////////////////////////////////////////
425425

426426
void cleanup();
427-
427+
428428
/// @brief cancel all pending wait-for-syncer operations
429429
void drainSyncers();
430430

@@ -601,18 +601,18 @@ class ClusterInfo final {
601601
QueryAnalyzerRevisions getQueryAnalyzersRevision(
602602
DatabaseID const& databaseID);
603603

604-
/// @brief return shard statistics for the specified database,
604+
/// @brief return shard statistics for the specified database,
605605
/// optionally restricted to anything on the specified server
606606
Result getShardStatisticsForDatabase(DatabaseID const& dbName,
607607
std::string const& restrictServer,
608608
arangodb::velocypack::Builder& builder) const;
609-
609+
610610
/// @brief return shard statistics for all databases, totals,
611611
/// optionally restricted to anything on the specified server
612612
Result getShardStatisticsGlobal(std::string const& restrictServer,
613613
arangodb::velocypack::Builder& builder) const;
614-
615-
/// @brief return shard statistics, separate for each database,
614+
615+
/// @brief return shard statistics, separate for each database,
616616
/// optionally restricted to anything on the specified server
617617
Result getShardStatisticsGlobalDetailed(std::string const& restrictServer,
618618
arangodb::velocypack::Builder& builder) const;
@@ -663,7 +663,7 @@ class ClusterInfo final {
663663
std::string const& name, // database name
664664
double timeout // request timeout
665665
);
666-
666+
667667
//////////////////////////////////////////////////////////////////////////////
668668
/// @brief create collection in coordinator
669669
//////////////////////////////////////////////////////////////////////////////
@@ -677,21 +677,19 @@ class ClusterInfo final {
677677
std::shared_ptr<LogicalCollection> const& colToDistributeShardsLike);
678678

679679
/// @brief this method does an atomic check of the preconditions for the
680-
/// collections to be created, using the currently loaded plan. it populates
681-
/// the plan version used for the checks
680+
/// collections to be created, using the currently loaded plan.
682681
Result checkCollectionPreconditions(std::string const& databaseName,
683-
std::vector<ClusterCollectionCreationInfo> const& infos,
684-
uint64_t& planVersion);
682+
std::vector<ClusterCollectionCreationInfo> const& infos);
685683

686684
/// @brief create multiple collections in coordinator
687685
/// If any one of these collections fails, all creations will be
688686
/// rolled back.
689687
/// Note that in contrast to most other methods here, this method does not
690688
/// get a timeout parameter, but an endTime parameter!!!
691689
Result createCollectionsCoordinator(std::string const& databaseName,
692-
std::vector<ClusterCollectionCreationInfo>&,
690+
std::vector<ClusterCollectionCreationInfo>& ,
693691
double endTime, bool isNewDatabase,
694-
std::shared_ptr<LogicalCollection> const& colToDistributeShardsLike);
692+
std::shared_ptr<const LogicalCollection> const& colToDistributeShardsLike);
695693

696694
/// @brief drop collection in coordinator
697695
//////////////////////////////////////////////////////////////////////////////
@@ -877,7 +875,7 @@ class ClusterInfo final {
877875
//////////////////////////////////////////////////////////////////////////////
878876

879877
ServerID getCoordinatorByShortID(ServerShortID const& shortId);
880-
878+
881879
//////////////////////////////////////////////////////////////////////////////
882880
/// @brief invalidate current coordinators
883881
//////////////////////////////////////////////////////////////////////////////
@@ -904,12 +902,12 @@ class ClusterInfo final {
904902
void setServers(std::unordered_map<ServerID, std::string> servers);
905903

906904
void setServerAliases(std::unordered_map<ServerID, std::string> aliases);
907-
905+
908906
void setServerAdvertisedEndpoints(std::unordered_map<ServerID, std::string> advertisedEndpoints);
909907
#endif
910-
908+
911909
bool serverExists(ServerID const& serverId) const noexcept;
912-
910+
913911
bool serverAliasExists(std::string const& alias) const noexcept;
914912

915913
std::unordered_map<ServerID, std::string> getServers();
@@ -976,9 +974,9 @@ class ClusterInfo final {
976974
private:
977975
/// @brief worker function for dropIndexCoordinator
978976
Result dropIndexCoordinatorInner(
979-
std::string const& databaseName,
977+
std::string const& databaseName,
980978
std::string const& collectionID,
981-
IndexId iid,
979+
IndexId iid,
982980
double endTime);
983981

984982
/// @brief helper function to build a new LogicalCollection object from the velocypack
@@ -1086,7 +1084,7 @@ class ClusterInfo final {
10861084

10871085
cluster::RebootTracker _rebootTracker;
10881086

1089-
/// @brief error code sent to all remaining promises of the syncers at shutdown.
1087+
/// @brief error code sent to all remaining promises of the syncers at shutdown.
10901088
/// normally this is TRI_ERROR_SHUTTING_DOWN, but it can be overridden during testing
10911089
ErrorCode const _syncerShutdownCode;
10921090

arangod/Cluster/ClusterMethods.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ std::string const vertexUrl = "/_internal/traverser/vertex/";
110110
// with a shard leader and we always have to assume that some follower has
111111
// stopped writes for some time to get in sync:
112112
static double const CL_DEFAULT_LONG_TIMEOUT = 900.0;
113+
static double const CL_PERSIST_COLLECTION_TIMEOUT = 240.0;
113114

114115
namespace {
115116
template <typename T>
@@ -2564,7 +2565,7 @@ Result compactOnAllDBServers(ClusterFeature& feature,
25642565
}
25652566

25662567
#ifndef USE_ENTERPRISE
2567-
std::vector<std::shared_ptr<LogicalCollection>> ClusterMethods::createCollectionOnCoordinator(
2568+
std::vector<std::shared_ptr<LogicalCollection>> ClusterMethods::createCollectionsOnCoordinator(
25682569
TRI_vocbase_t& vocbase, velocypack::Slice parameters, bool ignoreDistributeShardsLikeErrors,
25692570
bool waitForSyncReplication, bool enforceReplicationFactor,
25702571
bool isNewDatabase, std::shared_ptr<LogicalCollection> const& colToDistributeShardsLike) {
@@ -2603,7 +2604,7 @@ std::vector<std::shared_ptr<LogicalCollection>> ClusterMethods::persistCollectio
26032604
"Trying to create an empty list of collections on coordinator.");
26042605
}
26052606

2606-
double const realTimeout = ClusterInfo::getTimeout(240.0);
2607+
double const realTimeout = ClusterInfo::getTimeout(CL_PERSIST_COLLECTION_TIMEOUT);
26072608
double const endTime = TRI_microtime() + realTimeout;
26082609

26092610
// We have at least one, take this collection's DB name

arangod/Cluster/ClusterMethods.h

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -321,13 +321,21 @@ class ClusterMethods {
321321
ClusterMethods() = delete;
322322
~ClusterMethods() = delete;
323323

324-
// @brief Create many new collections on coordinator from a Array of VPack
325-
// parameter Note that this returns a vector of newly allocated objects
326-
static std::vector<std::shared_ptr<LogicalCollection>> createCollectionOnCoordinator(
327-
TRI_vocbase_t& vocbase, arangodb::velocypack::Slice parameters,
324+
/// @brief Create many new collections on coordinator from a Array of VPack
325+
/// parameter Note that this returns a vector of newly allocated objects
326+
/// @param vocbase the actual database
341A
327+
/// @param parametersOfCollections array of parameters of collections to be created
328+
/// @param ignoreDistributeShardsLikeErrors
329+
/// @param waitForSyncReplication
330+
/// @param enforceReplicationFactor
331+
/// @param isNewDatabase
332+
/// @param colToDistributeShardsLike
333+
334+
static std::vector<std::shared_ptr<LogicalCollection>> createCollectionsOnCoordinator(
335+
TRI_vocbase_t& vocbase, arangodb::velocypack::Slice parametersOfCollections,
328336
bool ignoreDistributeShardsLikeErrors, bool waitForSyncReplication,
329337
bool enforceReplicationFactor, bool isNewDatabase,
330-
std::shared_ptr<LogicalCollection> const& colPtr);
338+
std::shared_ptr<LogicalCollection> const& colToDistributeShardsLike);
331339

332340
////////////////////////////////////////////////////////////////////////////////
333341
/// @brief Enterprise Relevant code to filter out hidden collections

arangod/Graph/Graph.cpp

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -179,8 +179,7 @@ Graph::Graph(TRI_vocbase_t& vocbase, std::string&& graphName,
179179
_numberOfShards =
180180
Helper::getNumericValue<uint64_t>(options, StaticStrings::NumberOfShards, 1);
181181
if (Helper::getStringRef(options.get(StaticStrings::ReplicationFactor),
182-
velocypack::StringRef("")) == StaticStrings::Satellite &&
183-
arangodb::ServerState::instance()->isRunningInCluster()) {
182+
velocypack::StringRef("")) == StaticStrings::Satellite) {
184183
_isSatellite = true;
185184
setReplicationFactor(0);
186185
} else {
@@ -362,8 +361,12 @@ void Graph::toPersistence(VPackBuilder& builder) const {
362361
// The name
363362
builder.add(StaticStrings::KeyString, VPackValue(_graphName));
364363

365-
// Cluster Information
366-
if (arangodb::ServerState::instance()->isRunningInCluster()) {
364+
// Cluster related information:
365+
// Will be also persisted in SingleServer operation mode as we'll support creating
366+
// SmartGraphs and SatelliteGraphs in SingleServer mode as well. This information
367+
// will be necessary in case we dump & restore from SingleServer to a clustered
368+
// environment.
369+
if (arangodb::ServerState::instance()->isRunningInCluster() || isSmart() || isSatellite()) {
367370
builder.add(StaticStrings::NumberOfShards, VPackValue(_numberOfShards));
368371
if (isSatellite()) {
369372
builder.add(StaticStrings::ReplicationFactor, VPackValue(StaticStrings::Satellite));

0 commit comments

Comments
 (0)
0