Improvements for Pregel memory-mapped files by jsteemann · Pull Request #14637 · arangodb/arangodb · GitHub
[go: up one dir, main page]

Skip to content

Improvements for Pregel memory-mapped files #14637

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Aug 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
v3.8.1 (XXXX-XX-XX)
-------------------

* When creating Pregel memory-mapped files, create them with O_TMPFILE attribute
on Linux so that files are guaranteed to vanish even if a process dies.

* Improve log messages for Pregel runs by giving them more context.

* Fixed issue BTS-536 "Upgrading without rest-server is aborted by error".
Expand Down
4 changes: 2 additions & 2 deletions arangod/Pregel/Algos/PageRank.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,8 @@ struct PRMasterContext : public MasterContext {
}

void preApplication() override {
LOG_TOPIC("e0598", DEBUG, Logger::PREGEL) << "Using threshold " << _threshold;
};
LOG_TOPIC("e0598", DEBUG, Logger::PREGEL) << "Using threshold " << _threshold << " for pagerank";
}

bool postGlobalSuperstep() override {
float const* diff = getAggregatedValue<float>(kConvergence);
Expand Down
9 changes: 6 additions & 3 deletions arangod/Pregel/GraphStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -293,12 +293,15 @@ void moveAppend(std::vector<X>& src, std::vector<X>& dst) {
template <typename M>
std::unique_ptr<TypedBuffer<M>> createBuffer(WorkerConfig const& config, size_t cap) {
if (config.useMemoryMaps()) {
auto ptr = std::make_unique<MappedFileBuffer<M>>(cap);
// prefix used for logging in TypedBuffer.h
std::string logPrefix = "[job " + std::to_string(config.executionNumber()) + "] ";

auto ptr = std::make_unique<MappedFileBuffer<M>>(cap, logPrefix);
ptr->sequentialAccess();
return ptr;
} else {
return std::make_unique<VectorTypedBuffer<M>>(cap);
}

return std::make_unique<VectorTypedBuffer<M>>(cap);
}
} // namespace

Expand Down
273 changes: 219 additions & 54 deletions arangod/Pregel/TypedBuffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,30 +24,37 @@
#ifndef ARANGODB_PREGEL_BUFFER_H
#define ARANGODB_PREGEL_BUFFER_H 1

#include <cstddef>
#include <errno.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <sys/types.h>

#ifdef TRI_HAVE_UNISTD_H
#include <unistd.h>
#endif

#ifdef __linux__
#include <sys/mman.h>
#endif

#include "Basics/Common.h"

#include "Basics/FileUtils.h"
#include "Basics/PageSize.h"
#include "Basics/StringUtils.h"
#include "Basics/Thread.h"
#include "Basics/debugging.h"
#include "Basics/error.h"
#include "Basics/files.h"
#include "Basics/memory-map.h"
#include "Basics/operating-system.h"
#include "Basics/system-functions.h"
#include "Logger/LogMacros.h"
#include "Logger/Logger.h"
#include "Logger/LoggerStream.h"
#include "Random/RandomGenerator.h"

#include <cstddef>

#ifdef TRI_HAVE_UNISTD_H
#include <unistd.h>
#endif

#ifdef __linux__
#include <sys/mman.h>
#endif

namespace arangodb {
namespace pregel {

Expand Down Expand Up @@ -154,39 +161,38 @@ class VectorTypedBuffer : public TypedBuffer<T> {
/** Filesize limited by size_t, usually 2^32 or 2^64 */
template <typename T>
class MappedFileBuffer : public TypedBuffer<T> {
std::string _filename; // underlying filename
int _fd = -1; // underlying file descriptor
void* _mmHandle; // underlying memory map object handle (windows only)
size_t _mappedSize; // actually mapped size
std::string _logPrefix; // prefix used for logging
std::string _filename; // underlying filename
int _fd; // underlying file descriptor
bool _temporary; // O_TMPFILE used?
void* _mmHandle; // underlying memory map object handle (windows only)
size_t _mappedSize; // actually mapped size

public:
explicit MappedFileBuffer(size_t capacity)
: TypedBuffer<T>() {
MappedFileBuffer(MappedFileBuffer const& other) = delete;
MappedFileBuffer& operator=(MappedFileBuffer const& other) = delete;

explicit MappedFileBuffer(size_t capacity, std::string const& logPrefix)
: TypedBuffer<T>(),
_logPrefix(logPrefix),
_fd(-1),
_temporary(false),
_mmHandle(nullptr),
_mappedSize(sizeof(T) * capacity) {
TRI_ASSERT(capacity > 0u);
double tt = TRI_microtime();
int64_t tt2 = arangodb::RandomGenerator::interval((int64_t)0LL, (int64_t)0x7fffffffffffffffLL);

std::string file = "pregel-" +
std::to_string(uint64_t(Thread::currentProcessId())) + "-" +
std::to_string(uint64_t(tt)) + "-" +
std::to_string(tt2) +
".mmap";
this->_filename = basics::FileUtils::buildFilename(TRI_GetTempPath(), file);

_mappedSize = sizeof(T) * capacity;
size_t pageSize = PageSize::getValue();
TRI_ASSERT(pageSize >= 256);
// use multiples of page-size
_mappedSize = (size_t)(((_mappedSize + pageSize - 1) / pageSize) * pageSize);

LOG_TOPIC("358e3", DEBUG, Logger::PREGEL) << "creating mmap file '" << _filename << "' with capacity " << capacity << " and size " << _mappedSize;

_fd = TRI_CreateDatafile(_filename, _mappedSize);
_fd = createFile(_mappedSize);

if (_fd < 0) {
THROW_ARANGO_EXCEPTION_MESSAGE(
TRI_ERROR_SYS_ERROR,
basics::StringUtils::concatT("pregel cannot create mmap file '",
_filename, "': ", TRI_last_error()));
basics::StringUtils::concatT("pregel cannot create mmap file ",
label(), ": ", TRI_last_error()));
}

// memory map the data
Expand All @@ -203,21 +209,21 @@ class MappedFileBuffer : public TypedBuffer<T> {
TRI_set_errno(res);
TRI_CLOSE(_fd);
_fd = -1;

// remove empty file
TRI_UnlinkFile(_filename.c_str());

LOG_TOPIC("54dfb", ERR, arangodb::Logger::FIXME)
<< "cannot memory map file '" << _filename << "': '"

LOG_TOPIC("54dfb", ERR, arangodb::Logger::PREGEL) << _logPrefix
<< "cannot memory map " << label() << ": '"
<< TRI_errno_string(res) << "'";
LOG_TOPIC("1a034", ERR, arangodb::Logger::FIXME)
LOG_TOPIC("1a034", ERR, arangodb::Logger::PREGEL) << _logPrefix
<< "The database directory might reside on a shared folder "
"(VirtualBox, VMWare) or an NFS-mounted volume which does not "
"allow memory mapped files.";

removeFile();

THROW_ARANGO_EXCEPTION_MESSAGE(
TRI_ERROR_INTERNAL,
basics::StringUtils::concatT("cannot memory map file '", _filename, "': '",
TRI_errno_string(res), "'"));
basics::StringUtils::concatT("cannot memory map file ", label(), ": ",
TRI_errno_string(res)));
}

this->_begin = static_cast<T*>(data);
Expand All @@ -236,14 +242,6 @@ class MappedFileBuffer : public TypedBuffer<T> {
TRI_MMFileAdvise(this->_begin, _mappedSize, TRI_MADVISE_RANDOM);
}

void willNeed() {
TRI_MMFileAdvise(this->_begin, _mappedSize, TRI_MADVISE_WILLNEED);
}

void dontNeed() {
TRI_MMFileAdvise(this->_begin, _mappedSize, TRI_MADVISE_DONTNEED);
}

/// close file
// cppcheck-suppress virtualCallInConstructor
void close() override {
Expand All @@ -252,7 +250,7 @@ class MappedFileBuffer : public TypedBuffer<T> {
return;
}

LOG_TOPIC("45530", DEBUG, Logger::PREGEL) << "closing mmap file '" << _filename << "'";
LOG_TOPIC("45530", DEBUG, Logger::PREGEL) << _logPrefix << "closing mmap " << label();

// destroy all elements in the buffer
for (auto* p = this->_begin; p != this->_end; ++p) {
Expand All @@ -262,17 +260,17 @@ class MappedFileBuffer : public TypedBuffer<T> {
if (auto res = TRI_UNMMFile(this->_begin, _mappedSize, _fd, &_mmHandle);
res != TRI_ERROR_NO_ERROR) {
// leave file open here as it will still be memory-mapped
LOG_TOPIC("ab7be", ERR, arangodb::Logger::FIXME) << "munmap failed with: " << res;
LOG_TOPIC("ab7be", ERR, arangodb::Logger::PREGEL) << _logPrefix << "munmap failed with: " << res;
}
if (_fd != -1) {
TRI_ASSERT(_fd >= 0);
if (auto res = TRI_CLOSE(_fd); res != 0) {
LOG_TOPIC("00e1d", ERR, arangodb::Logger::FIXME)
<< "unable to close pregel mapped file '" << _filename << "': " << res;
LOG_TOPIC("00e1d", WARN, arangodb::Logger::PREGEL) << _logPrefix
<< "unable to close pregel mapped " << label() << ": " << res;
}

// remove file
TRI_UnlinkFile(this->_filename.c_str());
removeFile();
_filename.clear();
}

this->_begin = nullptr;
Expand All @@ -283,6 +281,173 @@ class MappedFileBuffer : public TypedBuffer<T> {

/// true, if file successfully opened
bool isValid() const { return this->_begin != nullptr; }

private:
/// @brief human-readable description of the backing file, used in log and
/// error messages. For a temporary file, `_filename` is reported as the
/// location the file lives "in"; otherwise it is reported as the file itself.
std::string label() const {
  std::string description(_temporary ? "temporary file in " : "file ");
  description += _filename;
  return description;
}

std::string buildFilename(bool temporary) const {
if (temporary) {
// only need a path
return TRI_GetTempPath();
}

double tt = TRI_microtime();
int64_t tt2 = arangodb::RandomGenerator::interval((int64_t)0LL, (int64_t)0x7fffffffffffffffLL);

std::string file = "pregel-" +
std::to_string(uint64_t(Thread::currentProcessId())) + "-" +
std::to_string(uint64_t(tt)) + "-" +
std::to_string(tt2) +
".mmap";
return basics::FileUtils::buildFilename(TRI_GetTempPath(), file);
}

/// @brief unlinks the backing file by name. Does nothing when the buffer is
/// flagged as temporary or when no file name is currently set.
void removeFile() const {
  if (_temporary || _filename.empty()) {
    return;
  }
  TRI_UnlinkFile(_filename.c_str());
}

/// @brief creates and zero-fills the file that backs the memory mapping
///
/// On non-Windows builds the file is first opened with TRI_O_TMPFILE on the
/// path returned by buildFilename(true). If that open fails (the flag may be
/// unsupported), a regular, uniquely named file is created instead.
/// `_temporary` and `_filename` are updated to reflect the variant in use.
///
/// @param maximalSize number of bytes to reserve and fill with zeros
/// @return the open file descriptor, rewound to offset 0, or -1 if the file
///         could not be created, filled or rewound. On failure the error is
///         logged and recorded via TRI_set_errno().
int createFile(size_t maximalSize) {
  TRI_ERRORBUF;

#ifdef _WIN32
  bool temporary = false;
#else
  bool temporary = true;
#endif

  // open the file
  int fd = -1;
  if (temporary) {
    _temporary = true;
    _filename = buildFilename(_temporary);
    // try creating a temporary file with O_TMPFILE first.
    // this may be unsupported.
    // in that case, we will fall back to creating a regular (non-temp) file.
    fd = TRI_CREATE(_filename.c_str(), O_EXCL | O_RDWR | TRI_O_CLOEXEC | TRI_NOATIME | TRI_O_TMPFILE,
                    S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
    // if fd is < 0, we will try without O_TMPFILE below.
  }

  if (fd < 0) {
    _temporary = false;
    _filename = buildFilename(_temporary);
    fd = TRI_CREATE(_filename.c_str(), O_CREAT | O_EXCL | O_RDWR | TRI_O_CLOEXEC | TRI_NOATIME,
                    S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
  }

  // remember the errno of the last open attempt right away: the logging
  // below builds strings and may call into the C library, which is allowed
  // to overwrite errno. the error handling further down relies on it.
  int const openErrno = errno;

  LOG_TOPIC("358e3", DEBUG, Logger::PREGEL) << _logPrefix
      << "creating mmap " << label() << " of " << _mappedSize << " bytes capacity";

  errno = openErrno;

  TRI_IF_FAILURE("CreateDatafile1") {
    // intentionally fail
    if (fd >= 0) {
      // only close a descriptor that was actually opened, and do not leave
      // a regular file behind (removeFile() is a no-op for temporary files)
      TRI_CLOSE(fd);
      removeFile();
    }
    fd = -1;
    errno = ENOSPC;
  }

  if (fd < 0) {
    if (errno == ENOSPC) {
      TRI_set_errno(TRI_ERROR_ARANGO_FILESYSTEM_FULL);
      LOG_TOPIC("f7530", ERR, arangodb::Logger::PREGEL) << _logPrefix
          << "cannot create " << label() << ": " << TRI_last_error();
    } else {
      TRI_SYSTEM_ERROR();

      TRI_set_errno(TRI_ERROR_SYS_ERROR);
      LOG_TOPIC("53a75", ERR, arangodb::Logger::PREGEL) << _logPrefix
          << "cannot create " << label() << ": " << TRI_GET_ERRORBUF;
    }

    _filename.clear();
    return -1;
  }

  // no fallocate present, or at least pretend it's not there...
  int res = 1;

#ifdef __linux__
#ifdef FALLOC_FL_ZERO_RANGE
  // try fallocate
  res = fallocate(fd, FALLOC_FL_ZERO_RANGE, 0, maximalSize);
#endif
#endif

  // cppcheck-suppress knownConditionTrueFalse
  if (res != 0) {
    // either fallocate failed or it is not there...

    // create a buffer filled with zeros
    static constexpr size_t nullBufferSize = 4096;
    char nullBuffer[nullBufferSize];
    memset(&nullBuffer[0], 0, nullBufferSize);

    // fill file with zeros from buffer
    size_t writeSize = nullBufferSize;
    size_t written = 0;
    while (written < maximalSize) {
      // shrink the final chunk so that exactly maximalSize bytes are written
      if (writeSize + written > maximalSize) {
        writeSize = maximalSize - written;
      }

      ssize_t writeResult = TRI_WRITE(fd, &nullBuffer[0], static_cast<TRI_write_t>(writeSize));

      TRI_IF_FAILURE("CreateDatafile2") {
        // intentionally fail
        writeResult = -1;
        errno = ENOSPC;
      }

      if (writeResult < 0) {
        if (errno == ENOSPC) {
          TRI_set_errno(TRI_ERROR_ARANGO_FILESYSTEM_FULL);
          LOG_TOPIC("449cf", ERR, arangodb::Logger::PREGEL) << _logPrefix
              << "cannot create " << label() << ": " << TRI_last_error();
        } else {
          TRI_SYSTEM_ERROR();
          TRI_set_errno(TRI_ERROR_SYS_ERROR);
          LOG_TOPIC("2c4a6", ERR, arangodb::Logger::PREGEL) << _logPrefix
              << "cannot create " << label() << ": " << TRI_GET_ERRORBUF;
        }

        TRI_CLOSE(fd);
        removeFile();

        return -1;
      }

      // a short write is fine: continue from where the last write stopped
      written += static_cast<size_t>(writeResult);
    }
  }

  // go back to offset 0
  TRI_lseek_t offset = TRI_LSEEK(fd, (TRI_lseek_t)0, SEEK_SET);

  if (offset == (TRI_lseek_t)-1) {
    TRI_SYSTEM_ERROR();
    TRI_set_errno(TRI_ERROR_SYS_ERROR);
    TRI_CLOSE(fd);

    LOG_TOPIC("dfc52", ERR, arangodb::Logger::PREGEL) << _logPrefix
        << "cannot seek in " << label() << ": " << TRI_GET_ERRORBUF;

    removeFile();
    _filename.clear();

    return -1;
  }

  return fd;
}

};
} // namespace pregel
} // namespace arangodb
Expand Down
Loading
0