diff --git a/crates/core/src/kv.rs b/crates/core/src/kv.rs index 811409d..c8d9848 100644 --- a/crates/core/src/kv.rs +++ b/crates/core/src/kv.rs @@ -8,10 +8,10 @@ use sqlite::ResultCode; use sqlite_nostd as sqlite; use sqlite_nostd::{Connection, Context}; -use crate::bucket_priority::BucketPriority; use crate::create_sqlite_optional_text_fn; use crate::create_sqlite_text_fn; use crate::error::SQLiteError; +use crate::sync::BucketPriority; fn powersync_client_id_impl( ctx: *mut sqlite::context, diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index 9d1c997..995e5f9 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -12,7 +12,6 @@ use core::ffi::{c_char, c_int}; use sqlite::ResultCode; use sqlite_nostd as sqlite; -mod bucket_priority; mod checkpoint; mod crud_vtab; mod diff; @@ -26,6 +25,7 @@ mod migrations; mod operations; mod operations_vtab; mod schema; +mod sync; mod sync_local; mod sync_types; mod util; diff --git a/crates/core/src/migrations.rs b/crates/core/src/migrations.rs index d00fef2..8fd5357 100644 --- a/crates/core/src/migrations.rs +++ b/crates/core/src/migrations.rs @@ -8,9 +8,9 @@ use sqlite::ResultCode; use sqlite_nostd as sqlite; use sqlite_nostd::{Connection, Context}; -use crate::bucket_priority::BucketPriority; use crate::error::{PSResult, SQLiteError}; use crate::fix_data::apply_v035_fix; +use crate::sync::BucketPriority; pub const LATEST_VERSION: i32 = 9; diff --git a/crates/core/src/operations.rs b/crates/core/src/operations.rs index c0b7622..e4c471c 100644 --- a/crates/core/src/operations.rs +++ b/crates/core/src/operations.rs @@ -1,7 +1,9 @@ use alloc::format; use alloc::string::String; +use num_traits::Zero; use crate::error::{PSResult, SQLiteError}; +use crate::sync::Checksum; use sqlite_nostd as sqlite; use sqlite_nostd::{Connection, ResultCode}; @@ -101,8 +103,8 @@ INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) VALUES(?1, ?2)", bucket_statement.reset()?; let mut last_op: Option = None; - let mut add_checksum: i32 = 0; - let mut op_checksum: i32 = 0; + let mut add_checksum = Checksum::zero(); + let mut op_checksum = Checksum::zero(); let mut added_ops: i32 = 0; while iterate_statement.step()? == ResultCode::ROW { @@ -110,7 +112,7 @@ INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) VALUES(?1, ?2)", let op = iterate_statement.column_text(1)?; let object_type = iterate_statement.column_text(2); let object_id = iterate_statement.column_text(3); - let checksum = iterate_statement.column_int(4); + let checksum = Checksum::from_i32(iterate_statement.column_int(4)); let op_data = iterate_statement.column_text(5); last_op = Some(op_id); @@ -131,9 +133,9 @@ INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) VALUES(?1, ?2)", while supersede_statement.step()? == ResultCode::ROW { // Superseded (deleted) a previous operation, add the checksum - let supersede_checksum = supersede_statement.column_int(1); - add_checksum = add_checksum.wrapping_add(supersede_checksum); - op_checksum = op_checksum.wrapping_sub(supersede_checksum); + let supersede_checksum = Checksum::from_i32(supersede_statement.column_int(1)); + add_checksum += supersede_checksum; + op_checksum -= supersede_checksum; // Superseded an operation, only skip if the bucket was empty // Previously this checked "superseded_op <= last_applied_op". @@ -149,7 +151,7 @@ INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) VALUES(?1, ?2)", if op == "REMOVE" { let should_skip_remove = !superseded; - add_checksum = add_checksum.wrapping_add(checksum); + add_checksum += checksum; if !should_skip_remove { if let (Ok(object_type), Ok(object_id)) = (object_type, object_id) { @@ -190,12 +192,12 @@ INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) VALUES(?1, ?2)", insert_statement.bind_null(6)?; } - insert_statement.bind_int(7, checksum)?; + insert_statement.bind_int(7, checksum.bitcast_i32())?; insert_statement.exec()?; - op_checksum = op_checksum.wrapping_add(checksum); + op_checksum += checksum; } else if op == "MOVE" { - add_checksum = add_checksum.wrapping_add(checksum); + add_checksum += checksum; } else if op == "CLEAR" { // Any remaining PUT operations should get an implicit REMOVE // language=SQLite @@ -223,12 +225,12 @@ WHERE bucket = ?1", "UPDATE ps_buckets SET last_applied_op = 0, add_checksum = ?1, op_checksum = 0 WHERE id = ?2", )?; clear_statement2.bind_int64(2, bucket_id)?; - clear_statement2.bind_int(1, checksum)?; + clear_statement2.bind_int(1, checksum.bitcast_i32())?; clear_statement2.exec()?; - add_checksum = 0; + add_checksum = Checksum::zero(); is_empty = true; - op_checksum = 0; + op_checksum = Checksum::zero(); } } @@ -244,8 +246,8 @@ WHERE bucket = ?1", )?; statement.bind_int64(1, bucket_id)?; statement.bind_int64(2, *last_op)?; - statement.bind_int(3, add_checksum)?; - statement.bind_int(4, op_checksum)?; + statement.bind_int(3, add_checksum.bitcast_i32())?; + statement.bind_int(4, op_checksum.bitcast_i32())?; statement.bind_int(5, added_ops)?; statement.exec()?; diff --git a/crates/core/src/bucket_priority.rs b/crates/core/src/sync/bucket_priority.rs similarity index 100% rename from crates/core/src/bucket_priority.rs rename to crates/core/src/sync/bucket_priority.rs diff --git a/crates/core/src/sync/checksum.rs b/crates/core/src/sync/checksum.rs new file mode 100644 index 0000000..c6f2bc6 --- /dev/null +++ b/crates/core/src/sync/checksum.rs @@ -0,0 +1,214 @@ +use core::{ + fmt::Display, + num::Wrapping, + ops::{Add, AddAssign, Sub, SubAssign}, +}; + +use num_traits::float::FloatCore; +use num_traits::Zero; +use serde::{de::Visitor, Deserialize, Serialize}; + +/// A checksum as received from the sync service. +/// +/// Conceptually, we use unsigned 32 bit integers to represent checksums, and adding checksums +/// should be a wrapping add. +#[repr(transparent)] +#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize)] +pub struct Checksum(Wrapping); + +impl Checksum { + pub const fn value(self) -> u32 { + self.0 .0 + } + + pub const fn from_value(value: u32) -> Self { + Self(Wrapping(value)) + } + + pub const fn from_i32(value: i32) -> Self { + Self::from_value(value as u32) + } + + pub const fn bitcast_i32(self) -> i32 { + self.value() as i32 + } +} + +impl Zero for Checksum { + fn zero() -> Self { + const { Self::from_value(0) } + } + + fn is_zero(&self) -> bool { + self.value() == 0 + } +} + +impl Add for Checksum { + type Output = Self; + + #[inline] + fn add(self, rhs: Self) -> Self::Output { + Self(self.0 + rhs.0) + } +} + +impl AddAssign for Checksum { + #[inline] + fn add_assign(&mut self, rhs: Self) { + self.0 += rhs.0 + } +} + +impl Sub for Checksum { + type Output = Self; + + fn sub(self, rhs: Self) -> Self::Output { + Self(self.0 - rhs.0) + } +} + +impl SubAssign for Checksum { + fn sub_assign(&mut self, rhs: Self) { + self.0 -= rhs.0; + } +} + +impl From for Checksum { + fn from(value: u32) -> Self { + Self::from_value(value) + } +} + +impl Display for Checksum { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + write!(f, "{:#010x}", self.value()) + } +} + +impl<'de> Deserialize<'de> for Checksum { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + struct MyVisitor; + + impl<'de> Visitor<'de> for MyVisitor { + type Value = Checksum; + + fn expecting(&self, formatter: &mut core::fmt::Formatter) -> core::fmt::Result { + write!(formatter, "a number to interpret as a checksum") + } + + fn visit_u32(self, v: u32) -> Result + where + E: serde::de::Error, + { + Ok(v.into()) + } + + fn visit_u64(self, v: u64) -> Result + where + E: serde::de::Error, + { + let as_u32: u32 = v.try_into().map_err(|_| { + E::invalid_value(serde::de::Unexpected::Unsigned(v), &"a 32-bit int") + })?; + Ok(as_u32.into()) + } + + fn visit_i32(self, v: i32) -> Result + where + E: serde::de::Error, + { + Ok(Checksum::from_i32(v)) + } + + fn visit_i64(self, v: i64) -> Result + where + E: serde::de::Error, + { + // This is supposed to be an u32, but it could also be a i32 that we need to + // normalize. + let min: i64 = u32::MIN.into(); + let max: i64 = u32::MAX.into(); + + if v >= min && v <= max { + return Ok(Checksum::from(v as u32)); + } + + let as_i32: i32 = v.try_into().map_err(|_| { + E::invalid_value(serde::de::Unexpected::Signed(v), &"a 32-bit int") + })?; + Ok(Checksum::from_i32(as_i32)) + } + + fn visit_f64(self, v: f64) -> Result + where + E: serde::de::Error, + { + if !v.is_finite() || f64::trunc(v) != v { + return Err(E::invalid_value( + serde::de::Unexpected::Float(v), + &"a whole number", + )); + } + + self.visit_i64(v as i64) + } + } + + deserializer.deserialize_u32(MyVisitor) + } +} + +#[cfg(test)] +mod test { + use num_traits::Zero; + + use super::Checksum; + + #[test] + pub fn test_binary_representation() { + assert_eq!(Checksum::from_i32(-1).value(), u32::MAX); + assert_eq!(Checksum::from(u32::MAX).value(), u32::MAX); + assert_eq!(Checksum::from(u32::MAX).bitcast_i32(), -1); + } + + fn deserialize(from: &str) -> Checksum { + serde_json::from_str(from).expect("should deserialize") + } + + #[test] + pub fn test_deserialize() { + assert_eq!(deserialize("0").value(), 0); + assert_eq!(deserialize("-1").value(), u32::MAX); + assert_eq!(deserialize("-1.0").value(), u32::MAX); + + assert_eq!(deserialize("3573495687").value(), 3573495687); + assert_eq!(deserialize("3573495687.0").value(), 3573495687); + assert_eq!(deserialize("-721471609.0").value(), 3573495687); + } + + #[test] + pub fn test_arithmetic() { + assert_eq!(Checksum::from(3) + Checksum::from(7), Checksum::from(10)); + + // Checksums should always wrap around + assert_eq!( + Checksum::from(0xFFFFFFFF) + Checksum::from(1), + Checksum::zero() + ); + assert_eq!( + Checksum::zero() - Checksum::from(1), + Checksum::from(0xFFFFFFFF) + ); + + let mut cs = Checksum::from(0x8FFFFFFF); + cs += Checksum::from(0x80000000); + assert_eq!(cs, Checksum::from(0x0FFFFFFF)); + + cs -= Checksum::from(0x80000001); + assert_eq!(cs, Checksum::from(0x8FFFFFFE)); + } +} diff --git a/crates/core/src/sync/mod.rs b/crates/core/src/sync/mod.rs new file mode 100644 index 0000000..b2fbd19 --- /dev/null +++ b/crates/core/src/sync/mod.rs @@ -0,0 +1,5 @@ +mod bucket_priority; +mod checksum; + +pub use bucket_priority::BucketPriority; +pub use checksum::Checksum; diff --git a/crates/core/src/sync_local.rs b/crates/core/src/sync_local.rs index 91aa36a..225796b 100644 --- a/crates/core/src/sync_local.rs +++ b/crates/core/src/sync_local.rs @@ -4,8 +4,8 @@ use alloc::string::String; use alloc::vec::Vec; use serde::Deserialize; -use crate::bucket_priority::BucketPriority; use crate::error::{PSResult, SQLiteError}; +use crate::sync::BucketPriority; use sqlite_nostd::{self as sqlite, Destructor, ManagedStmt, Value}; use sqlite_nostd::{ColumnType, Connection, ResultCode};