[adapters] Fix connector-initiated multihost transactions. by blp · Pull Request #5542 · feldera/feldera · GitHub

Conversation

@blp
Member
@blp blp commented Jan 30, 2026

This commit is primarily intended to clarify the transaction code. I found it somewhat puzzling before partly because is_ongoing() and is_ready_to_commit() seemed a little confusing, but also because advance_transaction_state() was one match statement that I found obscure. This commit rewrites these functions so that I can understand them better, adding more comments and more explicit logic.

However, some of the changes are bug fixes for the case where a connector initiates a transaction when multihost is enabled. In this case, what would happen before was something like this:

  1. Connector requests starting a transaction.
  2. Coordinator runs a step before it receives the connector's request.
  3. Coordinator receives the connector's request.
  4. Coordinator starts a transaction.

Unfortunately, as a side effect, step 2 would discard the connector's request entirely: when advance_transaction_state() was called with no ongoing transaction and none to initiate, it always called transaction_info.initiators.clear(), which clears all pending transaction requests, including those from connectors.

This commit changes that case to assert that there's no active API transaction (in the multihost case) instead of clearing all of them, which as a side effect fixes the bug.
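The failure sequence and the fix can be sketched as follows. This is a hypothetical illustration, not the actual Feldera code: the `Initiator` enum, the `advance_idle` method name, and the struct layout are invented, with only `advance_transaction_state()` / `transaction_info.initiators` mentioned in the description above.

```rust
// Hypothetical sketch of the fixed branch, taken when there is no ongoing
// transaction and none ready to initiate. All names here are illustrative.
use std::collections::HashSet;

#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
enum Initiator {
    Api,
    Connector(u32), // hypothetical connector id
}

struct TransactionInfo {
    initiators: HashSet<Initiator>,
    multihost: bool,
}

impl TransactionInfo {
    /// Called when no transaction is ongoing and none is being initiated.
    fn advance_idle(&mut self) {
        if self.multihost {
            // Fixed behavior: only assert that no API transaction is active;
            // connector-initiated requests stay queued so the coordinator can
            // pick them up on a later step.
            assert!(!self.initiators.contains(&Initiator::Api));
        } else {
            // Old behavior (the bug, when reached in multihost mode): drop
            // *all* pending requests, including connector-initiated ones.
            self.initiators.clear();
        }
    }
}
```

With the old behavior, a step run between the connector's request and the coordinator observing it (step 2 above) would erase the request; with the assert, the request survives the step.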

Issue: https://github.com/feldera/cloud/issues/1372

@blp blp requested a review from swanandx January 30, 2026 23:41
@blp blp self-assigned this Jan 30, 2026
Copilot AI review requested due to automatic review settings January 30, 2026 23:41
@blp blp added bug Something isn't working connectors Issues related to the adapters/connectors crate rust Pull requests that update Rust code enterprise Issue related to Feldera Enterprise features. multihost Related to multihost or distributed pipelines labels Jan 30, 2026
Contributor
Copilot AI left a comment


Pull request overview

This PR refactors transaction management logic to improve clarity and fixes a bug where connector-initiated transaction requests were being prematurely cleared in multihost configurations.

Changes:

  • Renamed and clarified transaction state checking methods with improved documentation
  • Added is_active() method to distinguish between active and ongoing transactions
  • Refactored advance_transaction_state() from a single match statement into explicit if-else branches with detailed comments
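The active/ongoing distinction the review overview mentions can be sketched as a small state enum. This is an illustrative guess at the semantics, not the actual Feldera types: the state names and methods are invented; only `is_ongoing()` and `is_active()` come from the discussion above.

```rust
// Hypothetical transaction lifecycle, for illustrating the difference
// between "active" and "ongoing".
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum TransactionState {
    None,       // no transaction at all
    Requested,  // requested by an initiator, not yet started
    Started,    // running and accepting input
    Committing, // commit in progress
}

impl TransactionState {
    /// "Ongoing": a transaction has started and has not finished committing.
    fn is_ongoing(self) -> bool {
        matches!(self, TransactionState::Started | TransactionState::Committing)
    }

    /// "Active": anything from the initial request through commit, so a
    /// requested-but-not-yet-started transaction also counts.
    fn is_active(self) -> bool {
        self != TransactionState::None
    }
}
```

Under this reading, a connector-initiated transaction that the coordinator has not yet started is active but not ongoing, which is exactly the window in which the pre-fix code could discard it.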

// here; instead, we'd be ensuring that a transaction is
// running.
//
// - They cannot be all be committed, because
Copilot AI Jan 30, 2026


Removed the duplicated 'be' in 'cannot be all be'.

Suggested change
// - They cannot be all be committed, because
// - They cannot all be committed, because

Contributor
@swanandx swanandx left a comment


thanks for the fix!

assuming all hosts logging Starting transaction .. / Committing.. is intended.

[txns host=0] 2026-02-02T07:51:34.217334Z  INFO dbsp_adapters::controller:  Starting transaction 1
[txns host=1] 2026-02-02T07:51:34.217560Z  INFO dbsp_adapters::controller:  Starting transaction 1
[txns host=1] 2026-02-02T07:51:34.218593Z  WARN dbsp::storage::backend::posixio_impl:  /pipeline-storage/status.json.mut: unable to delete dropped file: No such file or directory (os error 2)
[txns host=1] 2026-02-02T07:51:34.218681Z  WARN dbsp_adapters::server:  status.json: failed to write to storage (/pipeline-storage/status.json.mut: rename failed: entity not found)
[txns host=0] 2026-02-02T07:51:34.219036Z  WARN dbsp::storage::backend::posixio_impl:  /pipeline-storage/status.json.mut: unable to delete dropped file: No such file or directory (os error 2)
[txns host=0] 2026-02-02T07:51:34.219080Z  WARN dbsp_adapters::server:  status.json: failed to write to storage (/pipeline-storage/status.json.mut: rename failed: entity not found)
[coordinator pipeline-019c1d3f-9a70-7bc3-8db1-820a7f17567d] 2026-02-02T07:51:34.228504Z  INFO stream{path="/coordination/transaction/status" ordinal=0}: feldera_coordinator::pipelines: New transaction coordination status TransactionCoordination { status: Some(true), requests: {} }
[txns host=1] 2026-02-02T07:51:34.231961Z  INFO dbsp_adapters::server:  pause: Transitioning from Running to Paused
[txns host=0] 2026-02-02T07:51:34.231982Z  INFO dbsp_adapters::server:  pause: Transitioning from Running to Paused
[coordinator pipeline-019c1d3f-9a70-7bc3-8db1-820a7f17567d] 2026-02-02T07:51:34.232245Z  INFO stream{path="/coordination/status" ordinal=0}: feldera_coordinator::pipelines: New status ExtendedRuntimeStatus { runtime_status: Paused, runtime_status_details: String(""), runtime_desired_status: Paused }
[coordinator pipeline-019c1d3f-9a70-7bc3-8db1-820a7f17567d] 2026-02-02T07:51:34.232266Z  INFO stream{path="/coordination/status" ordinal=1}: feldera_coordinator::pipelines: New status ExtendedRuntimeStatus { runtime_status: Paused, runtime_status_details: String(""), runtime_desired_status: Paused }
[txns host=1] 2026-02-02T07:51:34.232669Z  INFO dbsp_adapters::server:  start: Transitioning from Paused to Running
[coordinator pipeline-019c1d3f-9a70-7bc3-8db1-820a7f17567d] 2026-02-02T07:51:34.232519Z  INFO stream{path="/coordination/transaction/status" ordinal=1}: feldera_coordinator::pipelines: New transaction coordination status TransactionCoordination { status: Some(false), requests: {} }
[coordinator pipeline-019c1d3f-9a70-7bc3-8db1-820a7f17567d] 2026-02-02T07:51:34.232533Z  INFO stream{path="/coordination/transaction/status" ordinal=0}: feldera_coordinator::pipelines: New transaction coordination status TransactionCoordination { status: Some(false), requests: {} }
[txns host=0] 2026-02-02T07:51:34.232629Z  INFO dbsp_adapters::server:  start: Transitioning from Paused to Running
[coordinator pipeline-019c1d3f-9a70-7bc3-8db1-820a7f17567d] 2026-02-02T07:51:34.232811Z  INFO stream{path="/coordination/status" ordinal=1}: feldera_coordinator::pipelines: New status ExtendedRuntimeStatus { runtime_status: Running, runtime_status_details: String(""), runtime_desired_status: Running }
[coordinator pipeline-019c1d3f-9a70-7bc3-8db1-820a7f17567d] 2026-02-02T07:51:34.232919Z  INFO stream{path="/coordination/status" ordinal=0}: feldera_coordinator::pipelines: New status ExtendedRuntimeStatus { runtime_status: Running, runtime_status_details: String(""), runtime_desired_status: Running }
[txns host=0] 2026-02-02T07:51:34.232949Z  INFO dbsp_adapters::controller:  Committing transaction 1
[txns host=1] 2026-02-02T07:51:34.233003Z  INFO dbsp_adapters::controller:  Committing transaction 1
[txns host=1] 2026-02-02T07:51:34.233817Z  INFO dbsp_adapters::controller:  Transaction 1 committed
[coordinator pipeline-019c1d3f-9a70-7bc3-8db1-820a7f17567d] 2026-02-02T07:51:34.233933Z  INFO stream{path="/coordination/transaction/status" ordinal=1}: feldera_coordinator::pipelines: New transaction coordination status TransactionCoordination { status: None, requests: {} }
[txns host=1] 2026-02-02T07:51:34.234302Z  WARN dbsp::storage::backend::posixio_impl:  /pipeline-storage/status.json.mut: unable to delete dropped file: No such file or directory (os error 2)
[txns host=1] 2026-02-02T07:51:34.234318Z  WARN dbsp_adapters::server:  status.json: failed to write to storage (/pipeline-storage/status.json.mut: rename failed: entity not found)
[txns host=0] 2026-02-02T07:51:34.233986Z  INFO dbsp_adapters::controller:  Transaction 1 committed

@swanandx swanandx added this pull request to the merge queue Feb 2, 2026
Merged via the queue into main with commit b5043d9 Feb 2, 2026
1 check passed
@swanandx swanandx deleted the transactions branch February 2, 2026 11:19
@blp
Member Author
blp commented Feb 2, 2026

thanks for the fix!

assuming all hosts logging Starting transaction .. / Committing.. is intended.

Yes (I think we might need more logging from the coordinator itself).
