-
Notifications
You must be signed in to change notification settings - Fork 96
[adapters] Fix connector-initiated multihost transactions. #5542
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This PR refactors transaction management logic to improve clarity and fixes a bug where connector-initiated transaction requests were being prematurely cleared in multihost configurations.
Changes:
- Renamed and clarified transaction state checking methods with improved documentation
- Added
is_active()method to distinguish between active and ongoing transactions - Refactored
advance_transaction_state()from a single match statement into explicit if-else branches with detailed comments
crates/adapters/src/controller.rs
Outdated
| // here; instead, we'd be ensuring that a transaction is | ||
| // running. | ||
| // | ||
| // - They cannot be all be committed, because |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Corrected spelling of 'all be' to 'all'.
| // - They cannot be all be committed, because | |
| // - They cannot all be committed, because |
This commit is primarily intended to clarify the transaction code. I found it somewhat puzzling before partly because is_ongoing() and is_ready_to_commit() seemed a little confusing, but also because advance_transaction_state() was one `match` statement that I found obscure. This commit rewrites these functions so that I can understand them better, adding more comments and more explicit logic. However, some of the changes are bug fixes for the case where a connector initiates a transaction when multihost is enabled. In this case, what would happen before was something like this: 1. Connector requests starting a transaction. 2. Coordinator runs a step before it receives the connector's request. 3. Coordinator receives the connector's request. 4. Coordinator starts a transaction. Unfortunately, as a side effect, step 2 would discard the connector's request entirely because advance_transaction_state(), when it was called if there was no ongoing transaction and none to initiate, always called transaction_info.initiators.clear(), which clears all the transaction requests including those from connectors. This commit changes that case to instead just assert that there's no active API transaction (in the multihost case), instead of clearing all of them, which as a side effect fixes the bug. Issue: feldera/cloud#1372 Signed-off-by: Ben Pfaff <blp@feldera.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks for the fix!
assuming all hosts logging Starting transaction .. / Committing.. is intended.
[txns host=0] 2026-02-02T07:51:34.217334Z INFO dbsp_adapters::controller: Starting transaction 1
[txns host=1] 2026-02-02T07:51:34.217560Z INFO dbsp_adapters::controller: Starting transaction 1
[txns host=1] 2026-02-02T07:51:34.218593Z WARN dbsp::storage::backend::posixio_impl: /pipeline-storage/status.json.mut: unable to delete dropped file: No such file or directory (os error 2)
[txns host=1] 2026-02-02T07:51:34.218681Z WARN dbsp_adapters::server: status.json: failed to write to storage (/pipeline-storage/status.json.mut: rename failed: entity not found)
[txns host=0] 2026-02-02T07:51:34.219036Z WARN dbsp::storage::backend::posixio_impl: /pipeline-storage/status.json.mut: unable to delete dropped file: No such file or directory (os error 2)
[txns host=0] 2026-02-02T07:51:34.219080Z WARN dbsp_adapters::server: status.json: failed to write to storage (/pipeline-storage/status.json.mut: rename failed: entity not found)
[coordinator pipeline-019c1d3f-9a70-7bc3-8db1-820a7f17567d] 2026-02-02T07:51:34.228504Z INFO stream{path="/coordination/transaction/status" ordinal=0}: feldera_coordinator::pipelines: New transaction coordination status TransactionCoordination { status: Some(true), requests: {} }
[txns host=1] 2026-02-02T07:51:34.231961Z INFO dbsp_adapters::server: pause: Transitioning from Running to Paused
[txns host=0] 2026-02-02T07:51:34.231982Z INFO dbsp_adapters::server: pause: Transitioning from Running to Paused
[coordinator pipeline-019c1d3f-9a70-7bc3-8db1-820a7f17567d] 2026-02-02T07:51:34.232245Z INFO stream{path="/coordination/status" ordinal=0}: feldera_coordinator::pipelines: New status ExtendedRuntimeStatus { runtime_status: Paused, runtime_status_details: String(""), runtime_desired_status: Paused }
[coordinator pipeline-019c1d3f-9a70-7bc3-8db1-820a7f17567d] 2026-02-02T07:51:34.232266Z INFO stream{path="/coordination/status" ordinal=1}: feldera_coordinator::pipelines: New status ExtendedRuntimeStatus { runtime_status: Paused, runtime_status_details: String(""), runtime_desired_status: Paused }
[txns host=1] 2026-02-02T07:51:34.232669Z INFO dbsp_adapters::server: start: Transitioning from Paused to Running
[coordinator pipeline-019c1d3f-9a70-7bc3-8db1-820a7f17567d] 2026-02-02T07:51:34.232519Z INFO stream{path="/coordination/transaction/status" ordinal=1}: feldera_coordinator::pipelines: New transaction coordination status TransactionCoordination { status: Some(false), requests: {} }
[coordinator pipeline-019c1d3f-9a70-7bc3-8db1-820a7f17567d] 2026-02-02T07:51:34.232533Z INFO stream{path="/coordination/transaction/status" ordinal=0}: feldera_coordinator::pipelines: New transaction coordination status TransactionCoordination { status: Some(false), requests: {} }
[txns host=0] 2026-02-02T07:51:34.232629Z INFO dbsp_adapters::server: start: Transitioning from Paused to Running
[coordinator pipeline-019c1d3f-9a70-7bc3-8db1-820a7f17567d] 2026-02-02T07:51:34.232811Z INFO stream{path="/coordination/status" ordinal=1}: feldera_coordinator::pipelines: New status ExtendedRuntimeStatus { runtime_status: Running, runtime_status_details: String(""), runtime_desired_status: Running }
[coordinator pipeline-019c1d3f-9a70-7bc3-8db1-820a7f17567d] 2026-02-02T07:51:34.232919Z INFO stream{path="/coordination/status" ordinal=0}: feldera_coordinator::pipelines: New status ExtendedRuntimeStatus { runtime_status: Running, runtime_status_details: String(""), runtime_desired_status: Running }
[txns host=0] 2026-02-02T07:51:34.232949Z INFO dbsp_adapters::controller: Committing transaction 1
[txns host=1] 2026-02-02T07:51:34.233003Z INFO dbsp_adapters::controller: Committing transaction 1
[txns host=1] 2026-02-02T07:51:34.233817Z INFO dbsp_adapters::controller: Transaction 1 committed
[coordinator pipeline-019c1d3f-9a70-7bc3-8db1-820a7f17567d] 2026-02-02T07:51:34.233933Z INFO stream{path="/coordination/transaction/status" ordinal=1}: feldera_coordinator::pipelines: New transaction coordination status TransactionCoordination { status: None, requests: {} }
[txns host=1] 2026-02-02T07:51:34.234302Z WARN dbsp::storage::backend::posixio_impl: /pipeline-storage/status.json.mut: unable to delete dropped file: No such file or directory (os error 2)
[txns host=1] 2026-02-02T07:51:34.234318Z WARN dbsp_adapters::server: status.json: failed to write to storage (/pipeline-storage/status.json.mut: rename failed: entity not found)
[txns host=0] 2026-02-02T07:51:34.233986Z INFO dbsp_adapters::controller: Transaction 1 committed
Yes (I think we might need more logging from the coordinator itself). |
This commit is primarily intended to clarify the transaction code. I found it somewhat puzzling before partly because is_ongoing() and is_ready_to_commit() seemed a little confusing, but also because advance_transaction_state() was one
matchstatement that I found obscure. This commit rewrites these functions so that I can understand them better, adding more comments and more explicit logic.However, some of the changes are bug fixes for the case where a connector initiates a transaction when multihost is enabled. In this case, what would happen before was something like this:
Unfortunately, as a side effect, step 2 would discard the connector's request entirely because advance_transaction_state(), when it was called if there was no ongoing transaction and none to initiate, always called transaction_info.initiators.clear(), which clears all the transaction requests including those from connectors.
This commit changes that case to instead just assert that there's no active API transaction (in the multihost case), instead of clearing all of them, which as a side effect fixes the bug.
Issue: https://github.com/feldera/cloud/issues/1372