8000 Fuerte simplify by graetzer · Pull Request #12270 · arangodb/arangodb · GitHub
[go: up one dir, main page]

Skip to content

Fuerte simplify #12270

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 31 commits into from
Aug 24, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
2c04571
Simplify fuerte code
graetzer Jun 26, 2020
24c1529
fix #warning
graetzer Jun 26, 2020
3412930
Merge branch 'devel' of github.com:arangodb/arangodb into feature/fue…
graetzer Jun 29, 2020
002412b
change defaults
graetzer Jun 29, 2020
0a9ac62
slight fixes
graetzer Jun 30, 2020
95e2e4b
slight adjustments
graetzer Jul 1, 2020
9aa85fa
Starting HTTP/2 with Prior Knowledge
graetzer Jul 3, 2020
efa00a3
make consistent
graetzer Jul 3, 2020
46cb4a8
Merge branch 'feature/http2-prior-knowledge' into feature/fuerte-simp…
graetzer Jul 3, 2020
528a0f6
fix some requests
graetzer Jul 8, 2020
3fa1586
fix architectire
graetzer Jul 13, 2020
da6c3e4
Merge branch 'devel' of github.com:arangodb/arangodb into feature/fue…
graetzer Jul 13, 2020
a5e1da2
fix stuff
graetzer Jul 13, 2020
1426f9b
Properly cancel broken connections
graetzer Jul 14, 2020
e070136
remove extra log statements
graetzer Jul 14, 2020
05c578a
Merge branch 'devel' of github.com:arangodb/arangodb into feature/fue…
graetzer Jul 21, 2020
931c22e
remove unused code
graetzer Jul 21, 2020
a7bd312
some changes to vst
graetzer Jul 21, 2020
56e3a53
adjust TLA+ pluscal models
graetzer Jul 22, 2020
50da56c
Merge branch 'devel' of github.com:arangodb/arangodb into feature/fue…
graetzer Jul 22, 2020
b9208ea
fix some stuff
graetzer Jul 22, 2020
088a759
fix mistake
graetzer Jul 22, 2020
92a050f
fixing VST connection
graetzer Jul 23, 2020
67faddb
Merge remote-tracking branch 'origin/devel' into feature/fuerte-simplify
neunhoef Aug 20, 2020
b25c331
Correct mode: we call shutdownConnection when it is time.
neunhoef Aug 21, 2020
439abaa
File no longer necessary.
neunhoef Aug 21, 2020
17df268
Comment fixes.
neunhoef Aug 21, 2020
e4e9d4d
Add a forgotten alarm cancellation to model.
neunhoef Aug 21, 2020
092e27c
Fix cancellation in model.
neunhoef Aug 21, 2020
57e1777
Take latest transaction.
neunhoef Aug 21, 2020
d09a0a8
This is a sub-PR to suggest some changes. (#12495)
neunhoef Aug 24, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8000
Prev Previous commit
Next Next commit
fix architectire
  • Loading branch information
graetzer committed Jul 13, 2020
commit 3fa158635b63219a1283f461ffab99c9b39b471c
6 changes: 6 additions & 0 deletions 3rdParty/fuerte/include/fuerte/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,12 @@ class ConnectionBuilder {
return *this;
}

/// @brief use an idle timeout
ConnectionBuilder& useIdleTimeout(bool t) {
_conf._useIdleTimeout = t;
return *this;
}

/// @brief connect retry pause (1s default)
inline std::chrono::milliseconds connectRetryPause() const {
return _conf._connectRetryPause;
Expand Down
2 changes: 2 additions & 0 deletions 3rdParty/fuerte/include/fuerte/helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,5 +120,7 @@ void toLowerInPlace(std::string& str);
/// checks if connection was closed and returns
fuerte::Error translateError(asio_ns::error_code e,
fuerte::Error def);

std::string extractPathParameters(std::string const& path, StringMap& params);
}}} // namespace arangodb::fuerte::v1
#endif
2 changes: 2 additions & 0 deletions 3rdParty/fuerte/include/fuerte/types.h
< 8000 td id="diff-be343db5f75f90ead8ffec0283dae0a6f0e7a2f3715e3673187eb281005414d8R191" data-line-number="191" class="blob-num blob-num-context js-linkable-line-number js-blob-rnum">
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ struct ConnectionConfiguration {
_idleTimeout(300000),
_connectRetryPause(1000),
_maxConnectRetries(3),
_useIdleTimeout(true),
_authenticationType(AuthenticationType::None),
_user(""),
_password(""),
Expand All @@ -208,6 +209,7 @@ struct ConnectionConfiguration {
std::chrono::milliseconds _idleTimeout;
std::chrono::milliseconds _connectRetryPause;
unsigned _maxConnectRetries;
bool _useIdleTimeout;

AuthenticationType _authenticationType;
std::string _user;
Expand Down
105 changes: 57 additions & 48 deletions 3rdParty/fuerte/src/GeneralConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,52 @@ class GeneralConnection : public fuerte::Connection {
}

/// The following public methods can be called from any thread:

// Start an asynchronous request.
void sendRequest(std::unique_ptr<Request> req,
RequestCallback cb) override {
// construct RequestItem
auto item = this->createRequest(std::move(req), std::move(cb));
// set the point-in-time when this request ex 8000 pires
if (item->request->timeout().count() > 0) {
item->expires = std::chrono::steady_clock::now() + item->request->timeout();
} else {
item->expires = std::chrono::steady_clock::time_point::max();
}

// Don't send once in Closed state:
if (this->_state.load(std::memory_order_relaxed) == Connection::State::Closed) {
FUERTE_LOG_ERROR << "connection already failed\n";
item->invokeOnError(Error::Canceled);
return;
}

// Prepare a new request
this->_numQueued.fetch_add(1, std::memory_order_relaxed);
if (!this->_queue.push(item.get())) {
FUERTE_LOG_ERROR << "connection queue capacity exceeded\n";
uint32_t q = this->_numQueued.fetch_sub(1, std::memory_order_relaxed);
FUERTE_ASSERT(q > 0);
item->invokeOnError(Error::QueueCapacityExceeded);
return;
}
item.release(); // queue owns this now

FUERTE_LOG_HTTPTRACE << "queued item: this=" << this << "\n";

// Note that we have first posted on the queue with std::memory_order_seq_cst
// and now we check _active std::memory_order_seq_cst. This prevents a sleeping
// barber with the check-set-check combination in `asyncWriteNextRequest`.
// If we are the ones to exchange the value to `true`, then we post
// on the `_io_context` to activate the connection. Note that the
// connection can be in the `Disconnected` or `Connected` or `Failed`
// state, but not in the `Connecting` state in this case.
if (!this->_active.exchange(true)) {
this->_io_context->post([self(Connection::shared_from_this())] {
static_cast<GeneralConnection<ST, RT>&>(*self).activate();
});
}
}

/// @brief cancel the connection, unusable afterwards
void cancel() override {
Expand Down Expand Up @@ -106,8 +152,12 @@ class GeneralConnection : public fuerte::Connection {
FUERTE_LOG_DEBUG << ", msg = '" << msg<< "' ";
}
FUERTE_LOG_DEBUG<< "this=" << this << "\n";

_state.store(Connection::State::Closed);

auto exp = _state.load();
if (exp == Connection::State::Closed ||
!_state.compare_exchange_strong(exp, Connection::State::Closed)) {
return;
}

abortOngoingRequests(err);
drainQueue(err);
Expand All @@ -130,6 +180,7 @@ class GeneralConnection : public fuerte::Connection {
auto mutableBuff = _receiveBuffer.prepare(READ_BLOCK_SIZE);

_reading = true;
setIOTimeout(); // must be after setting _reading
_proto.socket.async_read_some(mutableBuff, [self = shared_from_this()]
(auto const& ec, size_t nread) {
FUERTE_LOG_TRACE << "received " << nread << " bytes\n";
Expand All @@ -154,42 +205,6 @@ class GeneralConnection : public fuerte::Connection {
}
}

/// Set timeout accordingly
void setTimeout() {
bool isIdle = false;
auto const timepoint = getTimeout(isIdle);
if (timepoint.time_since_epoch().count() == 0) {
this->_proto.timer.cancel();
return; // danger zone
}

_timeoutOnReadWrite = false;

// expires_after cancels pending ops
this->_proto.timer.expires_at(timepoint);
this->_proto.timer.async_wait(
[isIdle, self = Connection::weak_from_this()](auto const& ec) {
std::shared_ptr<Connection> s;
if (ec || !(s = self.lock())) { // was canceled / deallocated
return;
}
auto& me = static_cast<GeneralConnection<ST, RT>&>(*s);
me.abortExpiredRequests();

if (!isIdle && (me._writing || me._reading)) {
me._timeoutOnReadWrite = true;
me._proto.cancel();
// We simply cancel all ongoing asynchronous operations,
// completion handlers will do the rest.
return;
} else if (isIdle) {
me.handleIdleTimeout();
}
// In all other cases we do nothing, since we have been posted to the
// iocontext but the thing we should be timing out has already completed.
});
}

void activate() {
Connection::State state = this->_state.load();
FUERTE_ASSERT(state != Connection::State::Connecting);
Expand Down Expand Up @@ -243,17 +258,11 @@ class GeneralConnection : public fuerte::Connection {

/// abort ongoing / unfinished requests (locally)
virtual void abortOngoingRequests(const fuerte::Error) = 0;
// abort all expired requests
virtual void abortExpiredRequests() = 0;
// calculate smallest timeout
virtual std::chrono::steady_clock::time_point getTimeout(bool& isIdle) = 0;
// subclasses may override this for a gracefuly shutdown

virtual void setIOTimeout() = 0;

virtual void handleIdleTimeout() {
if (!_active.load() && _state.load() == Connection::State::Connected) {
shutdownConnection(Error::CloseRequested);
}
}
virtual std::unique_ptr<RT> createRequest(std::unique_ptr<Request>&& req,
RequestCallback&& cb) = 0;

private:

Expand Down
Loading
0