@@ -119,7 +119,7 @@ bool SocketTask::start() {
119
119
120
120
if (_closeRequested.load (std::memory_order_acquire)) {
121
121
LOG_TOPIC (DEBUG, Logger::COMMUNICATION)
122
- << " cannot start, close alread in progress" ;
122
+ << " cannot start, close already in progress" ;
123
123
return false ;
124
124
}
125
125
@@ -191,6 +191,7 @@ void SocketTask::closeStream() {
191
191
if (_abandoned.load (std::memory_order_acquire)) {
192
192
return ;
193
193
}
194
+
194
195
// strand::dispatch may execute this immediately if this
195
196
// is called on a thread inside the same strand
196
197
auto self = shared_from_this ();
@@ -204,7 +205,7 @@ void SocketTask::closeStream() {
204
205
void SocketTask::closeStreamNoLock () {
205
206
TRI_ASSERT (_peer != nullptr );
206
207
TRI_ASSERT (_peer->runningInThisThread ());
207
-
208
+
208
209
bool mustCloseSend = !_closedSend.load (std::memory_order_acquire);
209
210
bool mustCloseReceive = !_closedReceive.load (std::memory_order_acquire);
210
211
@@ -291,6 +292,7 @@ bool SocketTask::trySyncRead() {
291
292
292
293
asio_ns::error_code err;
293
294
TRI_ASSERT (_peer != nullptr );
295
+
294
296
if (0 == _peer->available (err)) {
295
297
return false ;
296
298
}
@@ -315,17 +317,15 @@ bool SocketTask::trySyncRead() {
315
317
316
318
_readBuffer.increaseLength (bytesRead);
317
319
318
- if (err) {
319
- if (err == asio_ns::error::would_block) {
320
- return false ;
321
- } else {
322
- LOG_TOPIC (DEBUG, Logger::COMMUNICATION) << " trySyncRead failed with: "
323
- << err.message ();
324
- return false ;
325
- }
320
+ if (!err) {
321
+ return true ;
326
322
}
327
323
328
- return true ;
324
+ if (err != asio_ns::error::would_block && err != asio_ns::error::try_again) {
325
+ LOG_TOPIC (DEBUG, Logger::COMMUNICATION) << " trySyncRead failed with: " << err.message ();
326
+ }
327
+
328
+ return false ;
329
329
}
330
330
331
331
// caller must hold the _lock
@@ -379,43 +379,49 @@ void SocketTask::asyncReadSome() {
379
379
TRI_ASSERT (_peer != nullptr );
380
380
TRI_ASSERT (_peer->runningInThisThread ());
381
381
382
- try {
383
- size_t const MAX_DIRECT_TRIES = 2 ;
384
- size_t n = 0 ;
385
-
386
- while (++n <= MAX_DIRECT_TRIES &&
387
- !_abandoned.load (std::memory_order_acquire)) {
388
- if (!trySyncRead ()) {
389
- if (n < MAX_DIRECT_TRIES) {
390
- std::this_thread::yield ();
382
+ if (this ->canUseMixedIO ()) {
383
+ // try some direct read only for non-SSL mode
384
+ // in SSL mode it will fall apart when mixing direct reads and async
385
+ // reads later
386
+ try {
387
+ size_t const MAX_DIRECT_TRIES = 2 ;
388
+ size_t n = 0 ;
389
+
390
+ while (++n <= MAX_DIRECT_TRIES &&
391
+ !_abandoned.load (std::memory_order_acquire)) {
392
+ if (!trySyncRead ()) {
393
+ if (n < MAX_DIRECT_TRIES) {
394
+ std::this_thread::yield ();
395
+ }
396
+ continue ;
391
397
}
392
- continue ;
393
- }
394
398
395
- if (_abandoned.load (std::memory_order_acquire)) {
396
- return ;
399
+ if (_abandoned.load (std::memory_order_acquire)) {
400
+ return ;
401
+ }
402
+
403
+ // ignore the result of processAll, try to read more bytes down below
404
+ processAll ();
405
+ compactify ();
397
406
}
407
+ } catch (asio_ns::system_error const & err) {
408
+ LOG_TOPIC (DEBUG, Logger::COMMUNICATION) << " sync read failed with: "
409
+ << err.what ();
410
+ closeStreamNoLock ();
411
+ return ;
412
+ } catch (...) {
413
+ LOG_TOPIC (DEBUG, Logger::COMMUNICATION) << " general error on stream" ;
398
414
399
- // ignore the result of processAll, try to read more bytes down below
400
- processAll ();
401
- compactify ();
415
+ closeStreamNoLock ();
416
+ return ;
402
417
}
403
- } catch (asio_ns::system_error const & err) {
404
- LOG_TOPIC (DEBUG, Logger::COMMUNICATION) << " sync read failed with: "
405
- << err.what ();
406
- closeStreamNoLock ();
407
- return ;
408
- } catch (...) {
409
- LOG_TOPIC (DEBUG, Logger::COMMUNICATION) << " general error on stream" ;
410
-
411
- closeStreamNoLock ();
412
- return ;
413
418
}
414
-
419
+
415
420
// try to read more bytes
416
421
if (_abandoned.load (std::memory_order_acquire)) {
417
422
return ;
418
- } else if (!reserveMemory ()) {
423
+ }
424
+ if (!reserveMemory ()) {
419
425
LOG_TOPIC (TRACE, Logger::COMMUNICATION) << " failed to reserve memory" ;
420
426
return ;
421
427
}
@@ -460,54 +466,72 @@ void SocketTask::asyncWriteSome() {
460
466
if (_writeBuffer.empty ()) {
461
467
return ;
462
468
}
469
+
470
+ TRI_ASSERT (_writeBuffer._buffer != nullptr );
463
471
size_t total = _writeBuffer._buffer ->length ();
464
472
size_t written = 0 ;
465
473
466
474
TRI_ASSERT (!_abandoned);
467
- TRI_ASSERT (_peer != nullptr );
468
475
469
476
asio_ns::error_code err;
470
- err.clear ();
471
- while (true ) {
472
- RequestStatistics::SET_WRITE_START (_writeBuffer._statistics );
473
- written = _peer->writeSome (_writeBuffer._buffer , err);
477
+
478
+ if (this ->canUseMixedIO ()) {
479
+ // try some direct writes only for non-SSL mode
480
+ // in SSL mode it will fall apart when mixing direct writes and async
481
+ // writes later
482
+ while (true ) {
483
+ TRI_ASSERT (_writeBuffer._buffer != nullptr );
484
+
485
+ // we can directly skip sending empty buffers
486
+ if (_writeBuffer._buffer ->length () > 0 ) {
487
+ RequestStatistics::SET_WRITE_START (_writeBuffer._statistics );
488
+ written = _peer->writeSome (_writeBuffer._buffer , err);
489
+
490
+ RequestStatistics::ADD_SENT_BYTES (_writeBuffer._statistics , written);
491
+
492
+ if (err || written != total) {
493
+ // unable to write everything at once, might be a lot of data
494
+ // above code does not update the buffer positon
495
+ break ;
496
+ }
474
497
475
- if (err) {
476
- break ;
477
- }
498
+ TRI_ASSERT (written > 0 );
499
+ }
478
500
479
- RequestStatistics::ADD_SENT_BYTES (_writeBuffer._statistics , written);
501
+ if (!completedWriteBuffer ()) {
502
+ return ;
503
+ }
480
504
481
- if (written != total) {
482
- // unable to write everything at once, might be a lot of data
483
- // above code does not update the buffer positon
484
- break ;
505
+ // try to send next buffer
506
+ TRI_ASSERT (_writeBuffer._buffer != nullptr );
507
+ total = _writeBuffer._buffer ->length ();
485
508
}
486
509
487
- if (!completedWriteBuffer ()) {
510
+ // write could have blocked which is the only acceptable error
511
+ if (err && err != asio_ns::error::would_block && err != asio_ns::error::try_again) {
512
+ LOG_TOPIC (DEBUG, Logger::COMMUNICATION) << " sync write on stream failed with: "
513
+ << err.message ();
514
+ closeStreamNoLock ();
488
515
return ;
489
516
}
517
+ } // !_peer->isEncrypted
490
518
491
- // try to send next buffer
492
- total = _writeBuffer._buffer ->length ();
493
- written = 0 ;
494
- }
495
-
496
- // write could have blocked which is the only acceptable error
497
- if (err && err != ::asio_ns::error::would_block) {
498
- LOG_TOPIC (DEBUG, Logger::COMMUNICATION) << " sync write on failed with: "
499
- << err.message ();
500
- closeStreamNoLock ();
501
- return ;
502
- }
519
+ // we will be getting here in the following cases
520
+ // - encrypted mode (SSL)
521
+ // - we send only parts of the write buffer, but have more to send
522
+ // - we got the error would_block/try_again when sending data
523
+ // in this case we dispatch an async write
503
524
504
525
if (_abandoned.load (std::memory_order_acquire)) {
505
526
return ;
506
527
}
507
-
528
+
529
+ TRI_ASSERT (_writeBuffer._buffer != nullptr );
530
+
508
531
// so the code could have blocked at this point or not all data
509
532
// was written in one go, begin writing at offset (written)
510
533
auto self = shared_from_this ();
534
+
511
535
_peer->asyncWrite (
512
536
asio_ns::buffer (_writeBuffer._buffer ->begin () + written, total - written),
513
537
[self, this ](const asio_ns::error_code& ec, std::size_t transferred) {
@@ -516,7 +540,8 @@ void SocketTask::asyncWriteSome() {
516
540
517
541
if (_abandoned.load (std::memory_order_acquire)) {
518
542
return ;
519
- } else if (ec) {
543
+ }
544
+ if (ec) {
520
545
LOG_TOPIC (DEBUG, Logger::COMMUNICATION) << " write on failed with: "
521
546
<< ec.message ();
522
547
closeStream ();
@@ -527,11 +552,9 @@ void SocketTask::asyncWriteSome() {
527
552
transferred);
528
553
529
554
if (completedWriteBuffer ()) {
530
- _peer->post ([self, this ] {
531
- if (!_abandoned.load (std::memory_order_acquire)) {
532
- asyncWriteSome ();
533
- }
534
- });
555
+ if (!_abandoned.load (std::memory_order_acquire)) {
556
+ asyncWriteSome ();
557
+ }
535
558
}
536
559
});
537
560
}
0 commit comments