@@ -91,6 +91,7 @@ MessageID VstConnection<ST>::sendRequest(std::unique_ptr<Request> req,
91
91
this ->startConnection ();
92
92
} else if (state == Connection::State::Failed) {
93
93
FUERTE_LOG_ERROR << " queued request on failed connection\n " ;
94
+ drainQueue (fuerte::Error::ConnectionClosed);
94
95
}
95
96
return mid;
96
97
}
@@ -118,15 +119,13 @@ void VstConnection<ST>::finishConnect() {
118
119
auto self = Connection::shared_from_this ();
119
120
asio_ns::async_write (
120
121
this ->_protocol .socket , asio_ns::buffer (vstHeader, strlen (vstHeader)),
121
- [self](asio_ns::error_code const & ec, std::size_t transferred ) {
122
+ [self](asio_ns::error_code const & ec, std::size_t nsend ) {
122
123
auto * thisPtr = static_cast <VstConnection<ST>*>(self.get ());
123
124
if (ec) {
124
125
FUERTE_LOG_ERROR << ec.message () << " \n " ;
125
- thisPtr->shutdownConnection (Error::CouldNotConnect);
126
+ thisPtr->shutdownConnection (Error::CouldNotConnect,
127
+ " unable to connect: " + ec.message ());
126
128
thisPtr->drainQueue (Error::CouldNotConnect);
127
- thisPtr->onFailure (
128
- Error::CouldNotConnect,
129
- " unable to initialize connection: error=" + ec.message ());
130
129
return ;
131
130
}
132
131
FUERTE_LOG_CALLBACKS << " VST connection established\n " ;
@@ -150,22 +149,23 @@ void VstConnection<ST>::sendAuthenticationRequest() {
150
149
auto item = std::make_shared<RequestItem>();
151
150
item->_messageID = vstMessageId.fetch_add (1 , std::memory_order_relaxed);
152
151
item->_expires = std::chrono::steady_clock::now () + Request::defaultTimeout;
153
- item->_request = nullptr ; // should not break anything
154
-
155
152
auto self = Connection::shared_from_this ();
156
153
item->_callback = [self](Error error, std::unique_ptr<Request>,
157
154
std::unique_ptr<Response> resp) {
155
+ auto * thisPtr = static_cast <VstConnection<ST>*>(self.get ());
158
156
if (error != Error::NoError || resp->statusCode () != StatusOK) {
159
- auto * thisPtr = static_cast <VstConnection<ST>*>(self.get ());
160
157
thisPtr->_state .store (Connection::State::Failed,
161
158
std::memory_order_release);
162
- thisPtr->shutdownConnection (Error::CouldNotConnect);
163
- thisPtr->onFailure (error, " authentication failed" );
159
+ thisPtr->shutdownConnection (Error::VstUnauthorized,
160
+ " could not authenticate" );
161
+ thisPtr->drainQueue (Error::VstUnauthorized);
162
+ } else {
163
+ thisPtr->_state .store (Connection::State::Connected);
164
+ thisPtr->startWriting ();
164
165
}
165
166
};
166
167
167
168
_messageStore.add (item); // add message to store
168
- setTimeout (); // set request timeout
169
169
170
170
if (this ->_config ._authenticationType == AuthenticationType::Basic) {
171
171
vst::message::authBasic (this ->_config ._user , this ->_config ._password ,
@@ -176,25 +176,23 @@ void VstConnection<ST>::sendAuthenticationRequest() {
176
176
assert (item->_buffer .size () < defaultMaxChunkSize);
177
177
178
178
// actually send auth request
179
- asio_ns::post (*this ->_io_context , [this , self, item] {
180
- auto cb = [self, item, this ](asio_ns::error_code const & ec,
181
- std::size_t transferred) {
182
- if (ec) {
183
- asyncWriteCallback (ec, transferred, std::move (item)); // error handling
184
- return ;
185
- }
186
- this ->_state .store (Connection::State::Connected,
187
- std::memory_order_release);
188
- asyncWriteCallback (ec, transferred,
189
- std::move (item)); // calls startReading()
190
- startWriting (); // start writing if something was queued
191
- };
192
- std::vector<asio_ns::const_buffer> buffers;
193
- vst::message::prepareForNetwork (
194
- _vstVersion, item->messageID (), item->_buffer ,
195
- /* payload*/ asio_ns::const_buffer (), buffers);
196
- asio_ns::async_write (this ->_protocol .socket , buffers, std::move (cb));
197
- });
179
+ auto cb = [this , self, item](asio_ns::error_code const & ec,
180
+ std::size_t nsend) {
181
+ if (ec) {
182
+ this ->_state .store (Connection::State::Failed);
183
+ this -> shutdownConnection (Error::CouldNotConnect,
184
+ " authorization message failed" );
185
+ this ->drainQueue (Error::CouldNotConnect);
186
+ } else {
187
+ asyncWriteCallback (ec, nsend, item);
188
+ }
189
+ };
190
+ std::vector<asio_ns::const_buffer> buffers;
191
+ vst::message::prepareForNetwork (_vstVersion, item->messageID (), item->_buffer ,
192
+ /* payload*/ asio_ns::const_buffer (), buffers);
193
+ asio_ns::async_write (this ->_protocol .socket , buffers, std::move (cb));
194
+
195
+ setTimeout ();
198
196
}
199
197
200
198
// ------------------------------------
@@ -268,7 +266,7 @@ void VstConnection<ST>::asyncWriteNextRequest() {
268
266
// callback of async_write function that is called in sendNextRequest.
269
267
template <SocketType ST>
270
268
void VstConnection<ST>::asyncWriteCallback(asio_ns::error_code const & ec,
271
- std::size_t transferred ,
269
+ std::size_t nsend ,
272
270
std::shared_ptr<RequestItem> item) {
273
271
// auto pendingAsyncCalls = --_connection->_async_calls;
274
272
if (ec) {
@@ -288,7 +286,7 @@ void VstConnection<ST>::asyncWriteCallback(asio_ns::error_code const& ec,
288
286
}
289
287
// Send succeeded
290
288
FUERTE_LOG_CALLBACKS << " asyncWriteCallback (vst): send succeeded, "
291
- << transferred << " bytes transferred \n " ;
289
+ << nsend << " bytes send \n " ;
292
290
293
291
// request is written we no longer need data for that
294
292
item->resetSendData ();
@@ -389,8 +387,8 @@ void VstConnection<ST>::asyncReadCallback(asio_ns::error_code const& ec) {
389
387
if (parser::ChunkState::Incomplete == state) {
390
388
break ;
391
389
} else if (parser::ChunkState::Invalid == state) {
392
- FUERTE_LOG_ERROR << " Invalid VST chunk " ;
393
- this -> shutdownConnection (Error::ProtocolError );
390
+ this -> shutdownConnection (Error::ProtocolError,
391
+ " Invalid VST chunk " );
394
392
return ;
395
393
}
396
394
@@ -496,12 +494,12 @@ std::unique_ptr<fu::Response> VstConnection<ST>::createResponse(
496
494
// adjust the timeouts (only call from IO-Thread)
497
495
template <SocketType ST>
498
496
void VstConnection<ST>::setTimeout() {
499
- asio_ns::error_code ec;
500
- this ->_timeout .cancel (ec);
501
- if (ec) {
502
- FUERTE_LOG_ERROR << " error on timeout cancel: " << ec.message ();
503
- return ; // bail out
504
- }
497
+ // asio_ns::error_code ec;
498
+ // this->_timeout.cancel(ec);
499
+ // if (ec) {
500
+ // FUERTE_LOG_ERROR << "error on timeout cancel: " << ec.message();
501
+ // return; // bail out
502
+ // }
505
503
506
504
// set to smallest point in time
507
505
auto expires = std::chrono::steady_clock::time_point::max ();
0 commit comments