@@ -65,31 +65,6 @@ inline bool startsWith(std::string const& path, char const* other) {
6565 path.compare (0 , size, other, size) == 0 );
6666}
6767
68- } // namespace
69-
70- // -----------------------------------------------------------------------------
71- // --SECTION-- constructors and destructors
72- // -----------------------------------------------------------------------------
73-
74- CommTask::CommTask (GeneralServer& server,
75- ConnectionInfo info)
76- : _server(server),
77- _connectionInfo(std::move(info)),
78- _connectionStatistics(ConnectionStatistics::acquire()),
79- _auth(AuthenticationFeature::instance()) {
80- TRI_ASSERT (_auth != nullptr );
81- _connectionStatistics.SET_START ();
82- }
83-
84- CommTask::~CommTask () {
85- _connectionStatistics.SET_END ();
86- }
87-
88- // -----------------------------------------------------------------------------
89- // --SECTION-- protected methods
90- // -----------------------------------------------------------------------------
91-
92- namespace {
9368TRI_vocbase_t* lookupDatabaseFromRequest (application_features::ApplicationServer& server,
9469 GeneralRequest& req) {
9570 // get database name from request
@@ -127,8 +102,53 @@ bool resolveRequestContext(application_features::ApplicationServer& server,
127102 // the "true" means the request is the owner of the context
128103 return true ;
129104}
105+
106+ bool queueTimeViolated (GeneralRequest const & req) {
107+ // check if the client sent the "x-arango-queue-time-seconds" header
108+ bool found = false ;
109+ std::string const & queueTimeValue = req.header (StaticStrings::XArangoQueueTimeSeconds, found);
110+ if (found) {
111+ // yes, now parse the sent time value. if the value sent by client cannot be
112+ // parsed as a double, then it will be handled as if "0.0" was sent - i.e. no
113+ // queuing time restriction
114+ double requestedQueueTime = StringUtils::doubleDecimal (queueTimeValue);
115+ if (requestedQueueTime > 0.0 ) {
116+ // value is > 0.0, so now check the last dequeue time that the scheduler reported
117+ double lastDequeueTime = static_cast <double >(
118+ SchedulerFeature::SCHEDULER->getLastLowPriorityDequeueTime ()) / 1000.0 ;
119+
120+ if (lastDequeueTime > requestedQueueTime) {
121+ // the log topic should actually be REQUESTS here, but the default log level
122+ // for the REQUESTS log topic is FATAL, so if we logged here in INFO level,
123+ // it would effectively be suppressed. thus we are using the Scheduler's
124+ // log topic here, which is somewhat related.
125+ SchedulerFeature::SCHEDULER->trackQueueTimeViolation ();
126+ LOG_TOPIC (" 1bbcc" , WARN, Logger::THREADS)
127+ << " dropping incoming request because the client-specified maximum queue time requirement ("
128+ << requestedQueueTime << " s) would be violated by current queue time (" << lastDequeueTime << " s)" ;
129+ return true ;
130+ }
131+ }
132+ }
133+ return false ;
134+ }
135+
130136} // namespace
131137
138+ CommTask::CommTask (GeneralServer& server,
139+ ConnectionInfo info)
140+ : _server(server),
141+ _connectionInfo(std::move(info)),
142+ _connectionStatistics(ConnectionStatistics::acquire()),
143+ _auth(AuthenticationFeature::instance()) {
144+ TRI_ASSERT (_auth != nullptr );
145+ _connectionStatistics.SET_START ();
146+ }
147+
148+ CommTask::~CommTask () {
149+ _connectionStatistics.SET_END ();
150+ }
151+
132152// / Must be called before calling executeRequest, will send an error
133153// / response if execution is supposed to be aborted
134154
@@ -311,6 +331,12 @@ void CommTask::finishExecution(GeneralResponse& res, std::string const& origin)
311331 // use "IfNotSet" to not overwrite an existing response header
312332 res.setHeaderNCIfNotSet (StaticStrings::XContentTypeOptions, StaticStrings::NoSniff);
313333 }
334+
335+ // add "x-arango-queue-time-seconds" header
336+ if (_server.server ().getFeature <GeneralServerFeature>().returnQueueTimeHeader ()) {
337+ res.setHeaderNC (StaticStrings::XArangoQueueTimeSeconds,
338+ std::to_string (static_cast <double >(SchedulerFeature::SCHEDULER->getLastLowPriorityDequeueTime ()) / 1000.0 ));
339+ }
314340}
315341
316342// / Push this request into the execution pipeline
@@ -336,8 +362,17 @@ void CommTask::executeRequest(std::unique_ptr<GeneralRequest> request,
336362 LOG_TOPIC (" 2cece" , WARN, Logger::REQUESTS)
337363 << " could not find corresponding request/response" ;
338364 }
339-
365+
340366 rest::ContentType const respType = request->contentTypeResponse ();
367+
368+ // check if "x-arango-queue-time-seconds" header was set, and its value
369+ // is above the current dequeing time
370+ if (::queueTimeViolated (*request)) {
371+ sendErrorResponse (rest::ResponseCode::PRECONDITION_FAILED,
372+ respType, messageId, TRI_ERROR_QUEUE_TIME_REQUIREMENT_VIOLATED);
373+ return ;
374+ }
375+
341376 // create a handler, this takes ownership of request and response
342377 auto & server = _server.server ();
343378 auto & factory = server.getFeature <GeneralServerFeature>().handlerFactory ();
@@ -351,7 +386,6 @@ void CommTask::executeRequest(std::unique_ptr<GeneralRequest> request,
351386 VPackBuffer<uint8_t >());
352387 return ;
353388 }
354-
355389 // forward to correct server if necessary
356390 bool forwarded;
357391 auto res = handler->forwardRequest (forwarded);
0 commit comments