8000 Bug fix 3.5/internal issue #651 by gnusi · Pull Request #10388 · arangodb/arangodb · GitHub
[go: up one dir, main page]

Skip to content

Bug fix 3.5/internal issue #651 #10388

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 24 commits into from
Nov 12, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
backport fix for cluster startup from 3.5.2 branch
  • Loading branch information
gnusi committed Nov 9, 2019
commit 042ca408a2a08e756903bc073c3891839338e491
227 changes: 81 additions & 146 deletions arangod/IResearch/IResearchAnalyzerFeature.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ static const irs::text_format::type_id VPACK("vpack");

const type_id& vpack_t() { return VPACK; }

}
}
} // iresearch
} // text_format

namespace {

Expand Down Expand Up @@ -556,34 +556,16 @@ bool equalAnalyzer(
/// @note cannot use arangodb::CollectionNameResolver::getCollection(...) since
/// it will try to resolve via vocbase for the case of db-server
////////////////////////////////////////////////////////////////////////////////
std::shared_ptr<arangodb::LogicalCollection> getAnalyzerCollection( // get collection
TRI_vocbase_t const& vocbase // collection vocbase
) {
std::shared_ptr<arangodb::LogicalCollection> getAnalyzerCollection(
TRI_vocbase_t const& vocbase) {
if (arangodb::ServerState::instance()->isSingleServer()) {
return vocbase.lookupCollection(arangodb::StaticStrings::AnalyzersCollection);
}

try {
auto* ci = arangodb::ClusterInfo::instance();
auto* ci = arangodb::ClusterInfo::instance();

if (ci) {
return ci->getCollectionNT(vocbase.name(), arangodb::StaticStrings::AnalyzersCollection);
}

LOG_TOPIC("00001", WARN, arangodb::iresearch::TOPIC)
<< "failure to find 'ClusterInfo' instance while looking up Analyzer collection '" << arangodb::StaticStrings::AnalyzersCollection << "' in vocbase '" << vocbase.name() << "'";
} catch (arangodb::basics::Exception& e) {
LOG_TOPIC("00002", WARN, arangodb::iresearch::TOPIC)
<< "caught exception while looking up Analyzer collection '" << arangodb::StaticStrings::AnalyzersCollection << "' in vocbase '" << vocbase.name() << "': " << e.code() << " " << e.what();
IR_LOG_EXCEPTION();
} catch (std::exception& e) {
LOG_TOPIC("00003", WARN, arangodb::iresearch::TOPIC)
<< "caught exception while looking up Analyzer collection '" << arangodb::StaticStrings::AnalyzersCollection << "' in vocbase '" << vocbase.name() << "': " << e.what();
IR_LOG_EXCEPTION();
} catch (...) {
LOG_TOPIC("00004", WARN, arangodb::iresearch::TOPIC)
<< "caught exception while looking up Analyzer collection '" << arangodb::StaticStrings::AnalyzersCollection << "' in vocbase '" << vocbase.name() << "'";
IR_LOG_EXCEPTION();
if (ci) {
return ci->getCollectionNT(vocbase.name(), arangodb::StaticStrings::AnalyzersCollection);
}

return nullptr;
Expand All @@ -593,79 +575,78 @@ std::shared_ptr<arangodb::LogicalCollection> getAnalyzerCollection( // get colle
/// @brief read analyzers from vocbase
/// @return visitation completed fully
////////////////////////////////////////////////////////////////////////////////
arangodb::Result visitAnalyzers( // visit analyzers
TRI_vocbase_t& vocbase, // vocbase to visit
std::function<arangodb::Result(arangodb::velocypack::Slice const& slice)> const& visitor // visitor
) {
arangodb::Result visitAnalyzers(
TRI_vocbase_t& vocbase,
std::function<arangodb::Result(VPackSlice const&)> const& visitor) {
static const auto resultVisitor = [](
std::function<arangodb::Result(arangodb::velocypack::Slice const& slice)> const& visitor, // visitor
TRI_vocbase_t const& vocbase, // vocbase
arangodb::velocypack::Slice const& slice // slice to visit
)->arangodb::Result {
std::function<arangodb::Result(VPackSlice const&)> const& visitor,
TRI_vocbase_t const& vocbase,
VPackSlice const& slice) -> arangodb::Result {
if (!slice.isArray()) {
return arangodb::Result( // result
TRI_ERROR_INTERNAL, // code
std::string("failed to parse contents of collection '") + arangodb::StaticStrings::AnalyzersCollection + "' in database '" + vocbase.name() + " while visiting analyzers"
);
return {
TRI_ERROR_INTERNAL,
"failed to parse contents of collection '" + arangodb::StaticStrings::AnalyzersCollection +
"' in database '" + vocbase.name() + " while visiting analyzers" };
}

for (arangodb::velocypack::ArrayIterator itr(slice); itr.valid(); ++itr) {
auto res = visitor(itr.value().resolveExternal());
for (VPackArrayIterator itr(slice); itr.valid(); ++itr) {
auto const res = visitor(itr.value().resolveExternal());

if (!res.ok()) {
return res;
}
}

return arangodb::Result();
return {};
};

// FIXME TODO find a better way to query a cluster collection
// workaround for aql::Query failing to execute on a cluster collection
static const auto queryString = arangodb::aql::QueryString(
"FOR d IN " + arangodb::StaticStrings::AnalyzersCollection + " RETURN d");

if (arangodb::ServerState::instance()->isDBServer()) {
auto cc = arangodb::ClusterComm::instance();

if (!cc) {
return arangodb::Result( // result
TRI_ERROR_INTERNAL, // code
std::string("failure to find 'ClusterComm' instance while visiting Analyzer collection '") + arangodb::StaticStrings::AnalyzersCollection + "' in vocbase '" + vocbase.name() + "'"
);
return {
TRI_ERROR_INTERNAL,
"failure to find 'ClusterComm' instance while visiting Analyzer collection '" +
arangodb::StaticStrings::AnalyzersCollection +
"' in vocbase '" + vocbase.name() + "'"
};
}

auto collection = getAnalyzerCollection(vocbase);
auto ci = arangodb::ClusterInfo::instance();

if (!collection) {
return arangodb::Result(); // nothing to load
if (!ci) {
return {
TRI_ERROR_INTERNAL,
"failure to find 'ClusterInfo' instance while visiting Analyzer collection '" +
arangodb::StaticStrings::AnalyzersCollection +
"' in vocbase '" + vocbase.name() + "'"
};
}

static const std::string body("{}"); // RestSimpleQueryHandler::allDocuments() expects opbject (calls get() on slice)
std::vector<arangodb::ClusterCommRequest> requests;
static const std::string BODY("{ \"query\" : \"" + queryString.string() + "\" }");
auto const coords = ci->getCurrentCoordinators();

// create a request for every shard
//for (auto& entry: collection->errorNum()) {
for (auto& entry: *(collection->shardIds())) {
auto& shardId = entry.first;
auto url = // url
"/_db/" + arangodb::basics::StringUtils::urlEncode(vocbase.name())
+ arangodb::RestVocbaseBaseHandler::SIMPLE_QUERY_ALL_PATH
+ "?collection=" + shardId;
std::vector<arangodb::ClusterCommRequest> requests(1);
auto& request = requests[0];
request.path = "/_api/cursor";
request.body = std::shared_ptr<std::string const>(std::shared_ptr<std::string const>(), &BODY);
request.requestType = arangodb::rest::RequestType::POST;

requests.emplace_back( // add shard request
"shard:" + shardId, // shard
arangodb::rest::RequestType::PUT, // request type as per SimpleQueryHandker
url, // request url
std::shared_ptr<std::string const>(&body, [](std::string const*)->void {}) // body
);
}
arangodb::Result res;
for (auto const& coord : coords) {
res = {};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like there is no need to reinitialize `res` on each iteration: it is always overwritten, either by the timeout error or by the result of `resultVisitor`, without its current value ever being checked.

request.destination = "server:" + coord;

// same timeout as in ClusterMethods::getDocumentOnCoordinator()
cc->performRequests( // execute requests
requests, 120.0, arangodb::iresearch::TOPIC, false, false // args
);
// same timeout as in ClusterMethods::getDocumentOnCoordinator()
static double const CL_DEFAULT_TIMEOUT = 120.0;
cc->performRequests(requests, CL_DEFAULT_TIMEOUT, arangodb::iresearch::TOPIC, false, false);

for (auto& request: requests) {
if (TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND == request.result.errorCode) {
continue; // treat missing collection as if there are no analyzers
if (TRI_ERROR_CLUSTER_TIMEOUT == request.result.errorCode) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we check for TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND here, the same way as in the non-DBServer branch below?

res = { request.result.errorCode, request.result.errorMessage };
continue; // try another coordinator
}

if (TRI_ERROR_NO_ERROR != request.result.errorCode) {
Expand All @@ -686,67 +667,29 @@ arangodb::Result visitAnalyzers( // visit analyzers
arangodb::StaticStrings::AnalyzersCollection + "' in vocbase '" + vocbase.name() + "'" };
}

auto res = resultVisitor(visitor, vocbase, slice.get("result"));
res = resultVisitor(visitor, vocbase, slice.get("result"));

if (!res.ok()) {
return res;
}
break;
}

return arangodb::Result();
}

if (arangodb::ServerState::instance()->isClusterRole()) {
if (!getAnalyzerCollection(vocbase)) {
return arangodb::Result(); // treat missing collection as if there are no analyzers
}

static const auto queryString = arangodb::aql::QueryString( // query to execute
std::string("FOR d IN ") + arangodb::StaticStrings::AnalyzersCollection + " RETURN d" // query
);
arangodb::aql::Query query( // query
false, vocbase, queryString, nullptr, nullptr, arangodb::aql::PART_MAIN // args
);
auto* queryRegistry = arangodb::QueryRegistryFeature::registry();
auto result = query.executeSync(queryRegistry);

if (TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND == result.result.errorNumber()) {
return arangodb::Result(); // treat missing collection as if there are no analyzers
}

if (result.result.fail()) {
return result.result;
}

auto slice = result.data->slice();

return resultVisitor(visitor, vocbase, slice);
return res;
}

if (!vocbase.lookupCollection(arangodb::StaticStrings::AnalyzersCollection)) {
return arangodb::Result(); // treat missing collection as if there are no analyzers
}
arangodb::aql::Query query(false, vocbase, queryString,
nullptr, nullptr, arangodb::aql::PART_MAIN);

arangodb::OperationOptions options;
arangodb::SingleCollectionTransaction trx( // transaction
arangodb::transaction::StandaloneContext::Create(vocbase), // context
arangodb::StaticStrings::AnalyzersCollection, // collection
arangodb::AccessMode::Type::READ // access more
);
auto res = trx.begin();
auto* queryRegistry = arangodb::QueryRegistryFeature::registry();
auto result = query.executeSync(queryRegistry);

if (!res.ok()) {
return res;
if (TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND == result.result.errorNumber()) {
return {}; // treat missing collection as if there are no analyzers
}

auto commit = irs::make_finally([&trx]()->void { trx.commit(); }); // end read-only transaction
auto result = trx.all(arangodb::StaticStrings::AnalyzersCollection, 0, 0, options);

if (!result.result.ok()) {
if (result.result.fail()) {
return result.result;
}

auto slice = arangodb::velocypack::Slice(result.buffer->data());
auto slice = result.data->slice();

return resultVisitor(visitor, vocbase, slice);
}
Expand All @@ -760,7 +703,7 @@ inline std::string normalizedAnalyzerName(
typedef irs::async_utils::read_write_mutex::read_mutex ReadMutex;
typedef irs::async_utils::read_write_mutex::write_mutex WriteMutex;

} // namespace
} // namespace

namespace arangodb {
namespace iresearch {
Expand Down Expand Up @@ -1514,15 +1457,8 @@ Result IResearchAnalyzerFeature::loadAnalyzers(
Result res;
std::unordered_set<irs::hashed_string_ref> seen;
auto visitor = [this, &res, &seen](TRI_vocbase_t& vocbase)->void {
if (!vocbase.lookupCollection(arangodb::StaticStrings::AnalyzersCollection)) {
// skip databases lacking 'arangodb::StaticStrings::AnalyzersCollection'
// (no analyzers there, not an error)
return;
}

auto name = irs::make_hashed_ref( // vocbase name
irs::string_ref(vocbase.name()), std::hash<irs::string_ref>() // args
);
auto name = irs::make_hashed_ref(irs::string_ref(vocbase.name()),
std::hash<irs::string_ref>());
auto result = loadAnalyzers(name);
auto itr = _lastLoad.find(name);

Expand All @@ -1546,9 +1482,8 @@ Result IResearchAnalyzerFeature::loadAnalyzers(

// remove unseen databases from timestamp list
for (auto itr = _lastLoad.begin(), end = _lastLoad.end(); itr != end;) {
auto name = irs::make_hashed_ref( // vocbase name
irs::string_ref(itr->first), std::hash<irs::string_ref>() // args
);
auto name = irs::make_hashed_ref(irs::string_ref(itr->first),
std::hash<irs::string_ref>());
auto seenItr = seen.find(name);

if (seenItr == seen.end()) {
Expand All @@ -1562,8 +1497,8 @@ Result IResearchAnalyzerFeature::loadAnalyzers(
// remove no longer valid analyzers (force remove)
for (auto itr = _analyzers.begin(), end = _analyzers.end(); itr != end;) {
auto split = splitAnalyzerName(itr->first);
auto unseenItr = // ignore static analyzers
split.first.null() ? unseen.end() : unseen.find(split.first);
// ignore static analyzers
auto unseenItr = split.first.null() ? unseen.end() : unseen.find(split.first);

if (unseenItr != unseen.end()) {
itr = _analyzers.erase(itr);
Expand Down Expand Up @@ -1786,7 +1721,8 @@ Result IResearchAnalyzerFeature::loadAnalyzers(
result.first->second->features())) {
return {
TRI_ERROR_BAD_PARAMETER,
"name collision detected while re-registering a duplicate arangosearch analizer name '" + std::string(result.first->second->name()) +
"name collision detected while re-registering a duplicate arangosearch analizer name '" +
std::string(result.first->second->name()) +
"' type '" + std::string(result.first->second->type()) +
"' properties '" + result.first->second->properties().toString() +
"', previous registration type '" + std::string(entry.second->type()) +
Expand All @@ -1808,10 +1744,14 @@ Result IResearchAnalyzerFeature::loadAnalyzers(
}

if (itr->second // valid new entry
&& !equalAnalyzer(*(entry.second), itr->second->type(), itr->second->properties(), itr->second->features())) {
&& !equalAnalyzer(*(entry.second),
itr->second->type(),
itr->second->properties(),
itr->second->features())) {
return {
TRI_ERROR_BAD_PARAMETER,
"name collision detected while registering a duplicate arangosearch analizer name '" + std::string(itr->second->name()) +
"name collision detected while registering a duplicate arangosearch analizer name '" +
std::string(itr->second->name()) +
"' type '" + std::string(itr->second->type()) +
"' properties '" + itr->second->properties().toString() +
"', previous registration type '" + std::string(entry.second->type()) +
Expand Down Expand Up @@ -2121,12 +2061,6 @@ void IResearchAnalyzerFeature::start() {
"IResearch functions";
}
}

auto res = loadAnalyzers();

if (!res.ok()) {
THROW_ARANGO_EXCEPTION(res);
}
}

void IResearchAnalyzerFeature::stop() {
Expand Down Expand Up @@ -2179,6 +2113,7 @@ Result IResearchAnalyzerFeature::storeAnalyzer(AnalyzerPool& pool) {

try {
auto collection = getAnalyzerCollection(*vocbase);

if (!collection) {
return {
TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND,
Expand Down
4 changes: 2 additions & 2 deletions arangod/VocBase/vocbase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1019,8 +1019,8 @@ std::shared_ptr<arangodb::LogicalCollection> TRI_vocbase_t::lookupCollectionByUu
}

/// @brief looks up a data-source by identifier
std::shared_ptr<arangodb::LogicalDataSource> TRI_vocbase_t::lookupDataSource(TRI_voc_cid_t id) const
noexcept {
std::shared_ptr<arangodb::LogicalDataSource> TRI_vocbase_t::lookupDataSource(
TRI_voc_cid_t id) const noexcept {
RECURSIVE_READ_LOCKER(_dataSourceLock, _dataSourceLockWriteOwner);
auto itr = _dataSourceById.find(id);

Expand Down
0