8000 [GORDO-1606] Read specific status out of pregel collection (by pid) (… · xuyanshi/arangodb@fcfb50c · GitHub
[go: up one dir, main page]

Skip to content

Commit fcfb50c

Browse files
authored
[GORDO-1606] Read specific status out of pregel collection (by pid) (arangodb#18575)
* supplied sleep attribute should then also be used ... * additionalOutput mimics now conductors toVelocypackmethod * handlePregelHistoryResult now has an additional parameter to only return first result out of an array, replace old status get API by new historic pregel read api * replace readResult logic by a query. add bindParameter support to executeQuery method. * also do not forget v8-pregel 8-) * make tests pass again * add user support to the collectionstatuswriter, implement user in readResult * add user to all read related methods * actually use the stats to read the stats ...,,,,,,,,,,,,,,,,,,................
1 parent 9d24729 commit fcfb50c

File tree

10 files changed

+197
-101
lines changed

10 files changed

+197
-101
lines changed

arangod/Pregel/Conductor/Conductor.cpp

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -754,13 +754,21 @@ void Conductor::persistPregelState(ExecutionState state) {
754754
stateBuilder.add("algorithm", VPackValue(_algorithm->name()));
755755
}
756756
stateBuilder.add("created", VPackValue(timepointToString(_created)));
757-
stateBuilder.add("state", VPackValue(pregel::ExecutionStateNames[_state]));
758-
stateBuilder.add("graphLoaded", VPackValue(_graphLoaded));
759-
stateBuilder.add("gss", VPackValue(_globalSuperstep));
760-
stateBuilder.add("ttl", VPackValue(_specifications.ttl.duration.count()));
761757
if (_expires != std::chrono::system_clock::time_point{}) {
762758
stateBuilder.add("expires", VPackValue(timepointToString(_expires)));
763759
}
760+
stateBuilder.add("ttl", VPackValue(_specifications.ttl.duration.count()));
761+
stateBuilder.add("state", VPackValue(pregel::ExecutionStateNames[_state]));
762+
stateBuilder.add("gss", VPackValue(_globalSuperstep));
763+
764+
// Additional attributes added during actor rework
765+
stateBuilder.add("graphLoaded", VPackValue(_graphLoaded));
766+
std::string user = ExecContext::current().user();
767+
if (!user.empty()) {
768+
stateBuilder.add("user", VPackValue(user));
769+
} else {
770+
stateBuilder.add("user", VPackSlice::nullSlice());
771+
}
764772
};
765773

766774
auto addAdditionalOutputToBuilder = [&](VPackBuilder& builder) -> void {
@@ -788,7 +796,21 @@ void Conductor::persistPregelState(ExecutionState state) {
788796
builder.add(VPackValue(gssTime.elapsedSeconds().count()));
789797
}
790798
}
799+
_masterContext->_aggregators->serializeValues(builder);
791800
_statistics.serializeValues(builder);
801+
if (_state != ExecutionState::RUNNING || ExecutionState::LOADING) {
802+
builder.add("vertexCount", VPackValue(_totalVerticesCount));
803+
builder.add("edgeCount", VPackValue(_totalEdgesCount));
804+
}
805+
builder.add("parallelism", VPackValue(_specifications.parallelism));
806+
if (_masterContext) {
807+
VPackObjectBuilder ob(&builder, "masterContext");
808+
_masterContext->serializeValues(builder);
809+
}
810+
811+
builder.add(VPackValue("detail"));
812+
auto conductorStatus = _status.accumulate();
813+
serialize(builder, conductorStatus);
792814
};
793815

