From 70f3e9513ff21f18a338aeb8a28e6fa4e0a89c5a Mon Sep 17 00:00:00 2001 From: jsteemann Date: Tue, 27 Jul 2021 15:55:19 +0200 Subject: [PATCH 1/3] draft PR for removing mutexes from Logging --- arangod/Agency/Constituent.h | 1 + arangod/Cluster/MaintenanceFeature.h | 1 + lib/Logger/LogThread.cpp | 1 - lib/Logger/LogTopic.cpp | 1 + lib/Logger/Logger.cpp | 76 +++++++++++++--------------- lib/Logger/Logger.h | 12 ++--- 6 files changed, 44 insertions(+), 48 deletions(-) diff --git a/arangod/Agency/Constituent.h b/arangod/Agency/Constituent.h index 4eb9901bc9b5..b7b9e3847894 100644 --- a/arangod/Agency/Constituent.h +++ b/arangod/Agency/Constituent.h @@ -28,6 +28,7 @@ #include "AgentConfiguration.h" #include "Basics/Common.h" #include "Basics/ConditionVariable.h" +#include "Basics/Mutex.h" #include "Basics/Thread.h" #include "RestServer/MetricsFeature.h" diff --git a/arangod/Cluster/MaintenanceFeature.h b/arangod/Cluster/MaintenanceFeature.h index be59fcacd21b..74ae9bdc6d3e 100644 --- a/arangod/Cluster/MaintenanceFeature.h +++ b/arangod/Cluster/MaintenanceFeature.h @@ -27,6 +27,7 @@ #include "ApplicationFeatures/ApplicationFeature.h" #include "Basics/Common.h" #include "Basics/ConditionVariable.h" +#include "Basics/Mutex.h" #include "Basics/ReadWriteLock.h" #include "Basics/Result.h" #include "Cluster/Action.h" diff --git a/lib/Logger/LogThread.cpp b/lib/Logger/LogThread.cpp index 3a88dae6f65b..e578d72c2fcc 100644 --- a/lib/Logger/LogThread.cpp +++ b/lib/Logger/LogThread.cpp @@ -33,7 +33,6 @@ LogThread::LogThread(application_features::ApplicationServer& server, std::strin : Thread(server, name), _messages(64) {} LogThread::~LogThread() { - Logger::_threaded = false; Logger::_active = false; shutdown(); diff --git a/lib/Logger/LogTopic.cpp b/lib/Logger/LogTopic.cpp index a7e9c7020093..709e569772a4 100644 --- a/lib/Logger/LogTopic.cpp +++ b/lib/Logger/LogTopic.cpp @@ -27,6 +27,7 @@ #include "LogTopic.h" +#include "Basics/Mutex.h" #include "Basics/MutexLocker.h" #include 
"Logger/LogMacros.h" #include "Logger/Logger.h" diff --git a/lib/Logger/Logger.cpp b/lib/Logger/Logger.cpp index 3f75eb9ab26c..d3b8a63c0b18 100644 --- a/lib/Logger/Logger.cpp +++ b/lib/Logger/Logger.cpp @@ -32,8 +32,6 @@ #include "Basics/Common.h" #include "Basics/Exceptions.h" -#include "Basics/Mutex.h" -#include "Basics/MutexLocker.h" #include "Basics/StringUtils.h" #include "Basics/Thread.h" #include "Basics/application-exit.h" @@ -111,8 +109,6 @@ void LogMessage::shrink(std::size_t maxLength) { } -Mutex Logger::_initializeMutex; - std::atomic Logger::_active(false); std::atomic Logger::_level(LogLevel::INFO); @@ -123,7 +119,6 @@ bool Logger::_shortenFilenames(true); bool Logger::_showProcessIdentifier(true); bool Logger::_showThreadIdentifier(false); bool Logger::_showThreadName(false); -bool Logger::_threaded(false); bool Logger::_useColor(true); bool Logger::_useEscaped(true); bool Logger::_keepLogRotate(false); @@ -135,7 +130,7 @@ TRI_pid_t Logger::_cachedPid(0); std::string Logger::_outputPrefix; std::string Logger::_hostname; -std::unique_ptr Logger::_loggingThread(nullptr); +std::shared_ptr Logger::_loggingThread(nullptr); LogGroup& Logger::defaultLogGroup() { return ::defaultLogGroupInstance; } @@ -676,14 +671,19 @@ void Logger::append(LogGroup& group, // note that these loggers do not require any configuration so we can always and safely invoke them. LogAppender::logGlobal(group, *msg); - if (!_active.load(std::memory_order_relaxed)) { + if (!_active.load(std::memory_order_acquire)) { // logging is still turned off. 
now use hard-coded to-stderr logging inactive(msg); } else { // now either queue or output the message bool handled = false; - if (_threaded && !forceDirect) { - handled = _loggingThread->log(group, msg); + if (!forceDirect) { + // check if we have a logging thread + auto loggingThread = std::atomic_load_explicit(&_loggingThread, std::memory_order_relaxed); + + if (loggingThread != nullptr) { + handled = loggingThread->log(group, msg); + } } if (!handled) { @@ -704,27 +704,22 @@ void Logger::append(LogGroup& group, //////////////////////////////////////////////////////////////////////////////// void Logger::initialize(application_features::ApplicationServer& server, bool threaded) { - MUTEX_LOCKER(locker, _initializeMutex); - - if (_active.exchange(true)) { + if (_active.exchange(true, std::memory_order_acquire)) { THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "Logger already initialized"); } // logging is now active - TRI_ASSERT(_active); - if (threaded) { - _loggingThread = std::make_unique(server, ::LogThreadName); - if (!_loggingThread->start()) { + auto loggingThread = std::make_shared(server, ::LogThreadName); + if (!loggingThread->start()) { LOG_TOPIC("28bd9", FATAL, arangodb::Logger::FIXME) << "could not start logging thread"; FATAL_ERROR_EXIT(); } - } - // generate threaded logging? 
- _threaded = threaded; + std::atomic_store_explicit(&_loggingThread, loggingThread, std::memory_order_release); + } } //////////////////////////////////////////////////////////////////////////////// @@ -732,36 +727,36 @@ void Logger::initialize(application_features::ApplicationServer& server, bool th //////////////////////////////////////////////////////////////////////////////// void Logger::shutdown() { - MUTEX_LOCKER(locker, _initializeMutex); - - if (!_active.exchange(false)) { - // if logging not activated or already shutdown, then we can abort here + if (!_active.exchange(false, std::memory_order_acquire)) { + // if logging not activated or already shut down, then we can abort here return; } + // logging is now inactive - TRI_ASSERT(!_active); + auto loggingThread = std::atomic_load_explicit(&_loggingThread, std::memory_order_relaxed); + + // reset the instance variable in Logger, so that others won't see it anymore + std::atomic_store_explicit(&_loggingThread, std::shared_ptr(), std::memory_order_release); // logging is now inactive (this will terminate the logging thread) // join with the logging thread - if (_threaded) { - _threaded = false; - + if (loggingThread != nullptr) { char const* currentThreadName = Thread::currentThreadName(); if (currentThreadName != nullptr && ::LogThreadName == currentThreadName) { // oops, the LogThread itself crashed... // so we need to flush the log messages here ourselves - if we waited for // the LogThread to flush them, we would wait forever. 
- _loggingThread->processPendingMessages(); - _loggingThread->beginShutdown(); + loggingThread->processPendingMessages(); + loggingThread->beginShutdown(); } else { int tries = 0; - while (_loggingThread->hasMessages() && ++tries < 10) { - _loggingThread->wakeup(); + while (loggingThread->hasMessages() && ++tries < 10) { + loggingThread->wakeup(); std::this_thread::sleep_for(std::chrono::milliseconds(10)); } - _loggingThread->beginShutdown(); + loggingThread->beginShutdown(); // wait until logging thread has logged all active messages - while (_loggingThread->isRunning()) { + while (loggingThread->isRunning()) { std::this_thread::sleep_for(std::chrono::milliseconds(10)); } } @@ -773,8 +768,8 @@ void Logger::shutdown() { _cachedPid = 0; } -void Logger::shutdownLogThread() { - _loggingThread.reset(); +void Logger::shutdownLogThread() noexcept { + std::atomic_store_explicit(&_loggingThread, std::shared_ptr(), std::memory_order_release); } //////////////////////////////////////////////////////////////////////////////// @@ -782,14 +777,13 @@ void Logger::shutdownLogThread() { //////////////////////////////////////////////////////////////////////////////// void Logger::flush() noexcept { - MUTEX_LOCKER(locker, _initializeMutex); - - if (!_active) { + if (!_active.load(std::memory_order_acquire)) { // logging not (or not yet) initialized return; } - - if (_threaded && _loggingThread != nullptr) { - _loggingThread->flush(); + + auto loggingThread = std::atomic_load_explicit(&_loggingThread, std::memory_order_relaxed); + if (loggingThread != nullptr) { + loggingThread->flush(); } } diff --git a/lib/Logger/Logger.h b/lib/Logger/Logger.h index ba546204ea41..fea4a1840675 100644 --- a/lib/Logger/Logger.h +++ b/lib/Logger/Logger.h @@ -63,7 +63,6 @@ #include #include "Basics/Common.h" -#include "Basics/Mutex.h" #include "Basics/threads.h" #include "Logger/LogLevel.h" #include "Logger/LogTimeFormat.h" @@ -313,12 +312,10 @@ class Logger { public: static void 
initialize(application_features::ApplicationServer&, bool); static void shutdown(); - static void shutdownLogThread(); + static void shutdownLogThread() noexcept; static void flush() noexcept; private: - static Mutex _initializeMutex; - // these variables might be changed asynchronously static std::atomic _active; static std::atomic _level; @@ -331,7 +328,6 @@ class Logger { static bool _showThreadIdentifier; static bool _showThreadName; static bool _showRole; - static bool _threaded; static bool _useColor; static bool _useEscaped; static bool _keepLogRotate; @@ -343,6 +339,10 @@ class Logger { static std::string _outputPrefix; static std::string _hostname; - static std::unique_ptr _loggingThread; + // logger thread. only populated when threaded logging is selected. + // the pointer must only be used with atomic accessors. + // TODO: access this shared_ptr via std::atomic_load may not be + // lock-free, so we should try to improve on it! + static std::shared_ptr _loggingThread; }; } // namespace arangodb From d7ef94e4d209864471d4f74f89a259a68be04c4d Mon Sep 17 00:00:00 2001 From: mpoeter Date: Tue, 27 Jul 2021 17:35:30 +0200 Subject: [PATCH 2/3] Make access of logging thread lock-free. 
--- lib/Logger/Logger.cpp | 39 ++++++++++++++++++++++-------------- lib/Logger/Logger.h | 18 ++++++++++++----- lib/Logger/LoggerFeature.cpp | 1 - 3 files changed, 37 insertions(+), 21 deletions(-) diff --git a/lib/Logger/Logger.cpp b/lib/Logger/Logger.cpp index d3b8a63c0b18..01f5eb67f4d7 100644 --- a/lib/Logger/Logger.cpp +++ b/lib/Logger/Logger.cpp @@ -130,8 +130,18 @@ TRI_pid_t Logger::_cachedPid(0); std::string Logger::_outputPrefix; std::string Logger::_hostname; -std::shared_ptr Logger::_loggingThread(nullptr); +std::atomic Logger::_loggingThreadRefs(0); +std::atomic Logger::_loggingThread(nullptr); +Logger::ThreadRef::ThreadRef() { + Logger::_loggingThreadRefs.fetch_add(1, std::memory_order_relaxed); + _thread = Logger::_loggingThread.load(std::memory_order_acquire); +} + +Logger::ThreadRef::~ThreadRef() { + Logger::_loggingThreadRefs.fetch_sub(1, std::memory_order_relaxed); +} + LogGroup& Logger::defaultLogGroup() { return ::defaultLogGroupInstance; } LogLevel Logger::logLevel() { return _level.load(std::memory_order_relaxed); } @@ -679,9 +689,9 @@ void Logger::append(LogGroup& group, bool handled = false; if (!forceDirect) { // check if we have a logging thread - auto loggingThread = std::atomic_load_explicit(&_loggingThread, std::memory_order_relaxed); + ThreadRef loggingThread; - if (loggingThread != nullptr) { + if (loggingThread) { handled = loggingThread->log(group, msg); } } @@ -711,14 +721,14 @@ void Logger::initialize(application_features::ApplicationServer& server, bool th // logging is now active if (threaded) { - auto loggingThread = std::make_shared(server, ::LogThreadName); + auto loggingThread = new LogThread(server, ::LogThreadName); if (!loggingThread->start()) { LOG_TOPIC("28bd9", FATAL, arangodb::Logger::FIXME) << "could not start logging thread"; FATAL_ERROR_EXIT(); } - std::atomic_store_explicit(&_loggingThread, loggingThread, std::memory_order_release); + _loggingThread.store(loggingThread, std::memory_order_release); } } @@ -733,14 
+743,17 @@ void Logger::shutdown() { } // logging is now inactive - auto loggingThread = std::atomic_load_explicit(&_loggingThread, std::memory_order_relaxed); - // reset the instance variable in Logger, so that others won't see it anymore - std::atomic_store_explicit(&_loggingThread, std::shared_ptr(), std::memory_order_release); - + auto loggingThread = _loggingThread.exchange(nullptr, std::memory_order_relaxed); + // logging is now inactive (this will terminate the logging thread) // join with the logging thread if (loggingThread != nullptr) { + // wait until all threads have dropped their reference to the logging thread + while (_loggingThreadRefs.load(std::memory_order_relaxed)) { + std::this_thread::sleep_for(std::chrono::milliseconds(20)); + } + char const* currentThreadName = Thread::currentThreadName(); if (currentThreadName != nullptr && ::LogThreadName == currentThreadName) { // oops, the LogThread itself crashed... @@ -768,10 +781,6 @@ void Logger::shutdown() { _cachedPid = 0; } -void Logger::shutdownLogThread() noexcept { - std::atomic_store_explicit(&_loggingThread, std::shared_ptr(), std::memory_order_release); -} - //////////////////////////////////////////////////////////////////////////////// /// @brief tries to flush the logging //////////////////////////////////////////////////////////////////////////////// @@ -782,8 +791,8 @@ void Logger::flush() noexcept { return; } - auto loggingThread = std::atomic_load_explicit(&_loggingThread, std::memory_order_relaxed); - if (loggingThread != nullptr) { + ThreadRef loggingThread; + if (loggingThread) { loggingThread->flush(); } } diff --git a/lib/Logger/Logger.h b/lib/Logger/Logger.h index fea4a1840675..174ab614e096 100644 --- a/lib/Logger/Logger.h +++ b/lib/Logger/Logger.h @@ -312,7 +312,6 @@ class Logger { public: static void initialize(application_features::ApplicationServer&, bool); static void shutdown(); - static void shutdownLogThread() noexcept; static void flush() noexcept; private: @@ -339,10 
+338,19 @@ class Logger { static std::string _outputPrefix; static std::string _hostname; + struct ThreadRef { + ThreadRef(); + ~ThreadRef(); + LogThread* operator->() const noexcept { return _thread; } + operator bool() const noexcept { return _thread != nullptr; } + private: + LogThread* _thread; + }; + // logger thread. only populated when threaded logging is selected. - // the pointer must only be used with atomic accessors. - // TODO: access this shared_ptr via std::atomic_load may not be - // lock-free, so we should try to improve on it! - static std::shared_ptr _loggingThread; + // the pointer must only be used with atomic accessors after the ref counter + // has been increased. Best to use the ThreadRef class for this! + static std::atomic _loggingThreadRefs; + static std::atomic _loggingThread; }; } // namespace arangodb diff --git a/lib/Logger/LoggerFeature.cpp b/lib/Logger/LoggerFeature.cpp index 59b823eb4b57..400d7ed07c5a 100644 --- a/lib/Logger/LoggerFeature.cpp +++ b/lib/Logger/LoggerFeature.cpp @@ -92,7 +92,6 @@ LoggerFeature::LoggerFeature(application_features::ApplicationServer& server, LoggerFeature::~LoggerFeature() { Logger::shutdown(); - Logger::shutdownLogThread(); } void LoggerFeature::collectOptions(std::shared_ptr options) { From c60e67f56e4675d47226852bf6a86a79b636f6a1 Mon Sep 17 00:00:00 2001 From: mpoeter Date: Wed, 28 Jul 2021 13:32:57 +0200 Subject: [PATCH 3/3] Address review comments. 
--- lib/Logger/Logger.cpp | 19 +++++++++++++++---- lib/Logger/Logger.h | 6 ++++++ 2 files changed, 21 insertions(+), 4 deletions(-) diff --git a/lib/Logger/Logger.cpp b/lib/Logger/Logger.cpp index 01f5eb67f4d7..d08d9b5d5949 100644 --- a/lib/Logger/Logger.cpp +++ b/lib/Logger/Logger.cpp @@ -134,11 +134,15 @@ std::atomic Logger::_loggingThreadRefs(0); std::atomic Logger::_loggingThread(nullptr); Logger::ThreadRef::ThreadRef() { - Logger::_loggingThreadRefs.fetch_add(1, std::memory_order_relaxed); + // (1) - this acquire-fetch-add synchronizes with the release-fetch-add (5) + Logger::_loggingThreadRefs.fetch_add(1, std::memory_order_acquire); + // (2) - this acquire-load synchronizes with the release-store (4) _thread = Logger::_loggingThread.load(std::memory_order_acquire); } Logger::ThreadRef::~ThreadRef() { + // (3) - this relaxed-fetch-sub is potentially part of a release-sequence + // headed by (5) Logger::_loggingThreadRefs.fetch_sub(1, std::memory_order_relaxed); } @@ -721,14 +725,15 @@ void Logger::initialize(application_features::ApplicationServer& server, bool th // logging is now active if (threaded) { - auto loggingThread = new LogThread(server, ::LogThreadName); + auto loggingThread = std::make_unique(server, ::LogThreadName); if (!loggingThread->start()) { LOG_TOPIC("28bd9", FATAL, arangodb::Logger::FIXME) << "could not start logging thread"; FATAL_ERROR_EXIT(); } - _loggingThread.store(loggingThread, std::memory_order_release); + // (4) - this release-store synchronizes with the acquire-load (2) + _loggingThread.store(loggingThread.release(), std::memory_order_release); } } @@ -744,11 +749,17 @@ void Logger::shutdown() { // logging is now inactive // reset the instance variable in Logger, so that others won't see it anymore - auto loggingThread = _loggingThread.exchange(nullptr, std::memory_order_relaxed); + std::unique_ptr loggingThread(_loggingThread.exchange(nullptr, std::memory_order_relaxed)); // logging is now inactive (this will terminate the 
logging thread) // join with the logging thread if (loggingThread != nullptr) { + // (5) - this release-fetch-add synchronizes with the acquire-fetch-add (1) + // Even though a fetch-add with 0 is essentially a noop, this is necessary to + // ensure that threads which try to get a reference to the _loggingThread + // actually see the new nullptr value. + _loggingThreadRefs.fetch_add(0, std::memory_order_release); + // wait until all threads have dropped their reference to the logging thread while (_loggingThreadRefs.load(std::memory_order_relaxed)) { std::this_thread::sleep_for(std::chrono::milliseconds(20)); diff --git a/lib/Logger/Logger.h b/lib/Logger/Logger.h index 174ab614e096..c64782d457c8 100644 --- a/lib/Logger/Logger.h +++ b/lib/Logger/Logger.h @@ -341,6 +341,12 @@ class Logger { struct ThreadRef { ThreadRef(); ~ThreadRef(); + + ThreadRef(const ThreadRef&) = delete; + ThreadRef(ThreadRef&&) = delete; + ThreadRef& operator=(const ThreadRef&) = delete; + ThreadRef& operator=(ThreadRef&&) = delete; + LogThread* operator->() const noexcept { return _thread; } operator bool() const noexcept { return _thread != nullptr; } private: