8000 fix cluster selectivity estimates by jsteemann · Pull Request #6488 · arangodb/arangodb · GitHub
[go: up one dir, main page]

Skip to content

fix cluster selectivity estimates #6488

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Sep 13, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
fix cluster selectivity estimates
  • Loading branch information
jsteemann committed Sep 13, 2018
commit 4a17d4cf024c1aebeaab6071aef0ad51d45100b8
36 changes: 18 additions & 18 deletions arangod/Cluster/ClusterInfo.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,8 @@ void ClusterInfo::cleanup() {
if (theInstance == nullptr) {
return;
}

MUTEX_LOCKER(mutexLocker, theInstance->_planProt.mutex);

TRI_ASSERT(theInstance->_newPlannedViews.empty()); // only non-empty during loadPlan()
theInstance->_plannedViews.clear();
Expand Down Expand Up @@ -719,16 +721,6 @@ void ClusterInfo::loadPlan() {
std::string const collectionId =
collectionPairSlice.key.copyString();

decltype(vocbase->lookupCollection(collectionId)->clusterIndexEstimates()) selectivity;
double selectivityTTL = 0;
if (isCoordinator) {
auto collection = _plannedCollections[databaseName][collectionId];
if(collection){
selectivity = collection->clusterIndexEstimates(/*do not update*/ true);
selectivityTTL = collection->clusterIndexEstimatesTTL();
}
}

try {
std::shared_ptr<LogicalCollection> newCollection;

Expand Down Expand Up @@ -758,14 +750,22 @@ void ClusterInfo::loadPlan() {

auto& collectionName = newCollection->name();

if (isCoordinator && !selectivity.empty()){
LOG_TOPIC(TRACE, Logger::CLUSTER) << "copy index estimates";
newCollection->clusterIndexEstimates(std::move(selectivity));
newCollection->clusterIndexEstimatesTTL(selectivityTTL);
for (std::shared_ptr<Index>& idx : newCollection->getIndexes()) {
auto it = selectivity.find(std::to_string(idx->id()));
if (it != selectivity.end()) {
idx->updateClusterSelectivityEstimate(it->second);
if (isCoordinator) {
// copying over index estimates from the old version of the collection
// into the new one
LOG_TOPIC(TRACE, Logger::CLUSTER) << "copying index estimates";
// it is effectively safe to access _plannedCollections in read-only mode
// here, as the only places that modify _plannedCollections are the shutdown
// and this function itself, which is protected by a mutex
auto it = _plannedCollections.find(databaseName);
if (it != _plannedCollections.end()) {
auto it2 = (*it).second.find(collectionId);
if (it2 != (*it).second.end()) {
auto estimates = (*it2).second->clusterIndexEstimates(false);
if (!estimates.empty()) {
// already have an estimate... now copy it over
newCollection->clusterIndexEstimates(std::move(estimates));
}
}
}
}
Expand Down
18 changes: 4 additions & 14 deletions arangod/Transaction/Methods.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3366,20 +3366,10 @@ transaction::Methods::indexesForCollectionCoordinator(
std::string const& name) const {
auto clusterInfo = arangodb::ClusterInfo::instance();
auto collection = clusterInfo->getCollection(vocbase().name(), name);
std::vector<std::shared_ptr<Index>> indexes = collection->getIndexes();

// update estimates in logical collection
auto selectivity = collection->clusterIndexEstimates();

// push updated values into indexes
for(std::shared_ptr<Index>& idx : indexes) {
auto it = selectivity.find(std::to_string(idx->id()));
if (it != selectivity.end()) {
idx->updateClusterSelectivityEstimate(it->second);
}
}

return indexes;

// update selectivity estimates if they were expired
collection->clusterIndexEstimates(true);
return collection->getIndexes();
}

/// @brief get the index by it's identifier. Will either throw or
Expand Down
144 changes: 106 additions & 38 deletions arangod/VocBase/LogicalCollection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,8 @@ LogicalCollection::LogicalCollection(
_physical(
EngineSelectorFeature::ENGINE->createPhysicalCollection(*this, info)
),
_clusterEstimateTTL(0),
_sharding() {
_sharding(),
_clusterSelectivityEstimates(*this) {
TRI_ASSERT(info.isObject());

if (!TRI_vocbase_t::IsAllowedName(info)) {
Expand Down Expand Up @@ -419,47 +419,20 @@ bool LogicalCollection::isSmart() const { return _isSmart; }
std::unique_ptr<FollowerInfo> const& LogicalCollection::followers() const {
return _followers;
}

// SECTION: Indexes
std::unordered_map<std::string, double> LogicalCollection::clusterIndexEstimates(bool doNotUpdate) {
READ_LOCKER(readlock, _clusterEstimatesLock);
if (doNotUpdate) {
return _clusterEstimates;
}

double ctime = TRI_microtime(); // in seconds
auto needEstimateUpdate = [this, ctime]() {
if (_clusterEstimates.empty()) {
LOG_TOPIC(TRACE, Logger::CLUSTER) << "update because estimate is not available";
return true;
} else if (ctime - _clusterEstimateTTL > 60.0) {
LOG_TOPIC(TRACE, Logger::CLUSTER) << "update because estimate is too old: " << ctime - _clusterEstimateTTL;
return true;
}
return false;
};

if (needEstimateUpdate()) {
readlock.unlock();
WRITE_LOCKER(writelock, _clusterEstimatesLock);

if (needEstimateUpdate()) {
selectivityEstimatesOnCoordinator(vocbase().name(), name(), _clusterEstimates);
_clusterEstimateTTL = TRI_microtime();
}
return _clusterEstimates;
}

return _clusterEstimates;

std::unordered_map<std::string, double> LogicalCollection::clusterIndexEstimates(bool allowUpdate) {
return _clusterSelectivityEstimates.get(allowUpdate);
}

void LogicalCollection::clusterIndexEstimates(std::unordered_map<std::string, double>&& estimates) {
WRITE_LOCKER(lock, _clusterEstimatesLock);
_clusterEstimates = std::move(estimates);
_clusterSelectivityEstimates.set(std::move(estimates));
}

void LogicalCollection::flushClusterIndexEstimates() {
_clusterSelectivityEstimates.flush();
}

std::vector<std::shared_ptr<arangodb::Index>>
LogicalCollection::getIndexes() const {
std::vector<std::shared_ptr<arangodb::Index>> LogicalCollection::getIndexes() const {
return getPhysical()->getIndexes();
}

Expand Down Expand Up @@ -1100,3 +1073,98 @@ Result LogicalCollection::compareChecksums(VPackSlice checksumSlice, std::string

return Result();
}

LogicalCollection::ClusterSelectivityEstimates::ClusterSelectivityEstimates(LogicalCollection& collection)
: _collection(collection),
_expireStamp(0.0) {}

void LogicalCollection::ClusterSelectivityEstimates::flush() {
WRITE_LOCKER(lock, _lock);
_estimates.clear();
_expireStamp = 0.0;
}

std::unordered_map<std::string, double> LogicalCollection::ClusterSelectivityEstimates::get(bool allowUpdate) const {
double now;

{
READ_LOCKER(readLock, _lock);
if (!allowUpdate) {
// return whatever is there. may be empty as well
return _estimates;
}

now = TRI_microtime();
if (!_estimates.empty() && _expireStamp > now) {
// already have an estimate, and it is not yet expired
return _estimates;
}
}

// have no estimate yet, or it is already expired
// we have given up the read lock here
// because we now need to modify the estimates

int tries = 0;
while (true) {
decltype(_estimates) estimates;

WRITE_LOCKER(writeLock, _lock);

if (!_estimates.empty() && _expireStamp > now) {
// some other thread has updated the estimates for us... just use them
return _estimates;
}

int res = selectivityEstimatesOnCoordinator(_collection.vocbase().name(), _collection.name(), estimates);

if (res == TRI_ERROR_NO_ERROR) {
_estimates = estimates;
// let selectivity estimates expire less seldom for system collections
_expireStamp = now + defaultTtl * (_collection.name()[0] == '_' ? 10.0 : 1.0);

// give up the lock, and then update the selectivity values for each index
writeLock.unlock();

// push new selectivity values into indexes' cache
auto indexes = _collection.getIndexes();

for (std::shared_ptr<Index>& idx : indexes) {
auto it = estimates.find(std::to_string(idx->id()));

if (it != estimates.end()) {
idx->updateClusterSelectivityEstimate(it->second);
}
}

return estimates;
}

if (++tries == 3) {
return _estimates;
}
}
}

void LogicalCollection::ClusterSelectivityEstimates::set(std::unordered_map<std::string, double>&& estimates) {
double const now = TRI_microtime();

// push new selectivity values into indexes' cache
auto indexes = _collection.getIndexes();

for (std::shared_ptr<Index>& idx : indexes) {
auto it = estimates.find(std::to_string(idx->id()));

if (it != estimates.end()) {
idx->updateClusterSelectivityEstimate(it->second);
}
}

// finally update the cache
{
WRITE_LOCKER(writelock, _lock);
_estimates = std::move(estimates);
// let selectivity estimates expire less seldom for system collections
_expireStamp = now + defaultTtl * (_collection.name()[0] == '_' ? 10.0 : 1.0);
}
}
48 changes: 29 additions & 19 deletions arangod/VocBase/LogicalCollection.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,11 @@ class LogicalCollection: public LogicalDataSource {
uint64_t planVersion = 0
);
LogicalCollection(LogicalCollection const&) = delete;
LogicalCollection& operator=(LogicalCollection const&) = delete;
virtual ~LogicalCollection();

enum CollectionVersions { VERSION_30 = 5, VERSION_31 = 6, VERSION_33 = 7 };

LogicalCollection& operator=(LogicalCollection const&) = delete;

//////////////////////////////////////////////////////////////////////////////
/// @brief the category representing a logical collection
//////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -188,20 +187,32 @@ class LogicalCollection: public LogicalDataSource {
transaction::Methods* trx,
std::function<bool(LocalDocumentId const&)> callback);

// Estimates
std::unordered_map<std::string, double> clusterIndexEstimates(bool doNotUpdate=false);
//// SECTION: Indexes
class ClusterSelectivityEstimates {
public:
explicit ClusterSelectivityEstimates(LogicalCollection& collection);
void flush();
std::unordered_map<std::string, double> get(bool allowUpdate) const;
void set(std::unordered_map<std::string, double>&& estimates);
private:
LogicalCollection& _collection;
mutable basics::ReadWriteLock _lock;
mutable std::unordered_map<std::string, double> _estimates;
mutable double _expireStamp;

static constexpr double defaultTtl = 60.0;
};

/// @brief fetches current index selectivity estimates
/// if allowUpdate is true, will potentially make a cluster-internal roundtrip to
/// fetch current values!
std::unordered_map<std::string, double> clusterIndexEstimates(bool allowUpdate);

/// @brief sets the current index selectivity estimates
void clusterIndexEstimates(std::unordered_map<std::string, double>&& estimates);

double clusterIndexEstimatesTTL() const {
return _clusterEstimateTTL;
}

void clusterIndexEstimatesTTL(double ttl) {
_clusterEstimateTTL = ttl;
}
// End - Estimates

//// SECTION: Indexes
/// @brief flushes the current index selectivity estimates
void flushClusterIndexEstimates();

std::vector<std::shared_ptr<Index>> getIndexes() const;

Expand Down Expand Up @@ -401,17 +412,16 @@ class LogicalCollection: public LogicalDataSource {

mutable arangodb::Mutex _infoLock; // lock protecting the info

std::unordered_map<std::string, double> _clusterEstimates;
double _clusterEstimateTTL; //only valid if above vector is not empty
basics::ReadWriteLock _clusterEstimatesLock;

// the following contains in the cluster/DBserver case the information
// which other servers are in sync with this shard. It is unset in all
// other cases.
std::unique_ptr<FollowerInfo> _followers;

/// @brief sharding information
std::unique_ptr<ShardingInfo> _sharding;

/// @brief index selectivity estimates (only relevant on a coordinator)
ClusterSelectivityEstimates _clusterSelectivityEstimates;
};

} // namespace arangodb
Expand Down
4 changes: 2 additions & 2 deletions arangod/VocBase/Methods/Indexes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,7 @@ Result Indexes::ensureIndex(LogicalCollection* collection,
}

// flush estimates
collection->clusterIndexEstimatesTTL(0.0);
collection->flushClusterIndexEstimates();

// the cluster won't set a proper id value
std::string iid = tmp.slice().get("id").copyString();
Expand Down Expand Up @@ -599,7 +599,7 @@ arangodb::Result Indexes::drop(LogicalCollection* collection,
}

// flush estimates
collection->clusterIndexEstimatesTTL(0.0);
collection->flushClusterIndexEstimates();

#ifdef USE_ENTERPRISE
return Indexes::dropCoordinatorEE(collection, iid);
Expand Down
0