8000 Bug fix/uaf communicator request abort (#3216) · MohammedDeveloper/arangodb@f0ae3d3 · GitHub
[go: up one dir, main page]

Skip to content

Commit f0ae3d3

Browse files
m0ppersfceller
authored andcommitted
Bug fix/uaf communicator request abort (arangodb#3216)
* Fix abortion of requests * Fix progress callback
1 parent 02014da commit f0ae3d3

File tree

7 files changed

+151
-10
lines changed

7 files changed

+151
-10
lines changed

Installation/Pipeline/Jenkinsfile.groovy

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -709,7 +709,9 @@ def executeTests(os, edition, maintainer, mode, engine, portInit, archDir, arch,
709709

710710
withEnv(["TMPDIR=${tmpDir}", "TEMPDIR=${tmpDir}", "TMP=${tmpDir}"]) {
711711
if (os == "windows") {
712-
echo "executing ${command}"
712+
def hostname = powershell(returnStdout: true, script: "hostname")
713+
714+
echo "executing ${command} on ${hostname}"
713715
powershell "cd ${runDir} ; ${command} | Add-Content -PassThru ${logFile}"
714716
}
715717
else {

arangod/Cluster/ClusterComm.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1212,6 +1212,10 @@ void ClusterCommThread::run() {
12121212
}
12131213
}
12141214
_cc->communicator()->abortRequests();
1215+
LOG_TOPIC(DEBUG, Logger::CLUSTER) << "waiting for curl to stop remaining handles";
1216+
while (_cc->communicator()->work_once() > 0) {
1217+
usleep(10);
1218+
}
12151219

12161220
LOG_TOPIC(DEBUG, Logger::CLUSTER) << "stopped ClusterComm thread";
12171221
}

lib/SimpleHttpClient/Communicator.cpp

Lines changed: 44 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -304,6 +304,12 @@ void Communicator::createRequestInProgress(NewRequest const& newRequest) {
304304
curl_easy_setopt(handle, CURLOPT_URL, url.c_str());
305305
curl_easy_setopt(handle, CURLOPT_VERBOSE, 1L);
306306
curl_easy_setopt(handle, CURLOPT_PROXY, "");
307+
308+
// the xfer/progress options are only used to handle request abortions
309+
curl_easy_setopt(handle, CURLOPT_NOPROGRESS, 0L);
310+
curl_easy_setopt(handle, CURLOPT_XFERINFOFUNCTION, Communicator::curlProgress);
311+
curl_easy_setopt(handle, CURLOPT_XFERINFODATA, handleInProgress->_rip.get());
312+
307313
curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, Communicator::readBody);
308314
curl_easy_setopt(handle, CURLOPT_WRITEDATA, handleInProgress->_rip.get());
309315
curl_easy_setopt(handle, CURLOPT_HEADERFUNCTION, Communicator::readHeaders);
@@ -394,6 +400,10 @@ void Communicator::handleResult(CURL* handle, CURLcode rc) {
394400
if (rip == nullptr) {
395401
return;
396402
}
403+
404+
if (rip->_options._curlRcFn) {
405+
(*rip->_options._curlRcFn)(rc);
406+
}
397407
std::string prefix("Communicator(" + std::to_string(rip->_ticketId) +
398408
") // ");
399409
LOG_TOPIC(TRACE, Logger::COMMUNICATION)
@@ -434,7 +444,23 @@ void Communicator::handleResult(CURL* handle, CURLcode rc) {
434444
case CURLE_OPERATION_TIMEDOUT:
435445
case CURLE_RECV_ERROR:
436446
case CURLE_GOT_NOTHING:
437-
rip->_callbacks._onError(TRI_ERROR_CLUSTER_TIMEOUT, {nullptr});
447+
if (rip->_aborted) {
448+
rip->_callbacks._onError(TRI_COMMUNICATOR_REQUEST_ABORTED, {nullptr});
449+
} else {
450+
rip->_callbacks._onError(TRI_ERROR_CLUSTER_TIMEOUT, {nullptr});
451+
}
452+
break;
453+
case CURLE_WRITE_ERROR:
454+
if (rip->_aborted) {
455+
rip->_callbacks._onError(TRI_COMMUNICATOR_REQUEST_ABORTED, {nullptr});
456+
} else {
457+
LOG_TOPIC(ERR, arangodb::Logger::FIXME) << "Got a write error from curl but request was not aborted";
458+
rip->_callbacks._onError(TRI_ERROR_INTERNAL, {nullptr});
459+
}
460+
break;
461+
case CURLE_ABORTED_BY_CALLBACK:
462+
TRI_ASSERT(rip->_aborted);
463+
rip->_callbacks._onError(TRI_COMMUNICATOR_REQUEST_ABORTED, {nullptr});
438464
break;
439465
default:
440466
LOG_TOPIC(ERR, arangodb::Logger::FIXME) << "Curl return " << rc;
@@ -455,9 +481,11 @@ void Communicator::transformResult(CURL* handle,
455481

456482
size_t Communicator::readBody(void* data, size_t size, size_t nitems,
457483
void* userp) {
458-
size_t realsize = size * nitems;
459-
460484
RequestInProgress* rip = (struct RequestInProgress*)userp;
485+
if (rip->_aborted) {
486+
return 0;
487+
}
488+
size_t realsize = size * nitems;
461489
try {
462490
rip->_responseBody->appendText((char*)data, realsize);
463491
return realsize;
@@ -491,6 +519,13 @@ void Communicator::logHttpHeaders(std::string const& prefix,
491519
}
492520
}
493521

522+
int Communicator::curlProgress(void* userptr, curl_off_t dltotal,
523+
curl_off_t dlnow, curl_off_t ultotal,
524+
curl_off_t ulnow) {
525+
RequestInProgress* rip = (struct RequestInProgress*)userptr;
526+
return (int) rip->_aborted;
527+
}
528+
494529
int Communicator::curlDebug(CURL* handle, curl_infotype type, char* data,
495530
size_t size, void* userptr) {
496531
arangodb::communicator::RequestInProgress* request = nullptr;
@@ -530,6 +565,9 @@ size_t Communicator::readHeaders(char* buffer, size_t size, size_t nitems,
530565
void* userptr) {
531566
size_t realsize = size * nitems;
532567
RequestInProgress* rip = (struct RequestInProgress*)userptr;
568+
if (rip->_aborted) {
569+
return 0;
570+
}
533571

534572
std::string const header(buffer, realsize);
535573
size_t pivot = header.find_first_of(':');
@@ -607,11 +645,9 @@ void Communicator::abortRequestInternal(Ticket ticketId) {
607645
if (handle == _handlesInProgress.end()) {
608646
return;
609647
}
648+
610649
std::string prefix("Communicator(" + std::to_string(handle->second->_rip->_ticketId) +
611650
") // ");
612651
LOG_TOPIC(WARN, Logger::REQUESTS) << prefix << "aborting request to " << handle->second->_rip->_destination.url();
613-
handle->second->_rip->_callbacks._onError(TRI_COMMUNICATOR_REQUEST_ABORTED,
614-
{nullptr});
615-
_handlesInProgress.erase(ticketId);
616-
}
617-
652+
handle->second->_rip->_aborted = true;
653+
}

lib/SimpleHttpClient/Communicator.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,8 @@ struct RequestInProgress {
5151
_requestHeaders(nullptr),
5252
_startTime(0.0),
5353
_responseBody(new basics::StringBuffer(false)),
54-
_options(options) {
54+
_options(options),
55+
_aborted(false) {
5556
_errorBuffer[0] = '\0';
5657
}
5758

@@ -77,6 +78,7 @@ struct RequestInProgress {
7778
Options _options;
7879

7980
char _errorBuffer[CURL_ERROR_SIZE];
81+
bool _aborted;
8082
};
8183

8284
struct CurlHandle {
@@ -166,6 +168,7 @@ class Communicator {
166168
static size_t readHeaders(char* buffer, size_t size, size_t nitems,
167169
void* userdata);
168170
static int curlDebug(CURL*, curl_infotype, char*, size_t, void*);
171+
static int curlProgress(void*, curl_off_t, curl_off_t, curl_off_t, curl_off_t);
169172
static void logHttpHeaders(std::string const&, std::string const&);
170173
static void logHttpBody(std::string const&, std::string const&);
171174
};

lib/SimpleHttpClient/Options.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,17 @@
2424
#ifndef ARANGODB_SIMPLE_HTTP_CLIENT_OPTIONS_H
2525
#define ARANGODB_SIMPLE_HTTP_CLIENT_OPTIONS_H 1
2626

27+
#include <memory>
28+
#include <functional>
29+
#include "curl/curl.h"
30+
2731
namespace arangodb {
2832
namespace communicator {
2933
class Options {
3034
public:
3135
double requestTimeout = 120.0;
3236
double connectionTimeout = 2.0;
37+
std::shared_ptr<std::function<void(CURLcode)>> _curlRcFn = nullptr;
3338
};
3439
}
3540
}

tests/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ add_executable(
5757
RocksDBEngine/KeyTest.cpp
5858
RocksDBEngine/IndexEstimatorTest.cpp
5959
RocksDBEngine/TypeConversionTest.cpp
60+
SimpleHttpClient/CommunicatorTest.cpp
6061
main.cpp
6162
)
6263

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
////////////////////////////////////////////////////////////////////////////////
2+
/// @brief test suite for CuckooFilter based index selectivity estimator
3+
///
4+
/// @file
5+
///
6+
/// DISCLAIMER
7+
///
8+
/// Copyright 2017 ArangoDB GmbH, Cologne, Germany
9+
///
10+
/// Licensed under the Apache License, Version 2.0 (the "License");
11+
/// you may not use this file except in compliance with the License.
12+
/// You may obtain a copy of the License at
13+
///
14+
/// http://www.apache.org/licenses/LICENSE-2.0
15+
///
16+
/// Unless required by applicable law or agreed to in writing, software
17+
/// distributed under the License is distributed on an "AS IS" BASIS,
18+
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19+
/// See the License for the specific language governing permissions and
20+
/// limitations under the License.
21+
///
22+
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
23+
///
24+
/// @author Andreas Streichardt
25+
/// @author Copyright 2017, ArangoDB GmbH, Cologne, Germany
26+
////////////////////////////////////////////////////////////////////////////////
27+
28+
#include "catch.hpp"
29+
30+
#include "Rest/HttpRequest.h"
31+
#include "SimpleHttpClient/Communicator.h"
32+
#include "SimpleHttpClient/Callbacks.h"
33+
#include "SimpleHttpClient/Destination.h"
34+
35+
using namespace arangodb;
36+
using namespace arangodb::communicator;
37+
38+
TEST_CASE("requests are properly aborted", "[communicator]" ) {
39+
Communicator communicator;
40+
41+
bool callbacksCalled = false;
42+
43+
communicator::Callbacks callbacks([&callbacksCalled](std::unique_ptr<GeneralResponse> response) {
44+
WARN("RESULT: " << GeneralResponse::responseString(response->responseCode()));
45+
REQUIRE(false); // it should be aborted?!
46+
callbacksCalled = true;
47+
}, [&callbacksCalled](int errorCode, std::unique_ptr<GeneralResponse> response) {
48+
REQUIRE(!response);
49+
REQUIRE(errorCode == TRI_COMMUNICATOR_REQUEST_ABORTED);
50+
callbacksCalled = true;
51+
});
52+
auto request = std::unique_ptr<HttpRequest>(HttpRequest::createHttpRequest(rest::ContentType::TEXT, "", 0, {}));
53+
request->setRequestType(RequestType::GET);
54+
communicator::Options opt;
55+
auto destination = Destination("http://www.example.com");
56+
communicator.addRequest(destination, std::move(request), callbacks, opt);
57+
communicator.work_once();
58+
communicator.abortRequests();
59+
while (communicator.work_once() > 0) {
60+
usleep(1);
61+
}
62+
REQUIRE(callbacksCalled);
63+
}
64+
65+
TEST_CASE("requests will call the progress callback", "[communicator]") {
66+
Communicator communicator;
67+
68+
communicator::Callbacks callbacks([](std::unique_ptr<GeneralResponse> response) {
69+
}, [](int errorCode, std::unique_ptr<GeneralResponse> response) {
70+
});
71+
72+
auto request = std::unique_ptr<HttpRequest>(HttpRequest::createHttpRequest(rest::ContentType::TEXT, "", 0, {}));
73+
request->setRequestType(RequestType::GET);
74+
75+
// CURL_LAST /* never use! */ HA! FOOL!
76+
CURLcode curlRc = CURL_LAST;
77+
communicator::Options opt;
78+
opt._curlRcFn = std::make_shared<std::function<void(CURLcode)>>([&curlRc](CURLcode rc) {
79+
curlRc = rc;
80+
});
81+
82+
auto destination = Destination("http://www.example.com");
83+
communicator.addRequest(destination, std::move(request), callbacks, opt);
84+
communicator.work_once();
85+
communicator.abortRequests();
86+
while (communicator.work_once() > 0) {
87+
usleep(1);
88+
}
89+
REQUIRE(curlRc == CURLE_ABORTED_BY_CALLBACK); // curlRcFn was called
90+
}

0 commit comments

Comments
 (0)
0