Building Robust Inter-Process Queues in C++


Jody Hagins
jody.hagins@lseg.com
coachhagins@gmail.com
The Message Queue

Not a task queue

Pass messages between components

Object oriented programming

Across threads

Across processes

Across hosts
Coupling vs. Cohesion

"One goal of design is to minimize coupling between parts and
to maximize cohesion within them."
James Coplien, Multi-Paradigm Design for C++

Coupling: an indication of the strength of interconnections between
program units.
TCP/UDP in STREAMS

[Diagram: STREAMS modules for TCP/UDP, from Solaris Internals]
Coupling vs. Cohesion

[Diagram: a packet with compression and timestamp flags flows through
flag checks, uncompress, and process_packet]
What Are Inter-Process Queues

• FIFO
• Supports Multiple Message Types
• Between Unrelated Processes
• Low Latency
• Resumable
• Recoverable MCAST Queues
• Meyers API
• Pollable
std::queue

Just push and pop

And a constructor or two

And a destructor and assignment operators

And peek at the first and last???

Query the capacity...

More ways to push...
Separation of Concerns

• The Queue
• Construction (metadata and buffer)
• Synchronization (races with processes starting)
• The Producer
• Must have an existing Queue (Not Optional)
• Highlander Rule
• Can only write into queue
• The Consumer
• Must have an existing Queue
• Highlander Rule
• Can only read from queue
Advisory File Lock

struct AdvisoryFileLock
{
    struct Create { mode_t mode; };

    explicit AdvisoryFileLock(std::filesystem::path path);

    explicit AdvisoryFileLock(
        std::filesystem::path path, Create create);

    void swap(AdvisoryFileLock & that) noexcept;

    bool try_lock();
    void lock();
    void unlock() noexcept;

    std::filesystem::path const & path() const;
};
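The interface above is the talk's; as a concrete point of reference, a minimal advisory lock with the same try_lock/lock/unlock shape can be sketched on top of POSIX flock(2). Everything here (the FileLock name, the flock choice) is illustrative, not the talk's implementation:

```cpp
#include <fcntl.h>
#include <sys/file.h>
#include <unistd.h>
#include <stdexcept>
#include <string>

class FileLock
{
    int fd_ = -1;

public:
    explicit FileLock(std::string const & path)
    : fd_(::open(path.c_str(), O_CREAT | O_RDWR, 0600))
    {
        if (fd_ < 0) throw std::runtime_error("open failed");
    }

    ~FileLock() { if (fd_ >= 0) ::close(fd_); }

    FileLock(FileLock const &) = delete;
    FileLock & operator = (FileLock const &) = delete;

    // Lockable shape: usable with std::unique_lock / std::lock_guard.
    bool try_lock() { return ::flock(fd_, LOCK_EX | LOCK_NB) == 0; }
    void lock() { ::flock(fd_, LOCK_EX); }
    void unlock() { ::flock(fd_, LOCK_UN); }
};
```

Exposing exactly the try_lock/lock/unlock trio is what lets std::unique_lock wrap the lock, which is presumably why AdvisoryFileLock has that shape.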
Characteristics

struct Characteristics
{
    struct Signature
    {
        std::array<char, 32> value_;

        std::string str() const
        {
            auto v = std::string_view(value_.data(), value_.size());
            return std::string(
                v.data(), std::min(v.size(), v.find('\0')));
        }

        constexpr auto operator <=> (Signature const &) const = default;
    };
Characteristics

    struct UUID
    {
        std::array<std::byte, 16> value_;

        std::string str() const;

        static UUID make();

        constexpr auto operator <=> (UUID const &) const = default;
    };

    enum struct Capacity : std::uint64_t
    {
    };
Characteristics

    enum struct MsgSize : std::uint32_t
    {
    };

    enum struct MsgAlignment : std::uint32_t
    {
    };

    enum struct CacheLineSize : std::uint16_t
    {
    };
Characteristics

    enum Flags : std::uint16_t
    {
        SingleProducer = 0x0001,
        MultiProducer = 0x0002,
        SingleProducerBlocks = 0x0004,
        MultiProducerBlocks = 0x0008,
        SingleConsumer = 0x0010,
        MultiConsumer = 0x0020,
        SingleConsumerBlocks = 0x0040,
        MultiConsumerBlocks = 0x0080,
        Interprocess = 0x0100,
        Multicast = 0x0200,
        Broadcast = Multicast,
        None = 0x0000,
        All = 0xffff,
    };
MetaData

template <typename PidLockT>
requires ProcessIdLockC<PidLockT>
struct MetaData
{
    using PidLock = PidLockT;

    enum struct BufferOffset : std::size_t
    {
    };

    constexpr MetaData() = default;

    explicit MetaData(Characteristics const & c);
    explicit MetaData(
        Characteristics const & c,
        BufferOffset buffer_offset);
MetaData

    std::unique_lock<PidLock> producer_lock()
    {
        return std::unique_lock<PidLock>(producer_lock_);
    }

    std::unique_lock<PidLock> producer_lock(std::try_to_lock_t arg)
    {
        return std::unique_lock<PidLock>(producer_lock_, arg);
    }

    std::unique_lock<PidLock> try_producer_lock()
    {
        return producer_lock(std::try_to_lock);
    }
MetaData

    std::unique_lock<PidLock> consumer_lock()
    {
        return std::unique_lock<PidLock>(consumer_lock_);
    }

    std::unique_lock<PidLock> consumer_lock(std::try_to_lock_t arg)
    {
        return std::unique_lock<PidLock>(consumer_lock_, arg);
    }

    std::unique_lock<PidLock> try_consumer_lock()
    {
        return consumer_lock(std::try_to_lock);
    }
Process ID

using Value = std::conditional_t<
    std::atomic<__uint128_t>::is_always_lock_free,
    __uint128_t,
    std::uint64_t>;
Value value_;
Process ID

ProcessId
ProcessId::
current()
{
    static ProcessId id = [] {
        return ProcessId{::getpid()};
    }();
    return id;
}
Process ID

ProcessId
ProcessId::
current()
{
    static ProcessId id = [] {
        ::pthread_atfork(
            nullptr, // prepare
            nullptr, // parent
            [] { id = ProcessId{::getpid()}; });
        return ProcessId{::getpid()};
    }();
    return id;
}
Fork Handling

• Process Orchestration
• Async-signal-safe operations in the child
• low-level system calls
• no malloc, printf, locks, stdio, most library code
• mmap

• AtFork
• Install and remove handlers
• Prioritize execution

• Here be Dragons; Real and Scary Dragons
Process ID Lock

template <typename BlockPolicyT>
struct TProcessIdLock
{
    using BlockPolicy = BlockPolicyT;

    TProcessIdLock() = default;

    explicit TProcessIdLock(std::in_place_t inplace);

    bool try_lock()
    {
        return pidlock_detail::try_lock_impl(
            data_.pid,
            ProcessId::current());
    }
Process ID Lock

    void lock()
    {
        auto const me = ProcessId::current();
        for (int i = 0; i < 10; ++i) {
            if (pidlock_detail::try_lock_impl(data_.pid, me)) {
                return;
            }
            std::this_thread::yield();
        }
        data_.block_policy().wait([&] {
            return pidlock_detail::try_lock_impl(data_.pid, me);
        });
    }
Process ID Lock

    void unlock()
    {
        auto me = ProcessId::current();
        data_.pid.compare_exchange_strong(
            me,
            ProcessId::null());
        data_.block_policy().notify_one();
    }

    ProcessId pid() const
    {
        return data_.pid.load(std::memory_order_acquire);
    }
Process ID Lock

bool try_lock_impl(wjh::Atomic<ProcessId> & pid, ProcessId const & me)
{
    // Fast path: the lock word is free; claim it with a CAS.
    auto expected = PID::null();
    if (exchange(pid, expected, me)) return true;
    if (expected != me) {
        // Another process holds it. If that process no longer exists,
        // clear the stale owner and try once more to claim the lock.
        if (auto p = PID::maybe(expected.pid());
            not p || expected != *p)
        {
            exchange(pid, expected, PID::null());
            expected = PID::null();
            if (exchange(pid, expected, me)) return true;
        }
    }
    return false;
}
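Stripped of the dead-owner recovery, the core of the pid lock is just a compare-and-swap on a lock word that holds the owner's pid (0 when free). A single-process sketch (try_lock_pid/unlock_pid are hypothetical names, and the liveness check via PID::maybe is deliberately elided):

```cpp
#include <atomic>
#include <cstdint>

using Pid = std::uint64_t;
inline constexpr Pid null_pid = 0;

// Claim the lock word by CAS-ing null -> me.
bool try_lock_pid(std::atomic<Pid> & word, Pid me)
{
    Pid expected = null_pid;
    return word.compare_exchange_strong(expected, me);
}

// Release only if we are the recorded owner; a non-owner's
// compare_exchange simply fails and leaves the word untouched.
void unlock_pid(std::atomic<Pid> & word, Pid me)
{
    word.compare_exchange_strong(me, null_pid);
}
```

Recording the owner's pid, rather than a bare flag, is what makes the recovery path in the slide possible: a later try_lock can ask the OS whether that pid still names a live process.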
Separation of Concerns

• The Queue
• Construction (metadata and buffer)
• Synchronization (races with processes starting)
• The Producer
• Must have an existing Queue
• Highlander Rule
• Can only write into queue
• The Consumer
• Must have an existing Queue
• Highlander Rule
• Can only read from queue
Producer

template <typename InvocableT, typename... ArgTs>
std::invoke_result_t<InvocableT, Writer &, ArgTs...>
write(InvocableT && invocable, ArgTs &&... args) noexcept(
    noexcept(std::invoke(
        std::declval<InvocableT>(),
        std::declval<Writer &>(),
        std::declval<ArgTs>()...)));
Producer

class Writer
{
    template <std::size_t N>
    void write(std::span<std::byte const, N> data) noexcept;

    template <TriviallyCopyableC T>
    T & emplace(auto &&... args) noexcept(
        std::is_nothrow_constructible_v<T, decltype(args)...>)
        requires std::constructible_from<T, decltype(args)...>;

    template <std::size_t Alignment = 1>
    void write(void const * data, std::size_t size) noexcept;
};
Separation of Concerns

• The Queue
• Construction (metadata and buffer)
• Synchronization (races with processes starting)
• The Producer
• Must have an existing Queue
• Highlander Rule
• Can only write into queue
• The Consumer
• Must have an existing Queue
• Highlander Rule
• Can only read from queue
Consumer

static constexpr std::uint64_t no_data = 0;
static constexpr std::uint64_t lapped = -1;

tl::expected<std::size_t, std::uint64_t> try_read(
    std::span<std::byte> data);
The Daugaard Queue

// Copyright 2018 Kaspar Daugaard. For educational purposes only.
// See http://daugaard.org/blog/writing-a-fast-and-versatile-spsc-ring-buffer

https://www.daugaard.org/blog/
Daugaard: Shared State

class RingBuffer
{
    // Writer and reader's shared positions.
    struct alignas(CACHE_LINE_SIZE) SharedState
    {
        std::atomic<size_t> pos;
    };

    SharedState m_WriterShared;
    SharedState m_ReaderShared;
Daugaard: Local State

class RingBuffer { // ...
    // Writer and reader's local state.
    struct alignas(CACHE_LINE_SIZE) LocalState
    {
        LocalState()
        : buffer(nullptr), pos(0), end(0), base(0), size(0) {}

        char* buffer;
        size_t pos;
        size_t end;
        size_t base;
        size_t size;
    };

    LocalState m_Writer;
    LocalState m_Reader;
Daugaard: Write

// Write an element to the buffer.
template <typename T>
FORCE_INLINE void Write(const T& value)
{
    void* dest = PrepareWrite(sizeof(T), alignof(T));
    new(dest) T(value);
}
Daugaard: Write

void* RingBuffer::PrepareWrite(size_t size, size_t alignment)
{
    size_t pos = Align(m_Writer.pos, alignment);
    size_t end = pos + size;
    if (end > m_Writer.end)
        GetBufferSpaceToWriteTo(pos, end);
    m_Writer.pos = end;
    return m_Writer.buffer + pos;
}
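Align() is used by PrepareWrite but not shown on the slide; for a power-of-two alignment it is the usual round-up. A hypothetical definition, consistent with how the slide uses it:

```cpp
#include <cstddef>

// Round pos up to the next multiple of alignment.
// Precondition: alignment is a power of two.
constexpr std::size_t Align(std::size_t pos, std::size_t alignment)
{
    return (pos + alignment - 1) & ~(alignment - 1);
}
```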
Daugaard: Write

// Write an array of elements to the buffer.
template <typename T>
FORCE_INLINE void WriteArray(const T* values, size_t count)
{
    void* dest = PrepareWrite(sizeof(T) * count, alignof(T));
    for (size_t i = 0; i < count; i++)
        new(static_cast<T*>(dest) + i) T(values[i]);
}
Daugaard: Write

void RingBuffer::FinishWrite()
{
    m_WriterShared.pos.store(
        m_Writer.base + m_Writer.pos,
        std::memory_order_release);
}
Daugaard: Read

// Read an element from the buffer.
template <typename T>
FORCE_INLINE const T& Read()
{
    void* src = PrepareRead(sizeof(T), alignof(T));
    return *static_cast<T*>(src);
}
Daugaard: Read

template <typename T>
FORCE_INLINE const T* ReadArray(size_t count)
{
    void* src = PrepareRead(sizeof(T) * count, alignof(T));
    return static_cast<T*>(src);
}
Daugaard: Initialization

// Initialize. Buffer must have required alignment. Size must be a
// power of two.
void Initialize(void* buffer, size_t size)
{
    Reset();
    m_Reader.buffer = m_Writer.buffer = static_cast<char*>(buffer);
    m_Reader.size = m_Writer.size = m_Writer.end = size;
}

void Reset()
{
    m_Reader = m_Writer = LocalState();
    m_ReaderShared.pos = m_WriterShared.pos = 0;
}
Daugaard: Shared State

class RingBuffer
{
    // Writer and reader's shared positions.
    struct alignas(CACHE_LINE_SIZE) SharedState
    {
        std::atomic<size_t> pos;
    };

    SharedState m_WriterShared;
    SharedState m_ReaderShared;
Implicit Lifetime

struct X
{
    int a, b;
};

X * p = static_cast<X *>(std::malloc(sizeof(X)));
p->a = 1;
p->b = 2;
Implicit Lifetime

class Foo
{
    // ...
    Foo(tag_t) = default;
};
Daugaard: Shared State

class RingBuffer
{
    // Writer and reader's shared positions.
    struct alignas(CACHE_LINE_SIZE) SharedState
    {
        std::atomic<size_t> pos;
    };

    SharedState m_WriterShared;
    SharedState m_ReaderShared;

The Hinnant Rule
Pop Quiz

struct Foo
{
    void operator = (Foo &&) = delete;
};

static_assert(std::is_default_constructible_v<Foo>);
static_assert(std::is_copy_constructible_v<Foo>);
static_assert(std::is_move_constructible_v<Foo>);
static_assert(std::is_copy_assignable_v<Foo>);
static_assert(std::is_move_assignable_v<Foo>);
static_assert(std::is_destructible_v<Foo>);

The Hinnant Rule
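A sketch of how the quiz resolves under the special-member rules summarized in Hinnant's table: user-declaring any move operation suppresses the copy operations (defined as deleted) and does not declare the other move operation. Of the six assertions above, only the first and last actually hold:

```cpp
#include <type_traits>

struct Foo
{
    void operator = (Foo &&) = delete;
};

static_assert(std::is_default_constructible_v<Foo>);  // no ctors declared
static_assert(!std::is_copy_constructible_v<Foo>);    // deleted by the rule
static_assert(!std::is_move_constructible_v<Foo>);    // falls back to deleted copy
static_assert(!std::is_copy_assignable_v<Foo>);       // deleted by the rule
static_assert(!std::is_move_assignable_v<Foo>);       // explicitly deleted
static_assert(std::is_destructible_v<Foo>);           // unaffected
```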
Daugaard: Shared State

class RingBuffer
{
    // Writer and reader's shared positions.
    struct alignas(CACHE_LINE_SIZE) SharedState
    {
        std::atomic<size_t> pos;
    };

    SharedState m_WriterShared;
    SharedState m_ReaderShared;

Daugaard: Shared State

class RingBuffer
{
    // Writer and reader's shared positions.
    struct alignas(CACHE_LINE_SIZE) SharedState
    {
        size_t pos;
    };

    SharedState m_WriterShared;
    SharedState m_ReaderShared;

Daugaard: Shared State

class RingBuffer
{
    // Writer and reader's shared positions.
    struct alignas(CACHE_LINE_SIZE) SharedState
    {
        Atomic<size_t> pos;
    };

    SharedState m_WriterShared;
    SharedState m_ReaderShared;

std::atomic
Implicit Lifetime

• Messages must be implicit lifetime
• Must become comfortable with this for sharing data across processes
Cache Lines

#define CACHE_LINE_SIZE 64
Daugaard: Cache Lines

#if defined(CACHE_LINE_SIZE)
inline constexpr std::size_t cache_line_size = CACHE_LINE_SIZE;
#elif defined(__cpp_lib_hardware_interference_size) && \
    (__cpp_lib_hardware_interference_size >= 201703L)
inline constexpr std::size_t cache_line_size =
    std::hardware_destructive_interference_size;
#else
#error "cache_line_size cannot be defined"
#endif
Magic Buffer

[Diagram: a 1 MB buffer]

struct Args
{
    void * addr = nullptr;
    size_type len = 0;
    int fd = -1;
    size_type offset = 0;
    bool magic = false;
    size_type page_size = 0;
};
Magic Buffer

[Diagram: a 2 MB anonymous mapping]

// Perform an anonymous mapping of twice the requested length.
auto const saved_fd = std::exchange(args.fd, -1);
auto const offset = std::exchange(args.offset, 0);
args.len *= 2;
args.magic = false;
auto mapped = do_mapping(args);
Magic Buffer

[Diagram: 1 MB + 1 MB halves]

// Map the file over the first half of the anonymous mapping.
args.fd = saved_fd;
args.len /= 2;
args.offset = offset;
args.addr = mapped->address;
auto tmp = do_mapping(args);
tmp->release();
Magic Buffer

[Diagram: 1 MB + 1 MB halves, both backed by the same file]

// Map the file over the second half of the anonymous mapping.
args.addr = static_cast<std::byte *>(mapped->address) + args.len;
tmp = do_mapping(args);
tmp->release();

addr_ = std::bit_cast<pointer>(mapped->address);
size_ = mapped->length / 2;
mapped_ = std::move(mapped);
Magic Buffer

static constexpr int prot = PROT_READ | PROT_WRITE;

int const flags = [&] {
    int f = MAP_SHARED;
    if (addr) f |= MAP_FIXED;
    if (fd < 0) f |= MAP_ANONYMOUS;
    return f;
}();
auto const mapped = ::mmap(
    addr, len, prot, flags, fd, off_t(offset));
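The three mapping steps above can be exercised end to end. This is a hypothetical, Linux-only sketch (memfd_create stands in for the queue's real backing file, and the mappings are made inline rather than through do_mapping/Args); the payoff is that a write crossing the end of the ring reappears, already wrapped, at the front:

```cpp
#include <cstring>
#include <sys/mman.h>
#include <unistd.h>

// Returns true if a write crossing the end of the ring shows up,
// wrapped, at the beginning.
bool magic_buffer_wraps()
{
    std::size_t const len = std::size_t(::sysconf(_SC_PAGESIZE));
    int const fd = ::memfd_create("magic", 0);
    if (fd < 0 || ::ftruncate(fd, off_t(len)) != 0) return false;

    // 1. Reserve 2*len of contiguous address space.
    char * base = static_cast<char *>(::mmap(
        nullptr, 2 * len, PROT_READ | PROT_WRITE,
        MAP_PRIVATE | MAP_ANONYMOUS, -1, 0));
    if (base == MAP_FAILED) { ::close(fd); return false; }

    // 2 + 3. Map the same file over each half; MAP_FIXED replaces the
    // reservation in place, so the two halves alias the same pages.
    bool const ok =
        ::mmap(base, len, PROT_READ | PROT_WRITE,
               MAP_SHARED | MAP_FIXED, fd, 0) == base
        && ::mmap(base + len, len, PROT_READ | PROT_WRITE,
               MAP_SHARED | MAP_FIXED, fd, 0) == base + len;

    bool wrapped = false;
    if (ok) {
        std::memcpy(base + len - 2, "wrap", 4);    // crosses the seam
        wrapped = std::memcmp(base, "ap", 2) == 0; // tail visible at front
    }
    ::munmap(base, 2 * len);
    ::close(fd);
    return wrapped;
}
```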
MCAST Queues

• Single Producer, Multiple Consumers
• Every consumer gets every message (mcast, broadcast)
• Producer never blocks
• Slow consumers may miss messages
• Most really fast ones have some kind of UB (seqlock, etc)
MCAST Queues

• The Queue
• Lock for becoming the producer
• If enabled, a lock for becoming the special consumer
MCAST Producer
template <typename InvocableT, typename... ArgTs>
• The Producer
std::invoke_result_t<InvocableT, Producer::Writer &, ArgTs...>
• Uses TMP or concepts
Producer::
write(InvocableT && invocable, ArgTs
• If has a special consumer &&... args)
noexcept(noexcept(std::invoke(
• Writes block on a single special consumer
std::declval<InvocableT>(),
• Otherwise, writes
std::declval<Writer never block
&>(),
std::declval<ArgTs>()...)))
{
using R = std::invoke_result_t<InvocableT, Writer &, ArgTs...>;
auto writer = Writer(info_);

168
MCAST Producer
template <typename InvocableT, typename... ArgTs>
• The Producer
std::invoke_result_t<InvocableT, Producer::Writer &, ArgTs...>
• Uses TMP or concepts
Producer::
write(InvocableT && invocable, ArgTs
• If has a special consumer &&... args)
noexcept(noexcept(std::invoke(
• Writes block on a single special consumer
std::declval<InvocableT>(),
• Otherwise, writes
std::declval<Writer never block
&>(),
std::declval<ArgTs>()...)))
{
using R = std::invoke_result_t<InvocableT, Writer &, ArgTs...>;
auto writer = Writer(info_);

169
MCAST Producer
auto call = [&]() -> R {
• The Producer
return std::invoke(
• Uses TMP or concepts
std::forward<InvocableT>(invocable),
writer,
• If has a special consumer
std::forward<ArgTs>(args)...);
}; • Writes block on a single special consumer
• Otherwise,
if constexpr writes never block
(std::is_same_v<R, void>) {
call();
info_.finish_write();
return;
} else {
R result = call();
info_.finish_write();
return result;
}
}
170
MCAST Producer
auto call = [&]() -> R {
• The Producer
return std::invoke(
• Uses TMP or concepts
std::forward<InvocableT>(invocable),
writer,
• If has a special consumer
std::forward<ArgTs>(args)...);
}; • Writes block on a single special consumer
• Otherwise,
if constexpr writes never block
(std::is_same_v<R, void>) {
call();
info_.finish_write();
return;
} else {
R result = call();
info_.finish_write();
return result;
}
}
171
MCAST Producer

• The Producer
• Uses TMP or concepts
• If has a special consumer
• Writes block on a single special consumer
• Otherwise, writes never block

template <std::size_t N>
void
Producer::Writer::
write(std::span<std::byte const, N> data) noexcept
{
    auto const size = std::uint32_t(data.size());
    assert(size == data.size());
    auto buffer = info_.prepare_buffer<8>(sizeof(std::uint64_t) + size);
    std::memcpy(buffer.data(), &size, sizeof(size));
    std::memcpy(
        buffer.data() + sizeof(std::uint64_t), data.data(), size);
    info_.local_counter += buffer.size();
}

172
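If I read the span overload above right, each record is a 4-byte length stored in an 8-byte slot, followed by the payload, with the next record starting on the next 8-byte boundary (the consumer later re-aligns its counter with `aligned<Alignment::Int64>`). That layout assumption can be captured as:

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Assumed record footprint: an 8-byte length slot plus the payload,
// rounded up to the next 8-byte boundary. This mirrors the slide's
// prepare_buffer<8>(sizeof(std::uint64_t) + size) call, but the exact
// rounding is an inference, not taken from the talk.
constexpr std::size_t record_footprint(std::size_t payload_size)
{
    constexpr std::size_t align = 8;
    std::size_t const raw = sizeof(std::uint64_t) + payload_size;
    return (raw + align - 1) & ~(align - 1); // round up to a multiple of 8
}
```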
MCAST Producer

• The Producer
• Uses TMP or concepts
• If has a special consumer
• Writes block on a single special consumer
• Otherwise, writes never block

template <std::size_t Alignment>
void
Producer::Writer::
write(void const * data, std::size_t size) noexcept
{
    auto buffer = info_.prepare_buffer<Alignment>(size);
    std::memcpy(buffer.data(), data, size);
    info_.local_counter += size;
}

176
MCAST Producer

• The Producer
• Uses TMP or concepts
• If has a special consumer
• Writes block on a single special consumer
• Otherwise, writes never block

template <TriviallyCopyableC T>
T &
Producer::Writer::
emplace(auto &&... args) noexcept(
    std::is_nothrow_constructible_v<T, decltype(args)...>)
    requires std::constructible_from<T, decltype(args)...>
{
    auto buffer = info_.prepare_buffer<alignof(T)>(sizeof(T));
    auto t = new (static_cast<void *>(buffer.data()))
        T(std::forward<decltype(args)>(args)...);
    info_.local_counter += sizeof(T);
    return *t;
}

180
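The `emplace` overload above builds the object in place in queue memory rather than copying it in. A stripped-down sketch of that placement-new technique, with a made-up `Msg` type:

```cpp
#include <cassert>
#include <cstddef>
#include <new>
#include <type_traits>
#include <utility>

// Hypothetical message type for the sketch.
struct Msg
{
    int id;
    double px;
};

// Minimal illustration of the emplace technique above: construct a
// trivially copyable T directly into raw storage with placement new and
// hand back a reference the caller can fill in further. The real queue
// would obtain `buffer` from its ring, suitably aligned for T.
template <typename T, typename... ArgTs>
T & emplace_into(std::byte * buffer, ArgTs &&... args)
{
    static_assert(std::is_trivially_copyable_v<T>);
    return *new (static_cast<void *>(buffer))
        T(std::forward<ArgTs>(args)...);
}
```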
MCAST Producer

• The Producer
• Uses TMP or concepts
• If has a special consumer
• Writes block on a single special consumer
• Otherwise, writes never block

inline Producer::Writer::
Writer(Info & x)
    : info_(x)
{
    // Save the counter, so we can restore it later, if the write
    // operation was aborted for some reason including a user exception.
    info_.saved_counter = info_.local_counter;
}

184
MCAST Producer

• The Producer
• Uses TMP or concepts
• If has a special consumer
• Writes block on a single special consumer
• Otherwise, writes never block

inline Producer::Writer::
~Writer()
{
    // Restore the counter. Safe to do always because the saved_counter is
    // set to the local_counter when finish_write is called.
    info_.local_counter = info_.saved_counter;
}

185
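The constructor/destructor pair above is a rollback guard: the destructor always restores the snapshot, and a successful commit simply moves the snapshot forward so the restore becomes a no-op. The same idea in miniature (`CounterGuard` is illustrative, not the talk's class):

```cpp
#include <cassert>
#include <cstdint>

// Rollback guard: snapshot the counter on construction, restore it on
// destruction. commit() advances the snapshot so the restore keeps the
// progress instead of undoing it.
struct CounterGuard
{
    std::uint64_t & counter;
    std::uint64_t saved;

    explicit CounterGuard(std::uint64_t & c) : counter(c), saved(c) {}
    ~CounterGuard() { counter = saved; } // undo uncommitted progress
    void commit() { saved = counter; }   // keep the progress
};
```

This is why a user exception thrown mid-write leaves the queue untouched: stack unwinding runs the destructor, which rewinds `local_counter` to the last committed position.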
MCAST Producer

• The Producer
• Uses TMP or concepts
• If has a special consumer
• Writes block on a single special consumer
• Otherwise, writes never block

inline void
Producer::Info::
finish_write()
{
    shared->read_counter.store(
        local_counter, std::memory_order_release);

    // Update the saved_counter because the write operation has been
    // committed, and there is no longer a need to restore the original
    // value.
    saved_counter = local_counter;
}

186
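The release store in `finish_write` is what makes the earlier plain writes safe to read: it pairs with the consumer's acquire load of `read_counter`, so once the consumer observes the new counter value, every byte written before the store is visible. A minimal two-thread sketch of that publication handshake (`Slot`, `produce`, and `consume` are invented for this example):

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>
#include <thread>

// One payload slot plus a published byte count, standing in for the
// queue's shared buffer and read_counter.
struct Slot
{
    int payload = 0;
    std::atomic<std::uint64_t> read_counter{0};
};

void produce(Slot & s)
{
    s.payload = 42; // plain write of the data...
    // ...then publish it with a release store.
    s.read_counter.store(sizeof(int), std::memory_order_release);
}

int consume(Slot & s)
{
    // Acquire load pairs with the release store above.
    while (s.read_counter.load(std::memory_order_acquire) < sizeof(int)) {
        // spin until the producer publishes
    }
    return s.payload; // guaranteed to see 42 here
}
```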
MCAST Consumer

• The Producer
• Uses TMP or concepts
• If has a special consumer
• Writes block on a single special consumer
• Otherwise, writes never block

inline tl::expected<std::size_t, std::int_fast64_t>
Consumer::
try_read(std::span<std::byte> data)
{
    auto read_counter = aligned<Alignment::Int64>(info_.local_counter);
    std::uint32_t size;
    if (read_counter + sizeof(size) >= info_.cached_counter) {
        info_.cached_counter = info_.shared->read_counter.load(
            std::memory_order_acquire);
        if (read_counter + sizeof(size) > info_.cached_counter) {
            return tl::unexpected(no_data);
        }
    }
    auto const buf = info_.buffer + (read_counter & info_.mask);
    std::memcpy(&size, buf, sizeof(size));

187
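The `buf` computation above indexes the ring with `counter & mask`, which is only valid when the buffer length is a power of two and `mask == length - 1`: the monotonically increasing counter then wraps around the ring without a division. A small sketch of that addressing rule:

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Map a monotonically increasing counter to a ring-buffer offset.
// Requires a power-of-two buffer size; mask == size - 1 then makes
// (counter & mask) equivalent to (counter % size) but branch-free.
std::size_t ring_offset(std::uint64_t counter, std::size_t size)
{
    assert((size & (size - 1)) == 0); // size must be a power of two
    return static_cast<std::size_t>(counter & (size - 1));
}
```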
MCAST Consumer

• The Producer
• Uses TMP or concepts
• If has a special consumer
• Writes block on a single special consumer
• Otherwise, writes never block

    if (size > data.size()) {
        return tl::unexpected(is_lapped() ? lapped :
            std::int_fast64_t(size));
    }
    read_counter += sizeof(std::uint64_t) + size;
    if (read_counter > info_.cached_counter) {
        return tl::unexpected(no_data);
    }
    std::memcpy(data.data(), buf + sizeof(std::uint64_t), size);
    if (is_lapped()) {
        return tl::unexpected(lapped);
    }
    info_.local_counter = read_counter;
    return size;
}

193
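The slides never show the body of `is_lapped()`, so the check below is purely a guess at one plausible implementation: the consumer has been lapped once the producer's counter is more than one buffer length ahead of the consumer's, because at that point the producer may have overwritten bytes the consumer had not yet copied out.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical lap check (assumption, not from the talk): with
// monotonically increasing counters, the producer overwrites the
// consumer's unread region once it gets more than one full buffer ahead.
bool is_lapped(std::uint64_t producer_counter,
               std::uint64_t consumer_counter,
               std::uint64_t buffer_size)
{
    return producer_counter - consumer_counter > buffer_size;
}
```

Note the two-phase use in `try_read` above: the payload is copied first and lap status re-checked afterwards, since a lap during the copy means the copied bytes may be torn.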