@@ -120,7 +120,7 @@ bool SocketTask::start() {
120
120
121
121
if (_closeRequested.load (std::memory_order_acquire)) {
122
122
LOG_TOPIC (DEBUG, Logger::COMMUNICATION)
123
- << " cannot start, close alread in progress" ;
123
+ << " cannot start, close already in progress" ;
124
124
return false ;
125
125
}
126
126
@@ -192,6 +192,7 @@ void SocketTask::closeStream() {
192
192
if (_abandoned.load (std::memory_order_acquire)) {
193
193
return ;
194
194
}
195
+
195
196
// strand::dispatch may execute this immediately if this
196
197
// is called on a thread inside the same strand
197
198
auto self = shared_from_this ();
@@ -291,6 +292,7 @@ bool SocketTask::trySyncRead() {
291
292
292
293
asio_ns::error_code err;
293
294
TRI_ASSERT (_peer != nullptr );
295
+
294
296
if (0 == _peer->available (err)) {
295
297
return false ;
296
298
}
@@ -315,17 +317,15 @@ bool SocketTask::trySyncRead() {
315
317
316
318
_readBuffer.increaseLength (bytesRead);
317
319
318
- if (err) {
319
- if (err == asio_ns::error::would_block) {
320
- return false ;
321
- } else {
322
- LOG_TOPIC (DEBUG, Logger::COMMUNICATION) << " trySyncRead failed with: "
323
- << err.message ();
324
- return false ;
325
- }
320
+ if (!err) {
321
+ return true ;
326
322
}
327
323
328
- return true ;
324
+ if (err != asio_ns::error::would_block && err != asio_ns::error::try_again) {
325
+ LOG_TOPIC (DEBUG, Logger::COMMUNICATION) << " trySyncRead failed with: " << err.message ();
326
+ }
327
+
328
+ return false ;
329
329
}
330
330
331
331
// must run in strand
@@ -379,39 +379,44 @@ void SocketTask::asyncReadSome() {
379
379
TRI_ASSERT (_peer != nullptr );
380
380
TRI_ASSERT (_peer->strand .running_in_this_thread ());
381
381
382
- try {
383
- size_t const MAX_DIRECT_TRIES = 2 ;
384
- size_t n = 0 ;
385
-
386
- while (++n <= MAX_DIRECT_TRIES &&
387
- !_abandoned.load (std::memory_order_acquire)) {
388
- if (!trySyncRead ()) {
389
- if (n < MAX_DIRECT_TRIES) {
390
- std::this_thread::yield ();
382
+ if (this ->canUseMixedIO ()) {
383
+ // try some direct reads only for non-SSL mode
384
+ // in SSL mode it will fall apart when mixing direct reads and async
385
+ // reads later
386
+ try {
387
+ size_t const MAX_DIRECT_TRIES = 2 ;
388
+ size_t n = 0 ;
389
+
390
+ while (++n <= MAX_DIRECT_TRIES &&
391
+ !_abandoned.load (std::memory_order_acquire)) {
392
+ if (!trySyncRead ()) {
393
+ if (n < MAX_DIRECT_TRIES) {
394
+ std::this_thread::yield ();
395
+ }
396
+ continue ;
391
397
}
392
- continue ;
393
- }
394
398
395
- if (_abandoned.load (std::memory_order_acquire)) {
396
- return ;
399
+ if (_abandoned.load (std::memory_order_acquire)) {
400
+ return ;
401
+ }
402
+
403
+ // ignore the result of processAll, try to read more bytes down below
404
+ processAll ();
405
+ compactify ();
397
406
}
407
+ } catch (asio_ns::system_error const & err) {
408
+ LOG_TOPIC (DEBUG, Logger::COMMUNICATION) << " sync read failed with: "
409
+ << err.what ();
410
+ closeStreamNoLock ();
411
+ return ;
412
+ } catch (...) {
413
+ LOG_TOPIC (DEBUG, Logger::COMMUNICATION) << " general error on stream" ;
398
414
399
- // ignore the result of processAll, try to read more bytes down below
400
- processAll ();
401
- compactify ();
415
+ closeStreamNoLock ();
416
+ return ;
402
417
}
403
- } catch (asio_ns::system_error const & err) {
404
- LOG_TOPIC (DEBUG, Logger::COMMUNICATION) << " sync read failed with: "
405
- << err.what ();
406
- closeStreamNoLock ();
407
- return ;
408
- } catch (...) {
409
- LOG_TOPIC (DEBUG, Logger::COMMUNICATION) << " general error on stream" ;
410
-
411
- closeStreamNoLock ();
412
- return ;
413
418
}
414
-
419
+
415
420
// try to read more bytes
416
421
if (_abandoned.load (std::memory_order_acquire)) {
417
422
return ;
@@ -461,54 +466,72 @@ void SocketTask::asyncWriteSome() {
461
466
if (_writeBuffer.empty ()) {
462
467
return ;
463
468
}
469
+
470
+ TRI_ASSERT (_writeBuffer._buffer != nullptr );
464
471
size_t total = _writeBuffer._buffer ->length ();
465
472
size_t written = 0 ;
466
473
467
474
TRI_ASSERT (!_abandoned);
468
- TRI_ASSERT (_peer != nullptr );
469
475
470
476
asio_ns::error_code err;
471
- err.clear ();
472
- while (true ) {
473
- RequestStatistics::SET_WRITE_START (_writeBuffer._statistics );
474
- written = _peer->writeSome (_writeBuffer._buffer , err);
477
+
478
+ if (this ->canUseMixedIO ()) {
479
+ // try some direct writes only for non-SSL mode
480
+ // in SSL mode it will fall apart when mixing direct writes and async
481
+ // writes later
482
+ while (true ) {
483
+ TRI_ASSERT (_writeBuffer._buffer != nullptr );
484
+
485
+ // we can directly skip sending empty buffers
486
+ if (_writeBuffer._buffer ->length () > 0 ) {
487
+ RequestStatistics::SET_WRITE_START (_writeBuffer._statistics );
488
+ written = _peer->writeSome (_writeBuffer._buffer , err);
489
+
490
+ RequestStatistics::ADD_SENT_BYTES (_writeBuffer._statistics , written);
491
+
492
+ if (err || written != total) {
493
+ // unable to write everything at once, might be a lot of data
494
+ // above code does not update the buffer positon
495
+ break ;
496
+ }
475
497
476
- if (err) {
477
- break ;
478
- }
498
+ TRI_ASSERT (written > 0 );
499
+ }
479
500
480
- RequestStatistics::ADD_SENT_BYTES (_writeBuffer._statistics , written);
501
+ if (!completedWriteBuffer ()) {
502
+ return ;
503
+ }
481
504
482
- if (written != total) {
483
- // unable to write everything at once, might be a lot of data
484
- // above code does not update the buffer positon
485
- break ;
505
+ // try to send next buffer
506
+ TRI_ASSERT (_writeBuffer._buffer != nullptr );
507
+ total = _writeBuffer._buffer ->length ();
486
508
}
487
509
488
- if (!completedWriteBuffer ()) {
510
+ // write could have blocked which is the only acceptable error
511
+ if (err && err != asio_ns::error::would_block && err != asio_ns::error::try_again) {
512
+ LOG_TOPIC (DEBUG, Logger::COMMUNICATION) << " sync write on stream failed with: "
513
+ << err.message ();
514
+ closeStreamNoLock ();
489
515
return ;
490
516
}
517
+ } // !_peer->isEncrypted
491
518
492
- // try to send next buffer
493
- total = _writeBuffer._buffer ->length ();
494
- written = 0 ;
495
- }
496
-
497
- // write could have blocked which is the only acceptable error
498
- if (err && err != ::asio_ns::error::would_block) {
499
- LOG_TOPIC (DEBUG, Logger::COMMUNICATION) << " sync write on failed with: "
500
- << err.message ();
501
- closeStreamNoLock ();
502
- return ;
503
- }
519
+ // we will be getting here in the following cases
520
+ // - encrypted mode (SSL)
521
+ // - we send only parts of the write buffer, but have more to send
522
+ // - we got the error would_block/try_again when sending data
523
+ // in this case we dispatch an async write
504
524
505
525
if (_abandoned.load (std::memory_order_acquire)) {
506
526
return ;
507
527
}
508
-
528
+
529
+ TRI_ASSERT (_writeBuffer._buffer != nullptr );
530
+
509
531
// so the code could have blocked at this point or not all data
510
532
// was written in one go, begin writing at offset (written)
511
533
auto self = shared_from_this ();
534
+
512
535
_peer->asyncWrite (
513
536
asio_ns::buffer (_writeBuffer._buffer ->begin () + written, total - written),
514
537
[self, this ](const asio_ns::error_code& ec, std::size_t transferred) {
@@ -517,7 +540,8 @@ void SocketTask::asyncWriteSome() {
517
540
518
541
if (_abandoned.load (std::memory_order_acquire)) {
519
542
return ;
520
- } else if (ec) {
543
+ }
544
+ if (ec) {
521
545
LOG_TOPIC (DEBUG, Logger::COMMUNICATION) << " write on failed with: "
522
546
<< ec.message ();
523
547
closeStream ();
0 commit comments