Building Robust Inter-Process Queues in C++
Jody Hagins
jody.hagins@lseg.com
coachhagins@gmail.com
The Message Queue
• Across threads
• Across processes
• Across hosts
Coupling vs. Cohesion
"One goal of design is to minimize coupling between parts and
to maximize cohesion within them."
— James Coplien, Multi-Paradigm Design for C++

TCP/UDP in STREAMS
[Figure: STREAMS module stack for TCP/UDP, from Solaris Internals]
Coupling vs. Cohesion
[Figure: packet pipeline — process_packet consults a compression flag and a timestamp flag, routing compressed packets through uncompress before processing]
What Are Inter-Process Queues
• FIFO
• Supports Multiple Message Types
• Between Unrelated Processes
• Low Latency
• Resumable
• Recoverable MCAST Queues
• Meyers API
• Pollable
std::queue
Separation of Concerns
• The Queue
  • Construction (metadata and buffer)
  • Synchronization (races with processes starting)
• The Producer
  • Must have an existing Queue (Not Optional)
  • Highlander Rule
  • Can only write into queue
• The Consumer
  • Must have an existing Queue (Not Optional)
  • Highlander Rule
  • Can only read from queue
Advisory File Lock
struct AdvisoryFileLock
{
    struct Create { mode_t mode; };

    bool try_lock();
    void lock();
    void unlock() noexcept;
    // ...
};
Characteristics
struct UUID
{
    std::array<std::byte, 16> value_;
};

enum struct MsgSize : std::uint32_t
{
};
Characteristics
enum Flags : std::uint16_t
{
SingleProducer = 0x0001,
MultiProducer = 0x0002,
SingleProducerBlocks = 0x0004,
MultiProducerBlocks = 0x0008,
SingleConsumer = 0x0010,
MultiConsumer = 0x0020,
SingleConsumerBlocks = 0x0040,
MultiConsumerBlocks = 0x0080,
Interprocess = 0x0100,
Multicast = 0x0200,
Broadcast = Multicast,
None = 0x0000,
All = 0xffff,
};
MetaData
template <typename PidLockT>
    requires ProcessIdLockC<PidLockT>
struct MetaData
{
    using PidLock = PidLockT;
    std::unique_lock<PidLock> producer_lock()
    {
        return std::unique_lock<PidLock>(producer_lock_);
    }

    std::unique_lock<PidLock> try_producer_lock()
    {
        return std::unique_lock<PidLock>(producer_lock_, std::try_to_lock);
    }

    std::unique_lock<PidLock> consumer_lock()
    {
        return std::unique_lock<PidLock>(consumer_lock_);
    }

    std::unique_lock<PidLock> try_consumer_lock()
    {
        return std::unique_lock<PidLock>(consumer_lock_, std::try_to_lock);
    }
Process ID
ProcessId
ProcessId::
current()
{
    static ProcessId id = [] {
        return ProcessId{::getpid()};
    }();
    return id;
}

But the cached value goes stale after fork(): the child inherits the parent's pid. Register an atfork child handler to refresh it:

ProcessId
ProcessId::
current()
{
    static ProcessId id = [] {
        ::pthread_atfork(
            nullptr, // prepare
            nullptr, // parent
            [] { id = ProcessId{::getpid()}; });
        return ProcessId{::getpid()};
    }();
    return id;
}
Fork Handling
• Process Orchestration
• Async-signal-safe operations in the child
  • low-level system calls
  • no malloc, printf, locks, stdio, most library code
  • mmap
• AtFork
  • Install and remove handlers
  • Prioritize execution

Process ID Lock
TProcessIdLock() = default;

bool try_lock()
{
    return pidlock_detail::try_lock_impl(
        data_.pid,
        ProcessId::current());
}
Process ID Lock
void lock()
{
auto const me = ProcessId::current();
for (int i = 0; i < 10; ++i) {
if (pidlock_detail::try_lock_impl(data_.pid, me)) {
return;
}
std::this_thread::yield();
}
data_.block_policy().wait([&] {
return pidlock_detail::try_lock_impl(data_.pid, me);
});
}
void unlock()
{
auto me = ProcessId::current();
data_.pid.compare_exchange_strong(
me,
ProcessId::null());
data_.block_policy().notify_one();
}
Process ID Lock
bool try_lock_impl(wjh::Atomic<ProcessId> & pid, ProcessId const & me)
{
auto expected = PID::null();
if (exchange(pid, expected, me)) return true;
if (expected != me) {
if (auto p = PID::maybe(expected.pid());
not p || expected != *p)
{
exchange(pid, expected, PID::null());
expected = PID::null();
if (exchange(pid, expected, me)) return true;
}
}
return false;
}
Separation of Concerns
• The Queue
  • Construction (metadata and buffer)
  • Synchronization (races with processes starting)
• The Producer
  • Must have an existing Queue
  • Highlander Rule
  • Can only write into queue
• The Consumer
  • Must have an existing Queue
  • Highlander Rule
  • Can only read from queue
Producer
template <typename InvocableT, typename... ArgTs>
std::invoke_result_t<InvocableT, Writer &, ArgTs...>
write(InvocableT && invocable, ArgTs &&... args) noexcept(
    noexcept(std::invoke(
        std::declval<InvocableT>(),
        std::declval<Writer &>(),
        std::declval<ArgTs>()...)));
Producer
class Writer
{
    template <std::size_t N>
    void write(std::span<std::byte const, N> data) noexcept;
    // ...
};
Separation of Concerns
• The Queue
  • Construction (metadata and buffer)
  • Synchronization (races with processes starting)
• The Producer
  • Must have an existing Queue
  • Highlander Rule
  • Can only write into queue
• The Consumer
  • Must have an existing Queue
  • Highlander Rule
  • Can only read from queue
Consumer
static constexpr std::uint64_t no_data = 0;
static constexpr std::uint64_t lapped = -1;
The Daugaard Queue
// Copyright 2018 Kaspar Daugaard. For educational purposes only.
// See http://daugaard.org/blog/writing-a-fast-and-versatile-spsc-ring-buffer
https://www.daugaard.org/blog/
Daugaard: Shared State
class RingBuffer
{
    // Writer and reader's shared positions.
    struct alignas(CACHE_LINE_SIZE) SharedState
    {
        std::atomic<size_t> pos;
    };
    SharedState m_WriterShared;
    SharedState m_ReaderShared;
Daugaard: Local State
class RingBuffer { // ...
    // Writer and reader's local state.
    struct alignas(CACHE_LINE_SIZE) LocalState
    {
        LocalState()
            : buffer(nullptr), pos(0), end(0), base(0), size(0) {}
        char* buffer;
        size_t pos;
        size_t end;
        size_t base;
        size_t size;
    };
    LocalState m_Writer;
    LocalState m_Reader;
Daugaard: Write
void* RingBuffer::PrepareWrite(size_t size, size_t alignment)
{
    size_t pos = Align(m_Writer.pos, alignment);
    size_t end = pos + size;
    if (end > m_Writer.end)
        GetBufferSpaceToWriteTo(pos, end);
    m_Writer.pos = end;
    return m_Writer.buffer + pos;
}
Daugaard: Write
// Write an array of elements to the buffer.
template <typename T>
FORCE_INLINE void WriteArray(const T* values, size_t count)
{
    void* dest = PrepareWrite(sizeof(T) * count, alignof(T));
    for (size_t i = 0; i < count; i++)
        new(static_cast<T*>(dest) + i) T(values[i]);
}
Daugaard: Write
void RingBuffer::FinishWrite()
{
    m_WriterShared.pos.store(
        m_Writer.base + m_Writer.pos,
        std::memory_order_release);
}
Daugaard: Read
Daugaard: Initialization
// Initialize. Buffer must have required alignment. Size must be a
// power of two.
void Initialize(void* buffer, size_t size)
{
    Reset();
    m_Reader.buffer = m_Writer.buffer = static_cast<char*>(buffer);
    m_Reader.size = m_Writer.size = m_Writer.end = size;
}

void Reset()
{
    m_Reader = m_Writer = LocalState();
    m_ReaderShared.pos = m_WriterShared.pos = 0;
}
Daugaard: Shared State
class RingBuffer
{
    // Writer and reader's shared positions.
    struct alignas(CACHE_LINE_SIZE) SharedState
    {
        std::atomic<size_t> pos;
    };
    SharedState m_WriterShared;
    SharedState m_ReaderShared;
Implicit Lifetime
struct X
{
int a, b;
};
X * p = static_cast<X *>(std::malloc(sizeof(X)));
p->a = 1;
p->b = 2;
Implicit Lifetime
class Foo
{
// ...
Foo(tag_t) = default;
};
Daugaard: Shared State
class RingBuffer
{
    // Writer and reader's shared positions.
    struct alignas(CACHE_LINE_SIZE) SharedState
    {
        std::atomic<size_t> pos;
    };
    SharedState m_WriterShared;
    SharedState m_ReaderShared;
The Hinnant Rule
Pop Quiz
struct Foo
{
    void operator = (Foo &&) = delete;
};

// Which of these hold?
static_assert(std::is_default_constructible_v<Foo>);
static_assert(std::is_copy_constructible_v<Foo>);
static_assert(std::is_move_constructible_v<Foo>);
static_assert(std::is_copy_assignable_v<Foo>);
static_assert(std::is_move_assignable_v<Foo>);
static_assert(std::is_destructible_v<Foo>);
The Hinnant Rule
Daugaard: Shared State
class RingBuffer
{
// Writer and reader's shared positions.
struct alignas(CACHE_LINE_SIZE) SharedState
{
std::atomic<size_t> pos;
};
SharedState m_WriterShared;
SharedState m_ReaderShared;
Daugaard: Shared State
class RingBuffer
{
// Writer and reader's shared positions.
struct alignas(CACHE_LINE_SIZE) SharedState
{
size_t pos;
};
SharedState m_WriterShared;
SharedState m_ReaderShared;
Daugaard: Shared State
class RingBuffer
{
// Writer and reader's shared positions.
struct alignas(CACHE_LINE_SIZE) SharedState
{
Atomic<size_t> pos;
};
SharedState m_WriterShared;
SharedState m_ReaderShared;
std::atomic
Implicit Lifetime
• Messages must be implicit-lifetime types
• Must become comfortable with this for sharing data across processes
Cache Lines
#define CACHE_LINE_SIZE 64
Daugaard: Cache Lines
#if defined(CACHE_LINE_SIZE)
inline constexpr std::size_t cache_line_size = CACHE_LINE_SIZE;
#elif defined(__cpp_lib_hardware_interference_size) && \
    (__cpp_lib_hardware_interference_size >= 201703L)
inline constexpr std::size_t cache_line_size =
    std::hardware_destructive_interference_size;
#else
#error "cache_line_size cannot be defined"
#endif
Magic Buffer
[Figure: a 1 MB ring buffer]

struct Args
{
    void * addr = nullptr;
    size_type len = 0;
    int fd = -1;
    size_type offset = 0;
    bool magic = false;
    size_type page_size = 0;
};

[Figure: a 2 MB anonymous reservation — twice the buffer size]

// Map the file over the first half of the anonymous mapping.
args.fd = saved_fd;
args.len /= 2;
args.offset = offset;
args.addr = mapped->address;
auto tmp = do_mapping(args);
tmp->release();

// Map the file over the second half of the anonymous mapping.
args.addr = static_cast<std::byte *>(mapped->address) + args.len;
tmp = do_mapping(args);
tmp->release();

addr_ = std::bit_cast<pointer>(mapped->address);
size_ = mapped->length / 2;
mapped_ = std::move(mapped);

[Figure: the same 1 MB file now visible in both halves]
MCAST Queues
• Single Producer, Multiple Consumers
• Every consumer gets every message (mcast, broadcast)
• Producer never blocks
• Slow consumers may miss messages
• Most really fast ones have some kind of UB (seqlock, etc.)

MCAST Queues
• The Queue
  • Lock for becoming the producer
  • If enabled, a lock for becoming the special consumer
MCAST Producer
• The Producer
  • Uses TMP or concepts
  • If it has a special consumer, writes block on that single special consumer
  • Otherwise, writes never block

template <typename InvocableT, typename... ArgTs>
std::invoke_result_t<InvocableT, Producer::Writer &, ArgTs...>
Producer::
write(InvocableT && invocable, ArgTs &&... args)
    noexcept(noexcept(std::invoke(
        std::declval<InvocableT>(),
        std::declval<Writer &>(),
        std::declval<ArgTs>()...)))
{
    using R = std::invoke_result_t<InvocableT, Writer &, ArgTs...>;
    auto writer = Writer(info_);
    auto call = [&]() -> R {
        return std::invoke(
            std::forward<InvocableT>(invocable),
            writer,
            std::forward<ArgTs>(args)...);
    };
    if constexpr (std::is_same_v<R, void>) {
        call();
        info_.finish_write();
        return;
    } else {
        R result = call();
        info_.finish_write();
        return result;
    }
}
MCAST Producer
template <std::size_t N>
void
Producer::Writer::
write(std::span<std::byte const, N> data) noexcept
{
    auto const size = std::uint32_t(data.size());
    assert(size == data.size());
    auto buffer = info_.prepare_buffer<8>(sizeof(std::uint64_t) + size);
    std::memcpy(buffer.data(), &size, sizeof(size));
    std::memcpy(
        buffer.data() + sizeof(std::uint64_t), data.data(), size);
    info_.local_counter += buffer.size();
}
MCAST Producer
template <std::size_t Alignment>
void
Producer::Writer::
write(void const * data, std::size_t size) noexcept
{
    auto buffer = info_.prepare_buffer<Alignment>(size);
    std::memcpy(buffer.data(), data, size);
    info_.local_counter += size;
}
MCAST Producer
template <TriviallyCopyableC T>
T &
Producer::Writer::
emplace(auto &&... args) noexcept(
    std::is_nothrow_constructible_v<T, decltype(args)...>)
    requires std::constructible_from<T, decltype(args)...>
{
    auto buffer = info_.prepare_buffer<alignof(T)>(sizeof(T));
    auto t = new (static_cast<void *>(buffer.data()))
        T(std::forward<decltype(args)>(args)...);
    info_.local_counter += sizeof(T);
    return *t;
}
MCAST Producer
inline Producer::Writer::
Writer(Info & x)
    : info_(x)
{
    // Save the counter, so we can restore it later, if the write
    // operation was aborted for some reason, including a user exception.
    info_.saved_counter = info_.local_counter;
}
MCAST Producer
inline Producer::Writer::
~Writer()
{
    // Restore the counter. Safe to do always because the saved_counter is
    // set to the local_counter when finish_write is called.
    info_.local_counter = info_.saved_counter;
}
MCAST Producer

• The Producer
  • Uses TMP or concepts
  • If has a special consumer
    • Writes block on a single special consumer
    • Otherwise, writes never block

inline void
Producer::Info::
finish_write()
{
    shared->read_counter.store(
        local_counter, std::memory_order_release);

    // Update the saved_counter because the write operation has been
    // committed, and there is no longer a need to restore the original
    // value.
    saved_counter = local_counter;
}
186
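The release store in `finish_write` is the publication point: a consumer that observes the new counter value with an acquire load is guaranteed to also see every byte the producer wrote before the store. A minimal sketch of that release/acquire handshake, with hypothetical names (this is not the talk's shared-memory layout):

```cpp
#include <atomic>
#include <cstddef>
#include <cstring>
#include <thread>
#include <vector>

// The producer fills payload bytes, then advances an atomic counter
// with a release store; the acquire load on the consumer side
// synchronizes-with it, making the payload bytes visible.
std::vector<std::byte> payload_buffer(64);
std::atomic<std::size_t> published{0};

void produce(int value) {
    std::memcpy(payload_buffer.data(), &value, sizeof(value)); // fill payload
    published.store(sizeof(value), std::memory_order_release); // publish
}

int consume() {
    // Spin until the producer publishes at least one int's worth of data.
    while (published.load(std::memory_order_acquire) < sizeof(int)) {}
    int value;
    std::memcpy(&value, payload_buffer.data(), sizeof(value));
    return value;
}
```

With relaxed ordering instead, the consumer could legally observe the counter advance before the payload bytes; release/acquire is the cheapest ordering that rules that out.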
MCAST Consumer

• The Producer
  • Uses TMP or concepts
  • If has a special consumer
    • Writes block on a single special consumer
    • Otherwise, writes never block

inline tl::expected<std::size_t, std::int_fast64_t>
Consumer::
try_read(std::span<std::byte> data)
{
    auto read_counter = aligned<Alignment::Int64>(info_.local_counter);
    std::uint32_t size;
    if (read_counter + sizeof(size) >= info_.cached_counter) {
        info_.cached_counter = info_.shared->read_counter.load(
            std::memory_order_acquire);
        if (read_counter + sizeof(size) > info_.cached_counter) {
            return tl::unexpected(no_data);
        }
    }
    auto const buf = info_.buffer + (read_counter & info_.mask);
    std::memcpy(&size, buf, sizeof(size));
187
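Note how `try_read` consults `info_.cached_counter` first and only reloads the shared atomic when the cache says there may be nothing new: one acquire load can then serve many reads, keeping the consumer off the producer's cache line in the common case. A stripped-down sketch of just that optimization, counting messages rather than bytes (`Cursor` is a hypothetical name):

```cpp
#include <atomic>
#include <cstddef>

// Consumer-side cursor that caches the producer's published counter.
// The atomic is only reloaded when the cached value is exhausted, so
// most advances touch no shared cache line at all.
struct Cursor {
    std::atomic<std::size_t> & produced;  // shared, written by the producer
    std::size_t cached = 0;               // consumer-local snapshot
    std::size_t consumed = 0;
    std::size_t reloads = 0;              // how often we hit the atomic

    bool try_advance() {
        if (consumed >= cached) {
            cached = produced.load(std::memory_order_acquire);
            ++reloads;
            if (consumed >= cached) return false;  // still no new data
        }
        ++consumed;
        return true;
    }
};
```

If the producer has published three messages, three consecutive `try_advance` calls cost a single atomic load between them.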
MCAST Consumer

• The Producer
  • Uses TMP or concepts
  • If has a special consumer
    • Writes block on a single special consumer
    • Otherwise, writes never block

    if (size > data.size()) {
        return tl::unexpected(
            is_lapped() ? lapped : std::int_fast64_t(size));
    }
    read_counter += sizeof(std::uint64_t) + size;
    if (read_counter > info_.cached_counter) {
        return tl::unexpected(no_data);
    }
    std::memcpy(data.data(), buf + sizeof(std::uint64_t), size);
    if (is_lapped()) {
        return tl::unexpected(lapped);
    }
    info_.local_counter = read_counter;
    return size;
}
193
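Putting the two halves of `try_read` together, the wire format it assumes is: an 8-byte header whose low 4 bytes hold the payload size, then the payload, each record placed in a power-of-two ring addressed by `counter & mask`. The single-threaded sketch below round-trips that framing; `Frame` is a hypothetical name, records are padded to 8-byte alignment as the slide's `aligned<Alignment::Int64>` implies, and wrap-around plus lapping detection are deliberately omitted.

```cpp
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <string>
#include <vector>

// Round records up to 8-byte alignment, mirroring the queue's
// aligned<Alignment::Int64> step at the start of each read.
inline std::size_t aligned8(std::size_t n) { return (n + 7) & ~std::size_t(7); }

struct Frame {
    std::vector<std::byte> ring = std::vector<std::byte>(1024);
    std::size_t mask = 1023;           // ring size is a power of two
    std::size_t write_counter = 0;
    std::size_t read_counter = 0;

    void write(std::string const & msg) {
        auto * p = ring.data() + (write_counter & mask);
        auto size = static_cast<std::uint32_t>(msg.size());
        std::memcpy(p, &size, sizeof(size));                       // header
        std::memcpy(p + sizeof(std::uint64_t), msg.data(), size);  // payload
        write_counter += sizeof(std::uint64_t) + aligned8(size);
    }

    std::string read() {
        auto * p = ring.data() + (read_counter & mask);
        std::uint32_t size;
        std::memcpy(&size, p, sizeof(size));                       // header
        std::string msg(
            reinterpret_cast<char const *>(p + sizeof(std::uint64_t)), size);
        read_counter += sizeof(std::uint64_t) + aligned8(size);
        return msg;
    }
};
```

The real consumer differs in the parts omitted here: it returns `tl::unexpected` when the buffer is too small, when no data has been published, or when the producer has lapped it, rather than assuming every read succeeds.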