8000 start searching for the local port of a socket by dothebart · Pull Request #21833 · arangodb/arangodb · GitHub
[go: up one dir, main page]

Skip to content

start searching for the local port of a socket #21833

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 16 commits into from
Jul 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions 3rdParty/fuerte/include/fuerte/asio_ns.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
#include <asio/steady_timer.hpp>
#include <asio/streambuf.hpp>
#include <asio/write.hpp>
#include <asio/detail/socket_ops.hpp>

namespace asio_ns = asio;

Expand All @@ -57,6 +58,7 @@ namespace asio_ns = asio;
#include <boost/asio/ssl.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/asio/streambuf.hpp>
#include <boost/asio/detail/socket_ops.hpp>

namespace boost { namespace asio {
using error_code = boost::system::error_code;
Expand Down
3 changes: 3 additions & 0 deletions 3rdParty/fuerte/include/fuerte/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ class Connection : public std::enable_shared_from_this<Connection> {
/// @brief endpoint we are connected to
std::string endpoint() const;

/// @brief endpoint which we connect from
virtual std::string localEndpoint() = 0;

protected:
Connection(detail::ConnectionConfiguration const& conf) : _config(conf) {}

Expand Down
4 changes: 2 additions & 2 deletions 3rdParty/fuerte/include/fuerte/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ struct ConnectionConfiguration {
_maxConnectRetries(3),
#ifdef ARANGODB_USE_GOOGLE_TESTS
_failConnectAttempts(0),
#endif
#endif
_useIdleTimeout(true),
_authenticationType(AuthenticationType::None),
_user(""),
Expand All @@ -232,7 +232,7 @@ struct ConnectionConfiguration {
unsigned _maxConnectRetries;
#ifdef ARANGODB_USE_GOOGLE_TESTS
unsigned _failConnectAttempts;
#endif
#endif
bool _useIdleTimeout;

AuthenticationType _authenticationType;
Expand Down
67 changes: 45 additions & 22 deletions 3rdParty/fuerte/src/AsioSockets.h
9E81
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ template <typename SocketT, typename F, typename IsAbortedCb>
void resolveConnect(detail::ConnectionConfiguration const& config,
asio_ns::ip::tcp::resolver& resolver, SocketT& socket,
F&& done, IsAbortedCb&& isAborted) {
auto cb = [&socket,
auto cb = [&socket,
#ifdef ARANGODB_USE_GOOGLE_TESTS
fail = config._failConnectAttempts > 0,
#endif
#endif
done = std::forward<F>(done),
isAborted = std::forward<IsAbortedCb>(isAborted)](auto ec, auto it) mutable {
#ifdef ARANGODB_USE_GOOGLE_TESTS
Expand Down Expand Up @@ -90,6 +90,17 @@ void resolveConnect(detail::ConnectionConfiguration const& config,
resolver.async_resolve(config._host, config._port, std::move(cb));
#endif
}

template <typename SocketT>
std::string getConnectionNameS(SocketT& socket) {
boost::system::error_code ec;
asio_ns::ip::tcp::endpoint local_endpoint = socket.lowest_layer().local_endpoint(ec);

if (ec) {
return "Error getting local endpoint: " + ec.message();
}
return local_endpoint.address().to_string() + ":" + std::to_string(local_endpoint.port());
}
} // namespace

enum class ConnectTimerRole {
Expand All @@ -105,9 +116,9 @@ struct Socket<SocketType::Tcp> {
Socket(EventLoopService&, asio_ns::io_context& ctx)
: resolver(ctx), socket(ctx), timer(ctx) {}

~Socket() {
~Socket() {
try {
this->cancel();
this->cancel();
} catch (std::exception const& ex) {
FUERTE_LOG_ERROR << "caught exception during tcp socket shutdown: " << ex.what() << "\n";
}
Expand Down Expand Up @@ -136,13 +147,17 @@ struct Socket<SocketType::Tcp> {
});
}

std::string getConnectionName() {
return getConnectionNameS(socket);
}

bool isOpen() const {
return socket.is_open();
}

void rearm() {
canceled = false;
}
}

void cancel() {
canceled = true;
Expand Down Expand Up @@ -173,7 +188,7 @@ struct Socket<SocketType::Tcp> {
}
} catch (std::exception const& ex) {
// an exception is unlikely to occur here, as we are using the error-code
// variants of cancel/shutdown/close above
// variants of cancel/shutdown/close above
FUERTE_LOG_ERROR << "caught exception during tcp socket shutdown: " << ex.what() << "\n";
}
std::forward<F>(cb)(ec);
Expand All @@ -192,9 +207,9 @@ struct Socket<fuerte::SocketType::Ssl> {
: resolver(ctx), socket(ctx, loop.sslContext()), timer(ctx), ctx(ctx),
sslContext(loop.sslContext()), cleanupDone(false) {}

~Socket() {
~Socket() {
try {
this->cancel();
this->cancel();
} catch (std::exception const& ex) {
FUERTE_LOG_ERROR << "caught exception during ssl socket shutdown: " << ex.what() << "\n";
}
Expand Down Expand Up @@ -249,11 +264,15 @@ struct Socket<fuerte::SocketType::Ssl> {
}
socket.async_handshake(asio_ns::ssl::stream_base::client,
std::move(done));
}, [this]() {
return canceled;
}, [this]() {
return canceled;
});
}


std::string getConnectionName() {
return getConnectionNameS(socket);
}

bool isOpen() const {
return socket.lowest_layer().is_open();
}
Expand All @@ -263,7 +282,7 @@ struct Socket<fuerte::SocketType::Ssl> {
socket = asio_ns::ssl::stream<asio_ns::ip::tcp::socket>(this->ctx, this->sslContext);
canceled = false;
}

void cancel() {
canceled = true;
try {
Expand All @@ -287,18 +306,18 @@ struct Socket<fuerte::SocketType::Ssl> {
// socket is a member. This means that the allocation of the connection and
// this of the socket is kept until all asynchronous operations are completed
// (or aborted).

// ec is an out parameter here that is passed to the methods so they
// can fill in whatever error happened. we ignore it here anyway. we
// use the ec-variants of the methods here to prevent exceptions.
asio_ns::error_code ec;
asio_ns::error_code ec;

if (!socket.lowest_layer().is_open()) {
timer.cancel(ec);
std::forward<F>(cb)(ec);
return;
}

socket.lowest_layer().cancel(ec);
cleanupDone = false;
// implicitly cancels any previous timers
Expand Down Expand Up @@ -340,10 +359,10 @@ struct Socket<fuerte::SocketType::Unix> {
Socket(EventLoopService&, asio_ns::io_context& ctx)
: socket(ctx), timer(ctx) {}

~Socket() {
~Socket() {
canceled = true;
try {
this->cancel();
this->cancel();
} catch (std::exception const& ex) {
FUERTE_LOG_ERROR << "caught exception during unix socket shutdown: " << ex.what() << "\n";
}
Expand All @@ -356,19 +375,23 @@ struct Socket<fuerte::SocketType::Unix> {
done(asio_ns::error::operation_aborted);
return;
}

asio_ns::local::stream_protocol::endpoint ep(config._host);
socket.async_connect(ep, std::forward<F>(done));
}


std::string getConnectionName() {
return "no local connection name";
}

bool isOpen() const {
return socket.is_open();
}

void rearm() {
canceled = false;
}

void cancel() {
canceled = true;
try {
Expand All @@ -387,7 +410,7 @@ struct Socket<fuerte::SocketType::Unix> {
// ec is an out parameter here that is passed to the methods so they
// can fill in whatever error happened. we ignore it here anyway. we
// use the ec-variants of the methods here to prevent exceptions.
asio_ns::error_code ec;
asio_ns::error_code ec;
try {
timer.cancel(ec);
if (socket.is_open()) {
Expand All @@ -397,7 +420,7 @@ struct Socket<fuerte::SocketType::Unix> {
}
} catch (std::exception const& ex) {
// an exception is unlikely to occur here, as we are using the error-code
// variants of cancel/shutdown/close above
// variants of cancel/shutdown/close above
FUERTE_LOG_ERROR << "caught exception during unix socket shutdown: " << ex.what() << "\n";
}
std::forward<F>(cb)(ec);
Expand Down
10 changes: 7 additions & 3 deletions 3rdParty/fuerte/src/GeneralConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ class GeneralConnection : public fuerte::Connection {
return _state.load(std::memory_order_acquire);
}

virtual std::string localEndpoint() override final {
return _proto.getConnectionName();
}

/// The following public methods can be called from any thread:

// Start an asynchronous request.
Expand Down Expand Up @@ -349,7 +353,7 @@ class GeneralConnection : public fuerte::Connection {
if (me._failConnectAttempts > 0) {
--me._failConnectAttempts;
}
#endif
#endif

FUERTE_LOG_DEBUG << "tryConnect (" << retries << "), connecting failed: " << ec.message() << "\n";
if (retries > 1 && ec != asio_ns::error::operation_aborted) {
Expand Down Expand Up @@ -377,7 +381,7 @@ class GeneralConnection : public fuerte::Connection {
me.shutdownConnection(Error::CouldNotConnect, msg);
}
});

// only if we are still in the connect phase, we want to schedule a timer
// for the connect timeout. if the connect already failed and scheduled a
// timer for the reconnect timeout, we do not want to mess with the timer here.
Expand All @@ -394,7 +398,7 @@ class GeneralConnection : public fuerte::Connection {
if (me._proto.connectTimerRole == ConnectTimerRole::kConnect) {
FUERTE_LOG_DEBUG << "tryConnect, connect timeout this=" << self.get() << "\n";
me._proto.cancel();
}
}
}
});
}
Expand Down
1 change: 1 addition & 0 deletions 3rdParty/fuerte/src/connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,4 +88,5 @@ std::string Connection::endpoint() const {
}
return endpoint;
}

}}} // namespace arangodb::fuerte::v1
21 changes: 14 additions & 7 deletions client-tools/Shell/V8ClientConnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ std::shared_ptr<fu::Connection> V8ClientConnection::createConnection(
setCustomError(500, "unable to create connection");
LOG_TOPIC("9daaa", DEBUG, arangodb::Logger::HTTPCLIENT)
<< "Connection attempt to endpoint '" << _client.endpoint()
<< "' failed fatally";
<< "' failed fatally from: " << newConnection->localEndpoint();
return nullptr;
}

Expand Down Expand Up @@ -237,7 +237,8 @@ std::shared_ptr<fu::Connection> V8ClientConnection::createConnection(
setCustomError(_lastHttpReturnCode, errorMessage);
LOG_TOPIC("9daab", DEBUG, arangodb::Logger::HTTPCLIENT)
<< "Connection attempt to endpoint '" << _client.endpoint()
<< "' failed: " << errorMessage;
<< "' failed: " << errorMessage
<< " from: " << newConnection->localEndpoint();
return nullptr;
}
}
Expand All @@ -250,7 +251,8 @@ std::shared_ptr<fu::Connection> V8ClientConnection::createConnection(
setCustomError(503, msg);
LOG_TOPIC("9daac", DEBUG, arangodb::Logger::HTTPCLIENT)
<< "Connection attempt to endpoint '" << _client.endpoint()
<< "' failed: " << msg;
<< "' failed: " << msg
<< " from: " << newConnection->localEndpoint();
return nullptr;
}

Expand Down Expand Up @@ -303,7 +305,8 @@ std::shared_ptr<fu::Connection> V8ClientConnection::createConnection(
_currentConnectionId.erase();
LOG_TOPIC("9daad", DEBUG, arangodb::Logger::HTTPCLIENT)
<< "Connection attempt to endpoint '" << _client.endpoint()
<< "' failed: " << msg;
<< "' failed: " << msg
<< " from: " << newConnection->localEndpoint();
return nullptr;
} else {
newConnection = _builder.connect(_loop);
Expand Down Expand Up @@ -1423,7 +1426,8 @@ static void ClientConnection_httpFuzzRequests(
// during testing.
LOG_TOPIC("39e50", WARN, arangodb::Logger::FIXME)
<< "fuzzer producing " << numReqs << " requests(s) with " << numIts
<< " iteration(s) each, using seed " << fuzzer.getSeed();
<< " iteration(s) each, using seed " << fuzzer.getSeed()
<< " from: " << v8connection->getLocalEndpoint();
}
std::unordered_map<uint32_t, uint32_t> fuzzReturnCodesCount;

Expand Down Expand Up @@ -2812,6 +2816,7 @@ uint32_t V8ClientConnection::sendFuzzRequest(fuzzer::RequestFuzzer& fuzzer) {
return kFuzzNotConnected;
}

auto localEndpoint = getLocalEndpoint();
auto req = fuzzer.createRequest();
auto req_copy = *req;

Expand All @@ -2823,12 +2828,14 @@ uint32_t V8ClientConnection::sendFuzzRequest(fuzzer::RequestFuzzer& fuzzer) {
rc = ec;
if (rc != fu::Error::NoError) {
LOG_TOPIC("39e53", WARN, arangodb::Logger::FIXME)
<< "rc: " << static_cast<uint32_t>(rc);
<< "rc: " << static_cast<uint32_t>(rc)
<< " from: " << getLocalEndpoint();
}
}
if (!connection || connection->state() == fu::Connection::State::Closed) {
LOG_TOPIC("39e51", WARN, arangodb::Logger::FIXME)
<< "connection closed after " << fuerte::v1::to_string(req_copy);
<< "connection closed after " << fuerte::v1::to_string(req_copy)
<< " from: " << localEndpoint;
if (response) {
LOG_TOPIC("39e52", WARN, arangodb::Logger::FIXME)
<< "Server responce: " << response;
Expand Down
7 changes: 7 additions & 0 deletions client-tools/Shell/V8ClientConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,13 @@ class V8ClientConnection {
std::string const& role() const { return _role; }
std::string endpointSpecification() const;

std::string getLocalEndpoint() {
if (_connection) {
return _connection->localEndpoint();
} else {
return "not connected";
}
}
ArangoshServer& server();

v8::Handle<v8::Value> getData(
Expand Down
3 changes: 3 additions & 0 deletions tests/AsyncAgencyComm/AsyncAgencyCommTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,9 @@ struct AsyncAgencyCommPoolMock final : public network::ConnectionPool {
_mock(mock),
_endpoint(std::move(endpoint)) {}

virtual std::string localEndpoint() override final {
return "not implemented";
};
std::size_t requestsLeft() const override { return 1; }
State state() const override {
return fuerte::Connection::State::Connected;
Expand Down
4 changes: 4 additions & 0 deletions tests/IResearch/AgencyMock.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ struct AsyncAgencyStorePoolConnection final
void sendRequest(std::unique_ptr<arangodb::fuerte::Request> req,
arangodb::fuerte::RequestCallback cb) override;

virtual std::string localEndpoint() override final {
return "not implemented";
};

arangodb::AgencyCache& _cache;
std::string _endpoint;
};
Expand Down
4 changes: 4 additions & 0 deletions tests/Mocks/PreparedResponseConnectionPool.cpp
4CD1
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ class FakeConnection final : public fuerte::Connection {

void cancel() override {}

virtual std::string localEndpoint() override final {
return "not implemented";
};

private:
std::shared_ptr<std::vector<PreparedRequestResponse>> _responses;
};
Expand Down
Loading
0