8000 [adapters] Log detailed checkpoint info on startup. by ryzhyk · Pull Request #5546 · feldera/feldera · GitHub
[go: up one dir, main page]

Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter 10000

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion crates/adapters/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3801,6 +3801,8 @@ impl ControllerInit {
storage: CircuitStorageConfig,
checkpoint: Checkpoint,
) -> Result<Self, ControllerError> {
let checkpoint_summary = checkpoint.display_summary();

let Checkpoint {
circuit,
step,
Expand All @@ -3811,7 +3813,7 @@ impl ControllerInit {
input_statistics,
output_statistics,
} = checkpoint;
info!("resuming from checkpoint made at step {step}");
info!("Resuming from checkpoint:\n{checkpoint_summary}");

let storage = storage.with_init_checkpoint(circuit.map(|circuit| circuit.uuid));

Expand Down
62 changes: 62 additions & 0 deletions crates/adapters/src/controller/checkpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,68 @@ pub struct Checkpoint {
pub output_statistics: HashMap<String, CheckpointOutputEndpointMetrics>,
}

impl Checkpoint {
pub fn display_summary(&self) -> String {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why "display" and not just "summary"?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The name emphasizes that the summary is formatted for display purposes. I think this is somewhat common in Rust.

let input_metadata = self
.input_metadata
.0
.iter()
.map(|(name, value)| format!(" {name}: {}", serde_json::to_string(value).unwrap()))
.collect::<Vec<_>>()
.join("\n");

let input_statistics = self
.input_statistics
.iter()
.map(|(name, value)| format!(" {name}: {}", serde_json::to_string(value).unwrap()))
.collect::<Vec<_>>()
.join("\n");

let output_statistics = self
.output_statistics
.iter()
.map(|(name, value)| format!(" {name}: {}", serde_json::to_string(value).unwrap()))
.collect::<Vec<_>>()
.join("\n");

let checkpoint_timestamp = self
.circuit
.as_ref()
.and_then(|circuit| circuit.uuid.get_timestamp())
.and_then(|timestamp| {
DateTime::<Utc>::from_timestamp(timestamp.to_unix().0 as i64, timestamp.to_unix().1)
})
.map(|timestamp| timestamp.to_rfc3339())
.unwrap_or_else(|| "Unknown".to_string());

format!(
r#"Checkpoint made at step {}:
Initial pipeline start time: {}
Checkpoint timestamp: {}
Records processed before the checkpoint: {}
Checkpoint metadata: {}
Checkpointed input connector state:
{}
Checkpointed input connector metrics:
{}
Checkpointed output connector metrics:
{}"#,
self.step,
self.initial_start_time,
checkpoint_timestamp,
self.processed_records,
if let Some(circuit) = &self.circuit {
serde_json::to_string(circuit).unwrap()
} else {
"None".to_string()
},
input_metadata,
input_statistics,
output_statistics
)
}
}

/// Checkpoint for the statistics for an input endpoint.
///
/// This is the checkpointed form of [InputEndpointMetrics].
Expand Down
Loading
0