8000 Bug fix/fix ssl vst (#6547) · arangodb/arangodb@8b26c9d · GitHub
[go: up one dir, main page]

Skip to content

Commit 8b26c9d

Browse files
authored
Bug fix/fix ssl vst (#6547)
1 parent 203b141 commit 8b26c9d

File tree

5 files changed

+115
-75
lines changed

5 files changed

+115
-75
lines changed

arangod/GeneralServer/HttpCommTask.h

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,12 @@ class HttpCommTask final : public GeneralCommTask {
2323
arangodb::Endpoint::TransportType transportType() override {
2424
return arangodb::Endpoint::TransportType::HTTP;
2525
}
26-
26+
27+
// whether or not this task can mix sync and async I/O
28+
// this is always true for the HTTPCommTask, because we are not
29+
// multiplexing I/O
30+
bool canUseMixedIO() const override { return true; }
31+
2732
private:
2833
bool processRead(double startTime) override;
2934
void compactify() override;

arangod/GeneralServer/VstCommTask.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,13 @@ VstCommTask::VstCommTask(Scheduler* scheduler, GeneralServer* server,
9595
ServerFeature>("Server")
9696
->vstMaxSize();
9797
}
98+
99+
// whether or not this task can mix sync and async I/O
100+
bool VstCommTask::canUseMixedIO() const {
101+
// in case SSL is used, we cannot use a combination of sync and async I/O
102+
// because that will make TLS fall apart
103+
return !_peer->isEncrypted();
104+
}
98105

99106
/// @brief send simple response including response body
100107
void VstCommTask::addSimpleResponse(rest::ResponseCode code, rest::ContentType respType,

arangod/GeneralServer/VstCommTask.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,9 @@ class VstCommTask final : public GeneralCommTask {
4646
return arangodb::Endpoint::TransportType::VST;
4747
}
4848

49+
// whether or not this task can mix sync and async I/O
50+
bool canUseMixedIO() const override;
51+
4952
protected:
5053
// read data check if chunk and message are complete
5154
// if message is complete execute a request

arangod/Scheduler/SocketTask.cpp

Lines changed: 95 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ bool SocketTask::start() {
119119

120120
if (_closeRequested.load(std::memory_order_acquire)) {
121121
LOG_TOPIC(DEBUG, Logger::COMMUNICATION)
122-
<< "cannot start, close alread in progress";
122+
<< "cannot start, close already in progress";
123123
return false;
124124
}
125125

@@ -191,6 +191,7 @@ void SocketTask::closeStream() {
191191
if (_abandoned.load(std::memory_order_acquire)) {
192192
return;
193193
}
194+
194195
// strand::dispatch may execute this immediately if this
195196
// is called on a thread inside the same strand
196197
auto self = shared_from_this();
@@ -204,7 +205,7 @@ void SocketTask::closeStream() {
204205
void SocketTask::closeStreamNoLock() {
205206
TRI_ASSERT(_peer != nullptr);
206207
TRI_ASSERT(_peer->runningInThisThread());
207-
208+
208209
bool mustCloseSend = !_closedSend.load(std::memory_order_acquire);
209210
bool mustCloseReceive = !_closedReceive.load(std::memory_order_acquire);
210211

@@ -291,6 +292,7 @@ bool SocketTask::trySyncRead() {
291292

292293
asio_ns::error_code err;
293294
TRI_ASSERT(_peer != nullptr);
295+
294296
if (0 == _peer->available(err)) {
295297
return false;
296298
}
@@ -315,17 +317,15 @@ bool SocketTask::trySyncRead() {
315317

316318
_readBuffer.increaseLength(bytesRead);
317319

318-
if (err) {
319-
if (err == asio_ns::error::would_block) {
320-
return false;
321-
} else {
322-
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "trySyncRead failed with: "
323-
<< err.message();
324-
return false;
325-
}
320+
if (!err) {
321+
return true;
326322
}
327323

328-
return true;
324+
if (err != asio_ns::error::would_block && err != asio_ns::error::try_again) {
325+
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "trySyncRead failed with: " << err.message();
326+
}
327+
328+
return false;
329329
}
330330

331331
// caller must hold the _lock
@@ -379,43 +379,49 @@ void SocketTask::asyncReadSome() {
379379
TRI_ASSERT(_peer != nullptr);
380380
TRI_ASSERT(_peer->runningInThisThread());
381381

382-
try {
383-
size_t const MAX_DIRECT_TRIES = 2;
384-
size_t n = 0;
385-
386-
while (++n <= MAX_DIRECT_TRIES &&
387-
!_abandoned.load(std::memory_order_acquire)) {
388-
if (!trySyncRead()) {
389-
if (n < MAX_DIRECT_TRIES) {
390-
std::this_thread::yield();
382+
if (this->canUseMixedIO()) {
383+
// try some direct read only for non-SSL mode
384+
// in SSL mode it will fall apart when mixing direct reads and async
385+
// reads later
386+
try {
387+
size_t const MAX_DIRECT_TRIES = 2;
388+
size_t n = 0;
389+
390+
while (++n <= MAX_DIRECT_TRIES &&
391+
!_abandoned.load(std::memory_order_acquire)) {
392+
if (!trySyncRead()) {
393+
if (n < MAX_DIRECT_TRIES) {
394+
std::this_thread::yield();
395+
}
396+
continue;
391397
}
392-
continue;
393-
}
394398

395-
if (_abandoned.load(std::memory_order_acquire)) {
396-
return;
399+
if (_abandoned.load(std::memory_order_acquire)) {
400+
return;
401+
}
402+
403+
// ignore the result of processAll, try to read more bytes down below
404+
processAll();
405+
compactify();
397406
}
407+
} catch (asio_ns::system_error const& err) {
408+
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "sync read failed with: "
409+
<< err.what();
410+
closeStreamNoLock();
411+
return;
412+
} catch (...) {
413+
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "general error on stream";
398414

399-
// ignore the result of processAll, try to read more bytes down below
400-
processAll();
401-
compactify();
415+
closeStreamNoLock();
416+
return;
402417
}
403-
} catch (asio_ns::system_error const& err) {
404-
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "sync read failed with: "
405-
<< err.what();
406-
closeStreamNoLock();
407-
return;
408-
} catch (...) {
409-
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "general error on stream";
410-
411-
closeStreamNoLock();
412-
return;
413418
}
414-
419+
415420
// try to read more bytes
416421
if (_abandoned.load(std::memory_order_acquire)) {
417422
return;
418-
} else if (!reserveMemory()) {
423+
}
424+
if (!reserveMemory()) {
419425
LOG_TOPIC(TRACE, Logger::COMMUNICATION) << "failed to reserve memory";
420426
return;
421427
}
@@ -460,54 +466,72 @@ void SocketTask::asyncWriteSome() {
460466
if (_writeBuffer.empty()) {
461467
return;
462468
}
469+
470+
TRI_ASSERT(_writeBuffer._buffer != nullptr);
463471
size_t total = _writeBuffer._buffer->length();
464472
size_t written = 0;
465473

466474
TRI_ASSERT(!_abandoned);
467-
TRI_ASSERT(_peer != nullptr);
468475

469476
asio_ns::error_code err;
470-
err.clear();
471-
while (true) {
472-
RequestStatistics::SET_WRITE_START(_writeBuffer._statistics);
473-
written = _peer->writeSome(_writeBuffer._buffer, err);
477+
478+
if (this->canUseMixedIO()) {
479+
// try some direct writes only for non-SSL mode
480+
// in SSL mode it will fall apart when mixing direct writes and async
481+
// writes later
482+
while (true) {
483+
TRI_ASSERT(_writeBuffer._buffer != nullptr);
484+
485+
// we can directly skip sending empty buffers
486+
if (_writeBuffer._buffer->length() > 0) {
487+
RequestStatistics::SET_WRITE_START(_writeBuffer._statistics);
488+
written = _peer->writeSome(_writeBuffer._buffer, err);
489+
490+
RequestStatistics::ADD_SENT_BYTES(_writeBuffer._statistics, written);
491+
492+
if (err || written != total) {
493+
// unable to write everything at once, might be a lot of data
494+
// above code does not update the buffer positon
495+
break;
496+
}
474497

475-
if (err) {
476-
break;
477-
}
498+
TRI_ASSERT(written > 0);
499+
}
478500

479-
RequestStatistics::ADD_SENT_BYTES(_writeBuffer._statistics, written);
501+
if (!completedWriteBuffer()) {
502+
return;
503+
}
480504

481-
if (written != total) {
482-
// unable to write everything at once, might be a lot of data
483-
// above code does not update the buffer positon
484-
break;
505+
// try to send next buffer
506+
TRI_ASSERT(_writeBuffer._buffer != nullptr);
507+
total = _writeBuffer._buffer->length();
485508
}
486509

487-
if (!completedWriteBuffer()) {
510+
// write could have blocked which is the only acceptable error
511+
if (err && err != asio_ns::error::would_block && err != asio_ns::error::try_again) {
512+
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "sync write on stream failed with: "
513+
<< err.message();
514+
closeStreamNoLock();
488515
return;
489516
}
517+
} // !_peer->isEncrypted
490518

491-
// try to send next buffer
492-
total = _writeBuffer._buffer->length();
493-
written = 0;
494-
}
495-
496-
// write could have blocked which is the only acceptable error
497-
if (err && err != ::asio_ns::error::would_block) {
498-
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "sync write on failed with: "
499-
<< err.message();
500-
closeStreamNoLock();
501-
return;
502-
}
519+
// we will be getting here in the following cases
520+
// - encrypted mode (SSL)
521+
// - we send only parts of the write buffer, but have more to send
522+
// - we got the error would_block/try_again when sending data
523+
// in this case we dispatch an async write
503524

504525
if (_abandoned.load(std::memory_order_acquire)) {
505526
return;
506527
}
507-
528+
529+
TRI_ASSERT(_writeBuffer._buffer != nullptr);
530+
508531
// so the code could have blocked at this point or not all data
509532
// was written in one go, begin writing at offset (written)
510533
auto self = shared_from_this();
534+
511535
_peer->asyncWrite(
512536
asio_ns::buffer(_writeBuffer._buffer->begin() + written, total - written),
513537
[self, this](const asio_ns::error_code& ec, std::size_t transferred) {
@@ -516,7 +540,8 @@ void SocketTask::asyncWriteSome() {
516540

517541
if (_abandoned.load(std::memory_order_acquire)) {
518542
return;
519-
} else if (ec) {
543+
}
544+
if (ec) {
520545
LOG_TOPIC(DEBUG, Logger::COMMUNICATION) << "write on failed with: "
521546
<< ec.message();
522547
closeStream();
@@ -527,11 +552,9 @@ void SocketTask::asyncWriteSome() {
527552
transferred);
528553

529554
if (completedWriteBuffer()) {
530-
_peer->post([self, this] {
531-
if (!_abandoned.load(std::memory_order_acquire)) {
532-
asyncWriteSome();
533-
}
534-
});
555+
if (!_abandoned.load(std::memory_order_acquire)) {
556+
asyncWriteSome();
557+
}
535558
}
536559
});
537560
}

arangod/Scheduler/SocketTask.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,9 @@ class SocketTask : virtual public Task {
5555

5656
public:
5757
bool start();
58+
59+
// whether or not this task can mix sync and async I/O
60+
virtual bool canUseMixedIO() const = 0;
5861

5962
protected:
6063
// caller will hold the _lock
@@ -144,11 +147,10 @@ class SocketTask : virtual public Task {
144147
// method returns true. Used for VST upgrade
145148
bool abandon() { return !(_abandoned.exchange(true)); }
146149

147-
/// lease a string buffer from pool
150+
// lease a string buffer from pool
148151
basics::StringBuffer* leaseStringBuffer(size_t length);
149152
void returnStringBuffer(basics::StringBuffer*);
150153

151-
protected:
152154
bool processAll();
153155
void triggerProcessAll();
154156

0 commit comments

Comments
 (0)
0