794816
if (_state == ExecutionState::DONE) {

arangod/Pregel/REST/RestControlPregelHandler.cpp

Lines changed: 19 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,6 @@ void RestControlPregelHandler::handleGetRequest() {
152152
std::vector<std::string> const& suffixes = _request->decodedSuffixes();
153153

154154
if (suffixes.empty()) {
155-
VPackBuilder builder;
156155
pregel::statuswriter::CollectionStatusWriter cWriter{_vocbase};
157156
return handlePregelHistoryResult(cWriter.readAllNonExpiredResults());
158157
}
@@ -166,30 +165,9 @@ void RestControlPregelHandler::handleGetRequest() {
166165
}
167166
auto executionNumber = arangodb::pregel::ExecutionNumber{
168167
arangodb::basics::StringUtils::uint64(suffixes[0])};
169-
auto c = _pregel.conductor(executionNumber);
170-
171-
if (nullptr == c) {
172-
auto status = _pregel.getStatus(executionNumber);
173-
if (not status.ok()) {
174-
generateError(rest::ResponseCode::NOT_FOUND, status.errorNumber(),
175-
status.errorMessage());
176-
return;
177-
}
178-
auto serializedState = inspection::serializeWithErrorT(status.get());
179-
if (!serializedState.ok()) {
180-
generateError(rest::ResponseCode::NOT_FOUND, TRI_ERROR_CURSOR_NOT_FOUND,
181-
fmt::format("Cannot serialize status: {}",
182-
serializedState.error().error()));
183-
return;
184-
}
185-
generateResult(rest::ResponseCode::OK, serializedState.get().slice());
186-
return;
187-
}
188-
189-
VPackBuilder builder;
190-
c->toVelocyPack(builder);
191-
generateResult(rest::ResponseCode::OK, builder.slice());
192-
return;
168+
pregel::statuswriter::CollectionStatusWriter cWriter{_vocbase,
169+
executionNumber};
170+
return handlePregelHistoryResult(cWriter.readResult(), true);
193171
} else if ((suffixes.size() >= 1 || suffixes.size() <= 2) &&
194172
suffixes.at(0) == "history") {
195173
if (_pregel.isStopping()) {
@@ -206,7 +184,7 @@ void RestControlPregelHandler::handleGetRequest() {
206184
arangodb::basics::StringUtils::uint64(suffixes.at(1))};
207185
pregel::statuswriter::CollectionStatusWriter cWriter{_vocbase,
208186
executionNumber};
209-
return handlePregelHistoryResult(cWriter.readResult());
187+
return handlePregelHistoryResult(cWriter.readResult(), true);
210188
}
211189
}
212190

@@ -216,7 +194,7 @@ void RestControlPregelHandler::handleGetRequest() {
216194
}
217195

218196
void RestControlPregelHandler::handlePregelHistoryResult(
219-
ResultT<OperationResult> result) {
197+
ResultT<OperationResult> result, bool onlyReturnFirstAqlResultEntry) {
220198
if (result.fail()) {
221199
// check outer ResultT result
222200
generateError(rest::ResponseCode::BAD, result.errorNumber(),
@@ -243,7 +221,20 @@ void RestControlPregelHandler::handlePregelHistoryResult(
243221
// Truncate does not deliver a proper slice in a Cluster.
244222
generateResult(rest::ResponseCode::OK, VPackSlice::trueSlice());
245223
} else {
246-
generateResult(rest::ResponseCode::OK, result.get().slice());
224+
if (onlyReturnFirstAqlResultEntry) {
225+
TRI_ASSERT(result->slice().isArray());
226+
if (result.get().slice().at(0).isNull()) {
227+
// due to AQL returning "null" values in case a document does not
228+
// exist ....
229+
Result nf = Result(TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND);
230+
ResponseCode code = GeneralResponse::responseCode(nf.errorNumber());
231+
generateError(code, nf.errorNumber(), nf.errorMessage());
232+
} else {
233+
generateResult(rest::ResponseCode::OK, result.get().slice().at(0));
234+
}
235+
} else {
236+
generateResult(rest::ResponseCode::OK, result.get().slice());
237+
}
247238
}
248239
} else {
249240
// Should always have a Slice, doing this check to be sure.

arangod/Pregel/REST/RestControlPregelHandler.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,8 @@ class RestControlPregelHandler : public arangodb::RestVocbaseBaseHandler {
5555
// - /_api/control_pregel/history[/<id>]
5656
void handleDeleteRequest();
5757

58-
void handlePregelHistoryResult(ResultT<OperationResult> opResult);
58+
void handlePregelHistoryResult(ResultT<OperationResult> opResult,
59+
bool onlyReturnFirstAqlResultEntry = false);
5960

6061
pregel::PregelFeature& _pregel;
6162
};

arangod/Pregel/StatusWriter/CollectionStatusWriter.cpp

Lines changed: 89 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,9 @@ CollectionStatusWriter::CollectionStatusWriter(TRI_vocbase_t& vocbase,
5050
StaticStrings::PregelCollection);
5151
}
5252
_logicalCollection = std::move(logicalCollection);
53+
if (!ExecContext::current().user().empty()) {
54+
_user = ExecContext::current().user();
55+
}
5356
};
5457

5558
CollectionStatusWriter::CollectionStatusWriter(TRI_vocbase_t& vocbase)
@@ -62,6 +65,9 @@ CollectionStatusWriter::CollectionStatusWriter(TRI_vocbase_t& vocbase)
6265
StaticStrings::PregelCollection);
6366
}
6467
_logicalCollection = std::move(logicalCollection);
68+
if (!ExecContext::current().user().empty()) {
69+
_user = ExecContext::current().user();
70+
}
6571
};
6672

