8000 Remove mutexes from Logging by jsteemann · Pull Request #14550 · arangodb/arangodb · GitHub
[go: up one dir, main page]

Skip to content

Remove mutexes from Logging #14550

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jul 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions arangod/Agency/Constituent.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "AgentConfiguration.h"
#include "Basics/Common.h"
#include "Basics/ConditionVariable.h"
#include "Basics/Mutex.h"
#include "Basics/Thread.h"
#include "RestServer/MetricsFeature.h"

Expand Down
1 change: 1 addition & 0 deletions arangod/Cluster/MaintenanceFeature.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "ApplicationFeatures/ApplicationFeature.h"
#include "Basics/Common.h"
#include "Basics/ConditionVariable.h"
#include "Basics/Mutex.h"
#include "Basics/ReadWriteLock.h"
#include "Basics/Result.h"
#include "Cluster/Action.h"
Expand Down
1 change: 0 additions & 1 deletion lib/Logger/LogThread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ LogThread::LogThread(application_features::ApplicationServer& server, std::strin
: Thread(server, name), _messages(64) {}

LogThread::~LogThread() {
Logger::_threaded = false;
Logger::_active = false;

shutdown();
Expand Down
1 change: 1 addition & 0 deletions lib/Logger/LogTopic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

#include "LogTopic.h"

#include "Basics/Mutex.h"
#include "Basics/MutexLocker.h"
#include "Logger/LogMacros.h"
#include "Logger/Logger.h"
Expand Down
102 changes: 58 additions & 44 deletions lib/Logger/Logger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@

#include "Basics/Common.h"
#include "Basics/Exceptions.h"
#include "Basics/Mutex.h"
#include "Basics/MutexLocker.h"
#include "Basics/StringUtils.h"
#include "Basics/Thread.h"
#include "Basics/application-exit.h"
Expand Down Expand Up @@ -111,8 +109,6 @@ void LogMessage::shrink(std::size_t maxLength) {
}


Mutex Logger::_initializeMutex;

std::atomic<bool> Logger::_active(false);
std::atomic<LogLevel> Logger::_level(LogLevel::INFO);

Expand All @@ -123,7 +119,6 @@ bool Logger::_shortenFilenames(true);
bool Logger::_showProcessIdentifier(true);
bool Logger::_showThreadIdentifier(false);
bool Logger::_showThreadName(false);
bool Logger::_threaded(false);
bool Logger::_useColor(true);
bool Logger::_useEscaped(true);
bool Logger::_keepLogRotate(false);
Expand All @@ -135,8 +130,22 @@ TRI_pid_t Logger::_cachedPid(0);
std::string Logger::_outputPrefix;
std::string Logger::_hostname;

std::unique_ptr<LogThread> Logger::_loggingThread(nullptr);
std::atomic<std::size_t> Logger::_loggingThreadRefs(0);
std::atomic<LogThread*> Logger::_loggingThread(nullptr);

Logger::ThreadRef::ThreadRef() {
// (1) - this acquire-fetch-add synchronizes with the release-fetch-add (5)
Logger::_loggingThreadRefs.fetch_add(1, std::memory_order_acquire);
// (2) - this acquire-load synchronizes with the release-store (4)
_thread = Logger::_loggingThread.load(std::memory_order_acquire);
}

Logger::ThreadRef::~ThreadRef() {
// (3) - this relaxed-fetch-add is potentially part of a release-sequence
// headed by (5)
Logger::_loggingThreadRefs.fetch_sub(1, std::memory_order_relaxed);
}

LogGroup& Logger::defaultLogGroup() { return ::defaultLogGroupInstance; }

LogLevel Logger::logLevel() { return _level.load(std::memory_order_relaxed); }
Expand Down Expand Up @@ -676,14 +685,19 @@ void Logger::append(LogGroup& group,
// note that these loggers do not require any configuration so we can always and safely invoke them.
LogAppender::logGlobal(group, *msg);

if (!_active.load(std::memory_order_relaxed)) {
if (!_active.load(std::memory_order_acquire)) {
// logging is still turned off. now use hard-coded to-stderr logging
inactive(msg);
} else {
// now either queue or output the message
bool handled = false;
if (_threaded && !forceDirect) {
handled = _loggingThread->log(group, msg);
if (!forceDirect) {
// check if we have a logging thread
ThreadRef loggingThread;

if (loggingThread) {
handled = loggingThread->log(group, msg);
}
}

if (!handled) {
Expand All @@ -704,64 +718,69 @@ void Logger::append(LogGroup& group,
////////////////////////////////////////////////////////////////////////////////

void Logger::initialize(application_features::ApplicationServer& server, bool threaded) {
MUTEX_LOCKER(locker, _initializeMutex);

if (_active.exchange(true)) {
if (_active.exchange(true, std::memory_order_acquire)) {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL,
"Logger already initialized");
}

// logging is now active
TRI_ASSERT(_active);

if (threaded) {
_loggingThread = std::make_unique<LogThread>(server, ::LogThreadName);
if (!_loggingThread->start()) {
auto loggingThread = std::make_unique<LogThread>(server, ::LogThreadName);
if (!loggingThread->start()) {
LOG_TOPIC("28bd9", FATAL, arangodb::Logger::FIXME)
<< "could not start logging thread";
FATAL_ERROR_EXIT();
}
}

// generate threaded logging?
_threaded = threaded;
// (4) - this release-store synchronizes with the acquire-load (2)
_loggingThread.store(loggingThread.release(), std::memory_order_release);
}
}

////////////////////////////////////////////////////////////////////////////////
/// @brief shuts down the logging components
////////////////////////////////////////////////////////////////////////////////

void Logger::shutdown() {
MUTEX_LOCKER(locker, _initializeMutex);

if (!_active.exchange(false)) {
// if logging not activated or already shutdown, then we can abort here
if (!_active.exchange(false, std::memory_order_acquire)) {
// if logging not activated or already shut down, then we can abort here
return;
}
// logging is now inactive

TRI_ASSERT(!_active);

// reset the instance variable in Logger, so that others won't see it anymore
std::unique_ptr<LogThread> loggingThread(_loggingThread.exchange(nullptr, std::memory_order_relaxed));

// logging is now inactive (this will terminate the logging thread)
// join with the logging thread
if (_threaded) {
_threaded = false;

if (loggingThread != nullptr) {
// (5) - this release-fetch-add synchronizes with the acquire-fetch-add (1)
// Even though a fetch-add with 0 is essentially a noop, this is necessary to
// ensure that threads which try to get a reference to the _loggingThread
// actually see the new nullptr value.
_loggingThreadRefs.fetch_add(0, std::memory_order_release);

// wait until all threads have dropped their reference to the logging thread
while (_loggingThreadRefs.load(std::memory_order_relaxed)) {
std::this_thread::sleep_for(std::chrono::milliseconds(20));
}

char const* currentThreadName = Thread::currentThreadName();
if (currentThreadName != nullptr && ::LogThreadName == currentThreadName) {
// oops, the LogThread itself crashed...
// so we need to flush the log messages here ourselves - if we waited for
// the LogThread to flush them, we would wait forever.
_loggingThread->processPendingMessages();
_loggingThread->beginShutdown();
loggingThread->processPendingMessages();
loggingThread->beginShutdown();
} else {
int tries = 0;
while (_loggingThread->hasMessages() && ++tries < 10) {
_loggingThread->wakeup();
while (loggingThread->hasMessages() && ++tries < 10) {
loggingThread->wakeup();
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
_loggingThread->beginShutdown();
loggingThread->beginShutdown();
// wait until logging thread has logged all active messages
while (_loggingThread->isRunning()) {
while (loggingThread->isRunning()) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));< 1E79 /td>
}
}
Expand All @@ -773,23 +792,18 @@ void Logger::shutdown() {
_cachedPid = 0;
}

void Logger::shutdownLogThread() {
_loggingThread.reset();
}

////////////////////////////////////////////////////////////////////////////////
/// @brief tries to flush the logging
////////////////////////////////////////////////////////////////////////////////

void Logger::flush() noexcept {
MUTEX_LOCKER(locker, _initializeMutex);

if (!_active) {
if (!_active.load(std::memory_order_acquire)) {
// logging not (or not yet) initialized
return;
}

if (_threaded && _loggingThread != nullptr) {
_loggingThread->flush();

ThreadRef loggingThread;
if (loggingThread) {
loggingThread->flush();
}
}
26 changes: 20 additions & 6 deletions lib/Logger/Logger.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@
#include <vector>

#include "Basics/Common.h"
#include "Basics/Mutex.h"
#include "Basics/threads.h"
#include "Logger/LogLevel.h"
#include "Logger/LogTimeFormat.h"
Expand Down Expand Up @@ -313,12 +312,9 @@ class Logger {
public:
static void initialize(application_features::ApplicationServer&, bool);
static void shutdown();
static void shutdownLogThread();
static void flush() noexcept;

private:
static Mutex _initializeMutex;

// these variables might be changed asynchronously
static std::atomic<bool> _active;
static std::atomic<LogLevel> _level;
Expand All @@ -331,7 +327,6 @@ class Logger {
static bool _showThreadIdentifier;
static bool _showThreadName;
static bool _showRole;
static bool _threaded;
static bool _useColor;
static bool _useEscaped;
static bool _keepLogRotate;
Expand All @@ -343,6 +338,25 @@ class Logger {
static std::string _outputPrefix;
static std::string _hostname;

static std::unique_ptr<LogThread> _loggingThread;
struct ThreadRef {
ThreadRef();
~ThreadRef();

ThreadRef(const ThreadRef&) = delete;
ThreadRef(ThreadRef&&) B2AF = delete;
ThreadRef& operator=(const ThreadRef&) = delete;
ThreadRef& operator=(ThreadRef&&) = delete;

LogThread* operator->() const noexcept { return _thread; }
operator bool() const noexcept { return _thread != nullptr; }
private:
LogThread* _thread;
};

// logger thread. only populated when threaded logging is selected.
// the pointer must only be used with atomic accessors after the ref counter
// has been increased. Best to usethe ThreadRef class for this!
static std::atomic<std::size_t> _loggingThreadRefs;
static std::atomic<LogThread*> _loggingThread;
};
} // namespace arangodb
1 change: 0 additions & 1 deletion lib/Logger/LoggerFeature.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ LoggerFeature::LoggerFeature(application_features::ApplicationServer& server,

LoggerFeature::~LoggerFeature() {
Logger::shutdown();
Logger::shutdownLogThread();
}

void LoggerFeature::collectOptions(std::shared_ptr<ProgramOptions> options) {
Expand Down
0