forked from postgres/postgres
Logical replication, large transaction streaming #8
Open: ololobus wants to merge 11 commits into master from logic-stream
Conversation
Instead of deciding whether to serialize a transaction merely based on the number of changes in that xact (toplevel or subxact), this makes the decision based on the amount of memory consumed by the changes. The amount of memory is defined by a new logical_work_mem GUC, so for example we can do SET logical_work_mem = '128kB' to trigger very aggressive streaming. The minimum value is 64kB (i.e. lower than the minimum value for maintenance_work_mem, which is 1MB).

When adding a change to a transaction, we account for its size in two places: first in the ReorderBuffer, which is used to decide whether we have reached the total memory limit, and second in the transaction the change belongs to, so that we can pick the largest transaction to evict (and serialize to disk).

We still use max_changes_in_memory when loading changes serialized to disk. The trouble is we can't use the memory limit directly, as there might be multiple subxacts serialized; we need to read all of them, but we don't know how many there are (or which subxact to read first).

We do not serialize the ReorderBufferTXN entries, so if there is a transaction with many subxacts, most of the memory may be in this type of object. Those records are not included in the memory accounting. We also do not account for INTERNAL_TUPLECID changes, which are kept in a separate list and not evicted from memory. Transactions with many CTID changes may consume significant amounts of memory, but we can't really do much about that.

The current eviction algorithm is very simple: the transaction is picked merely by size, while it might be useful to also consider the age (LSN) of the changes, for example. With the new Generational memory allocator, evicting the oldest changes would make it more likely that the memory actually gets pfreed.

logical_work_mem may be changed in two ways: in postgresql.conf, which serves as the default for all publishers on that instance, and when creating the subscription, using a work_mem parameter in the WITH clause (which specifies a number of kilobytes).
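As a rough illustration of the two-place accounting described above, here is a minimal C sketch. The identifiers (`rb->size`, `txn->size`, the `ReorderBufferChangeSize` helper) mirror the shape this took in the PostgreSQL tree, but they are assumptions here, not necessarily this patch's exact names.

```c
#include "postgres.h"
#include "replication/reorderbuffer.h"

/* hypothetical helper returning the memory footprint of one change */
static Size ReorderBufferChangeSize(ReorderBufferChange *change);

/*
 * Sketch: update memory accounting when a change is added or removed.
 * The per-transaction counter is used to pick the largest transaction
 * to evict; the buffer-wide counter is checked against logical_work_mem.
 */
static void
ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
                                ReorderBufferChange *change,
                                bool addition)
{
    Size              sz  = ReorderBufferChangeSize(change);
    ReorderBufferTXN *txn = change->txn;   /* backlink added by this patch */

    if (addition)
    {
        txn->size += sz;
        rb->size  += sz;
    }
    else
    {
        Assert(txn->size >= sz && rb->size >= sz);
        txn->size -= sz;
        rb->size  -= sz;
    }
}
```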
The logical decoding infrastructure needs to know which top-level transaction a subxact belongs to, in order to decode all the changes. Until now that might be delayed until commit, due to the caching (PGPROC_MAX_CACHED_SUBXIDS), preventing features that require incremental decoding. So instead we write the assignment info into WAL immediately, as part of the next WAL record (to minimize overhead).
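For context, piggybacking the assignment onto the next record might look roughly like the fragment below, inside the routine that assembles a WAL record. The names (IsSubTransactionAssignmentPending, XLR_BLOCK_ID_TOPLEVEL_XID, MarkSubTransactionAssigned) follow the shape this eventually took in the PostgreSQL tree and may well differ in this patch.

```c
/*
 * Sketch (inside XLogRecordAssemble, while building the record header):
 * attach the subxact-to-toplevel assignment to the next WAL record, so
 * decoding learns the association immediately rather than at commit.
 */
if (IsSubTransactionAssignmentPending())
{
    TransactionId xid = GetTopTransactionIdIfAny();

    Assert(TransactionIdIsValid(xid));

    /* emit an extra "block id" carrying the top-level XID */
    *(scratch++) = (char) XLR_BLOCK_ID_TOPLEVEL_XID;
    memcpy(scratch, &xid, sizeof(TransactionId));
    scratch += sizeof(TransactionId);

    /* remember we logged it, so we don't log it again */
    MarkSubTransactionAssigned();
}
```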
When wal_level=logical, write individual invalidations into WAL so that decoding can use this information.

We still add the invalidations to the cache and write them to WAL at commit time in RecordTransactionCommit(). This uses the existing XLOG_INVALIDATIONS xlog record type, from the RM_STANDBY_ID resource manager (see LogStandbyInvalidations for details), so existing code relying on those invalidations (e.g. redo) does not need to be changed.

The individual invalidations are written using a new xlog record type, XLOG_XACT_INVALIDATIONS, from the RM_XACT_ID resource manager (see LogLogicalInvalidations for details). These new xlog records are ignored by existing redo procedures, which still rely on the invalidations written to commit records.

The invalidations are decoded and added as a new ReorderBufferChange type (REORDER_BUFFER_CHANGE_INVALIDATION), and then executed during replay, unlike the existing invalidations (which are either decoded as part of the commit record, or executed immediately during decoding and not added to the reorderbuffer at all).

LogStandbyInvalidations accumulated all the invalidations in memory and wrote them only once, at commit time, which may reduce the performance impact by amortizing the overhead and deduplicating the invalidations. The new invalidations are written to WAL immediately, without any such caching. Perhaps it would be possible to add similar caching, e.g. at the command level, or something like that?
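A minimal sketch of what logging such an immediate invalidation record might look like, reusing the payload layout of the commit record's invalidation block. The struct and macro names (xl_xact_invals, MinSizeOfXactInvals) follow the eventually committed tree and are assumptions with respect to this patch.

```c
#include "postgres.h"
#include "access/xact.h"
#include "access/xloginsert.h"
#include "storage/sinval.h"

/*
 * Sketch: write a batch of invalidation messages as their own
 * XLOG_XACT_INVALIDATIONS record under RM_XACT_ID, so logical
 * decoding can pick them up before commit.
 */
static void
LogLogicalInvalidations(int nmsgs, SharedInvalidationMessage *msgs)
{
    xl_xact_invals xlrec;

    if (nmsgs == 0)
        return;

    xlrec.nmsgs = nmsgs;

    XLogBeginInsert();
    XLogRegisterData((char *) &xlrec, MinSizeOfXactInvals);
    XLogRegisterData((char *) msgs,
                     nmsgs * sizeof(SharedInvalidationMessage));
    (void) XLogInsert(RM_XACT_ID, XLOG_XACT_INVALIDATIONS);
}
```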
This adds six methods to the output plugin API, adding support for streaming changes from large transactions:

* stream_start
* stream_stop
* stream_change
* stream_message
* stream_abort
* stream_commit

Most of this is a simple extension of the existing methods, with the semantic difference that the transaction (or subtransaction) is incomplete and may be aborted later (which is something the regular API does not really need to deal with).

This also extends the 'test_decoding' plugin, implementing these new stream methods. stream_start/stream_stop are used to demarcate a chunk of changes streamed for a particular toplevel transaction.
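To make the shape concrete, here is a sketch of how a test_decoding-style plugin might register the six callbacks. The OutputPluginCallbacks field names and handler signatures follow the API as it was eventually committed, so they are assumptions with respect to this exact patch; handler bodies are omitted.

```c
#include "postgres.h"
#include "replication/output_plugin.h"

/* prototypes for the plugin's stream handlers (bodies omitted) */
static void pg_decode_stream_start(LogicalDecodingContext *ctx,
                                   ReorderBufferTXN *txn);
static void pg_decode_stream_stop(LogicalDecodingContext *ctx,
                                  ReorderBufferTXN *txn);
static void pg_decode_stream_change(LogicalDecodingContext *ctx,
                                    ReorderBufferTXN *txn,
                                    Relation relation,
                                    ReorderBufferChange *change);
static void pg_decode_stream_message(LogicalDecodingContext *ctx,
                                     ReorderBufferTXN *txn,
                                     XLogRecPtr lsn, bool transactional,
                                     const char *prefix, Size sz,
                                     const char *message);
static void pg_decode_stream_abort(LogicalDecodingContext *ctx,
                                   ReorderBufferTXN *txn,
                                   XLogRecPtr abort_lsn);
static void pg_decode_stream_commit(LogicalDecodingContext *ctx,
                                    ReorderBufferTXN *txn,
                                    XLogRecPtr commit_lsn);

void
_PG_output_plugin_init(OutputPluginCallbacks *cb)
{
    /* existing callbacks (begin_cb, change_cb, commit_cb, ...) go here */

    /* new streaming callbacks */
    cb->stream_start_cb   = pg_decode_stream_start;
    cb->stream_stop_cb    = pg_decode_stream_stop;
    cb->stream_change_cb  = pg_decode_stream_change;
    cb->stream_message_cb = pg_decode_stream_message;
    cb->stream_abort_cb   = pg_decode_stream_abort;
    cb->stream_commit_cb  = pg_decode_stream_commit;
}
```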
Instead of serializing the transaction to disk after reaching the maximum number of changes in memory (4096 changes), we consume the changes we have in memory and invoke the new stream API methods. This happens in ReorderBufferStreamTXN(), using roughly the same logic as ReorderBufferCommit(). We can do this incremental processing thanks to having assignments (associating subxacts with toplevel xacts) in WAL right away, and thanks to logging the invalidation messages.

This adds a second iterator for the streaming case, without the spill-to-disk functionality and processing only the changes currently in memory. Theoretically, we could get rid of the k-way merge and append the changes to the toplevel xact directly (remembering the position in the list in case the subxact gets aborted later).

It also adds a ReorderBufferTXN pointer in two places:

* ReorderBufferChange, so that we know which xact a change belongs to
* ReorderBufferTXN, pointing to the toplevel xact (from a subxact)

The output plugin can use this to decide which changes to discard in case of stream_abort_cb (e.g. when a subxact gets aborted). A condensed sketch of the eviction decision this enables follows.
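In the sketch below, ReorderBufferCheckMemoryLimit, ReorderBufferLargestTXN and ReorderBufferCanStream are illustrative names (matching the shape of the eventually committed code), not necessarily this patch's identifiers.

```c
/*
 * Sketch: called after adding a change. Once the buffer-wide counter
 * crosses logical_work_mem, pick the largest toplevel transaction and
 * either stream it through the new API or spill it to disk.
 */
static void
ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
{
    ReorderBufferTXN *txn;

    if (rb->size < logical_work_mem * 1024L)
        return;

    /* pick the transaction consuming the most memory */
    txn = ReorderBufferLargestTXN(rb);

    if (ReorderBufferCanStream(rb))
        ReorderBufferStreamTXN(rb, txn);    /* invoke stream API methods */
    else
        ReorderBufferSerializeTXN(rb, txn); /* spill changes to disk */
}
```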
To add support for streaming of in-progress transactions into the built-in logical replication, we need to do three things:

* Extend the logical replication protocol to identify in-progress transactions, and allow adding additional bits of information (e.g. the XID of subtransactions).
* Modify the output plugin (pgoutput) to implement the new stream API callbacks, by leveraging the extended replication protocol.
* Modify the replication apply worker to properly handle streamed in-progress transactions, by spilling the data to disk and then replaying it on commit. (A sketch of this follows the list.)

We must, however, explicitly disable streaming during replication slot creation, even if the plugin supports it. We don't need to replicate the changes accumulated during this phase, and moreover we don't have a replication connection open, so there is nowhere to send the data anyway.
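As one illustration of the apply-worker side, here is a sketch of handling the start of a streamed chunk by routing subsequent changes into a per-XID spill file. The helper names (logicalrep_read_stream_start, stream_open_file) follow the code that was eventually committed and are assumptions with respect to this patch.

```c
/*
 * Sketch: apply worker receives the start of a streamed chunk. Changes
 * that follow are buffered in a per-transaction spill file and replayed
 * only when the corresponding stream_commit arrives.
 */
static void
apply_handle_stream_start(StringInfo s)
{
    bool          first_segment;
    TransactionId xid = logicalrep_read_stream_start(s, &first_segment);

    /* subsequent changes belong to this in-progress transaction */
    in_streamed_transaction = true;

    /* open (or create) the spill file for this toplevel transaction */
    stream_open_file(MyLogicalRepWorker->subid, xid, first_segment);
}
```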