6773
auto CollectionStatusWriter::createResult(velocypack::Slice data)
@@ -89,47 +95,94 @@ auto CollectionStatusWriter::createResult(velocypack::Slice data)
8995
}
9096

9197
auto CollectionStatusWriter::readResult() -> OperationResult {
92-
if (_executionNumber.value == 0) {
93-
return OperationResult(Result(TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND), {});
98+
std::shared_ptr<VPackBuilder> bindParameter =
99+
std::make_shared<VPackBuilder>();
100+
bindParameter->openObject();
101+
bindParameter->add("pid", VPackValue(_executionNumber.value));
102+
bindParameter->add("collectionName",
103+
VPackValue(StaticStrings::PregelCollection));
104+
if (_user.has_value() && _user.value() != "root") {
105+
bindParameter->add("user", _user.value());
94106
}
95-
OperationData opData(_executionNumber.value);
96-
auto accessModeType = AccessMode::Type::READ;
97-
SingleCollectionTransaction trx(ctx(), StaticStrings::PregelCollection,
98-
accessModeType);
99-
trx.addHint(transaction::Hints::Hint::SINGLE_OPERATION);
100-
OperationOptions options(ExecContext::current());
107+
bindParameter->close();
101108

102-
// begin transaction
103-
Result transactionResult = trx.begin();
104-
if (transactionResult.fail()) {
105-
return OperationResult{std::move(transactionResult), options};
109+
// TODO: GORDO-1607
110+
// Note: As soon as we introduce an inspectable struct to the data we actually
111+
// write into the pregel collection, we can remove change "entry.data" to
112+
// just "entry".
113+
std::string queryString;
114+
if (_user.has_value() && _user.value() != "root") {
115+
queryString = R"(
116+
LET potentialDocument = DOCUMENT(CONCAT(@collectionName, '/', @pid)).data
117+
RETURN potentialDocument.user == @user ? potentialDocument : null
118+
)";
119+
} else {
120+
queryString = R"(
121+
RETURN DOCUMENT(CONCAT(@collectionName, '/', @pid)).data
122+
)";
106123
}
107-
auto payload = inspection::serializeWithErrorT(opData);
108-
return handleOperationResult(
109-
trx, options, transactionResult,
110-
trx.documentAsync(StaticStrings::PregelCollection, payload->slice(), {})
111-
.get());
124+
125+
return executeQuery(queryString, bindParameter);
112126
}
113127

114128
auto CollectionStatusWriter::readAllNonExpiredResults() -> OperationResult {
129+
std::shared_ptr<VPackBuilder> bindParameter =
130+
std::make_shared<VPackBuilder>();
131+
bindParameter->openObject();
132+
bindParameter->add("@collectionName",
133+
VPackValue(StaticStrings::PregelCollection));
134+
if (_user.has_value() && _user.value() != "root") {
135+
bindParameter->add("user", _user.value());
136+
}
137+
bindParameter->close();
138+
115139
// TODO: GORDO-1607
116140
// Note: As soon as we introduce an inspectable struct to the data we actually
117141
// write into the pregel collection, we can remove change "entry.data" to
118142
// just "entry".
119-
std::string queryString = R"(
120-
FOR entry IN _pregel_queries
121-
FILTER DATE_DIFF(DATE_NOW(), DATE_TIMESTAMP(entry.data.expires), "s") >= 0
122-
OR entry.data.expires == null
123-
RETURN entry.data
124-
)";
125-
126-
return executeQuery(queryString);
143+
std::string queryString;
144+
if (_user.has_value() && _user.value() != "root") {
145+
queryString = R"(
146+
FOR entry IN @@collectionName
147+
FILTER (entry.data.user == @user AND DATE_DIFF(DATE_NOW(), DATE_TIMESTAMP(entry.data.expires), "s") >= 0)
148+
OR (entry.data.user == @user AND entry.data.expires == null)
149+
RETURN entry.data
150+
)";
151+
} else {
152+
queryString = R"(
153+
FOR entry IN @@collectionName
154+
FILTER DATE_DIFF(DATE_NOW(), DATE_TIMESTAMP(entry.data.expires), "s") >= 0
155+
OR entry.data.expires == null
156+
RETURN entry.data
157+
)";
158+
}
159+
160+
return executeQuery(queryString, bindParameter);
127161
}
128162

129163
auto CollectionStatusWriter::readAllResults() -> OperationResult {
164+
std::shared_ptr<VPackBuilder> bindParameter =
165+
std::make_shared<VPackBuilder>();
166+
bindParameter->openObject();
167+
bindParameter->add("@collectionName",
168+
VPackValue(StaticStrings::PregelCollection));
169+
if (_user.has_value() && _user.value() != "root") {
170+
bindParameter->add("user", _user.value());
171+
}
172+
bindParameter->close();
173+
130174
// TODO: GORDO-1607
131-
std::string queryString = "FOR entry IN _pregel_queries RETURN entry";
132-
return executeQuery(queryString);
175+
std::string queryString;
176+
if (_user.has_value() && _user.value() != "root") {
177+
queryString = R"(
178+
FOR entry IN @@collectionName
179+
FILTER entry.data.user == @user
180+
RETURN entry.data
181+
)";
182+
} else {
183+
queryString = "FOR entry IN @@collectionName RETURN entry.data";
184+
}
185+
return executeQuery(queryString, bindParameter);
133186
}
134187

135188
auto CollectionStatusWriter::updateResult(velocypack::Slice data)
@@ -193,10 +246,17 @@ auto CollectionStatusWriter::deleteAllResults() -> OperationResult {
193246
trx.truncateAsync(StaticStrings::PregelCollection, options).get());
194247
}
195248

196-
auto CollectionStatusWriter::executeQuery(std::string queryString)
249+
auto CollectionStatusWriter::executeQuery(
250+
std::string queryString,
251+
std::optional<std::shared_ptr<VPackBuilder>> bindParameters)
197252
-> OperationResult {
253+
std::shared_ptr<VPackBuilder> bindParams = nullptr;
254+
if (bindParameters.has_value()) {
255+
bindParams = bindParameters.value();
256+
}
257+
198258
auto query = arangodb::aql::Query::create(
199-
ctx(), arangodb::aql::QueryString(queryString), nullptr);
259+
ctx(), arangodb::aql::QueryString(std::move(queryString)), bindParams);
200260
query->queryOptions().skipAudit = true;
201261
aql::QueryResult queryResult = query->executeSync();
202262
if (queryResult.result.fail()) {

arangod/Pregel/StatusWriter/CollectionStatusWriter.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,10 @@ struct CollectionStatusWriter : StatusWriterInterface {
7272
};
7373

7474
private:
75-
[[nodiscard]] auto executeQuery(std::string queryString) -> OperationResult;
75+
[[nodiscard]] auto executeQuery(
76+
std::string queryString,
77+
std::optional<std::shared_ptr<VPackBuilder>> bindParameters)
78+
-> OperationResult;
7679
[[nodiscard]] auto handleOperationResult(SingleCollectionTransaction& trx,
7780
OperationOptions& options,
7881
Result& transactionResult,
@@ -83,6 +86,7 @@ struct CollectionStatusWriter : StatusWriterInterface {
8386
private:
8487
DatabaseGuard _vocbaseGuard;
8588
ExecutionNumber _executionNumber;
89+
std::optional<std::string> _user;
8690
std::shared_ptr<LogicalCollection> _logicalCollection;
8791
};
8892

0 commit comments

Comments
 (0)
0