8000 Add checksum newtype by simolus3 · Pull Request #80 · powersync-ja/powersync-sqlite-core · GitHub
[go: up one dir, main page]

Skip to content

Add checksum newtype #80

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jun 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/core/src/kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ use sqlite::ResultCode;
use sqlite_nostd as sqlite;
use sqlite_nostd::{Connection, Context};

use crate::bucket_priority::BucketPriority;
use crate::create_sqlite_optional_text_fn;
use crate::create_sqlite_text_fn;
use crate::error::SQLiteError;
use crate::sync::BucketPriority;

fn powersync_client_id_impl(
ctx: *mut sqlite::context,
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use core::ffi::{c_char, c_int};
use sqlite::ResultCode;
use sqlite_nostd as sqlite;

mod bucket_priority;
mod checkpoint;
mod crud_vtab;
mod diff;
Expand All @@ -26,6 +25,7 @@ mod migrations;
mod operations;
mod operations_vtab;
mod schema;
mod sync;
mod sync_local;
mod sync_types;
mod util;
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/migrations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ use sqlite::ResultCode;
use sqlite_nostd as sqlite;
use sqlite_nostd::{Connection, Context};

use crate::bucket_priority::BucketPriority;
use crate::error::{PSResult, SQLiteError};
use crate::fix_data::apply_v035_fix;
use crate::sync::BucketPriority;

pub const LATEST_VERSION: i32 = 9;

Expand Down
32 changes: 17 additions & 15 deletions crates/core/src/operations.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use alloc::format;
use alloc::string::String;
use num_traits::Zero;

use crate::error::{PSResult, SQLiteError};
use crate::sync::Checksum;
use sqlite_nostd as sqlite;
use sqlite_nostd::{Connection, ResultCode};

Expand Down Expand Up @@ -101,16 +103,16 @@ INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) VALUES(?1, ?2)",
bucket_statement.reset()?;

let mut last_op: Option<i64> = None;
let mut add_checksum: i32 = 0;
let mut op_checksum: i32 = 0;
let mut add_checksum = Checksum::zero();
let mut op_checksum = Checksum::zero();
let mut added_ops: i32 = 0;

while iterate_statement.step()? == ResultCode::ROW {
let op_id = iterate_statement.column_int64(0);
let op = iterate_statement.column_text(1)?;
let object_type = iterate_statement.column_text(2);
let object_id = iterate_statement.column_text(3);
let checksum = iterate_statement.column_int(4);
let checksum = Checksum::from_i32(iterate_statement.column_int(4));
let op_data = iterate_statement.column_text(5);

last_op = Some(op_id);
Expand All @@ -131,9 +133,9 @@ INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) VALUES(?1, ?2)",

while supersede_statement.step()? == ResultCode::ROW {
// Superseded (deleted) a previous operation, add the checksum
let supersede_checksum = supersede_statement.column_int(1);
add_checksum = add_checksum.wrapping_add(supersede_checksum);
op_checksum = op_checksum.wrapping_sub(supersede_checksum);
let supersede_checksum = Checksum::from_i32(supersede_statement.column_int(1));
add_checksum += supersede_checksum;
op_checksum -= supersede_checksum;

// Superseded an operation, only skip if the bucket was empty
// Previously this checked "superseded_op <= last_applied_op".
Expand All @@ -149,7 +151,7 @@ INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) VALUES(?1, ?2)",
if op == "REMOVE" {
let should_skip_remove = !superseded;

add_checksum = add_checksum.wrapping_add(checksum);
add_checksum += checksum;

if !should_skip_remove {
if let (Ok(object_type), Ok(object_id)) = (object_type, object_id) {
Expand Down Expand Up @@ -190,12 +192,12 @@ INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) VALUES(?1, ?2)",
insert_statement.bind_null(6)?;
}

insert_statement.bind_int(7, checksum)?;
insert_statement.bind_int(7, checksum.bitcast_i32())?;
insert_statement.exec()?;

op_checksum = op_checksum.wrapping_add(checksum);
op_checksum += checksum;
} else if op == "MOVE" {
add_checksum = add_checksum.wrapping_add(checksum);
add_checksum += checksum;
} else if op == "CLEAR" {
// Any remaining PUT operations should get an implicit REMOVE
// language=SQLite
Expand Down Expand Up @@ -223,12 +225,12 @@ WHERE bucket = ?1",
"UPDATE ps_buckets SET last_applied_op = 0, add_checksum = ?1, op_checksum = 0 WHERE id = ?2",
)?;
clear_statement2.bind_int64(2, bucket_id)?;
clear_statement2.bind_int(1, checksum)?;
clear_statement2.bind_int(1, checksum.bitcast_i32())?;
clear_statement2.exec()?;

add_checksum = 0;
add_checksum = Checksum::zero();
is_empty = true;
op_checksum = 0;
op_checksum = Checksum::zero();
}
}

Expand All @@ -244,8 +246,8 @@ WHERE bucket = ?1",
)?;
statement.bind_int64(1, bucket_id)?;
statement.bind_int64(2, *last_op)?;
statement.bind_int(3, add_checksum)?;
statement.bind_int(4, op_checksum)?;
statement.bind_int(3, add_checksum.bitcast_i32())?;
statement.bind_int(4, op_checksum.bitcast_i32())?;
statement.bind_int(5, added_ops)?;

statement.exec()?;
Expand Down
214 changes: 214 additions & 0 deletions crates/core/src/sync/checksum.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
use core::{
fmt::Display,
num::Wrapping,
ops::{Add, AddAssign, Sub, SubAssign},
};

use num_traits::float::FloatCore;
use num_traits::Zero;
use serde::{de::Visitor, Deserialize, Serialize};

/// A checksum as received from the sync service.
///
/// Conceptually, we use unsigned 32 bit integers to represent checksums, and adding checksums
/// should be a wrapping add.
#[repr(transparent)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize)]
pub struct Checksum(Wrapping<u32>);

impl Checksum {
pub const fn value(self) -> u32 {
self.0 .0
}

pub const fn from_value(value: u32) -> Self {
Self(Wrapping(value))
}

pub const fn from_i32(value: i32) -> Self {
Self::from_value(value as u32)
}

pub const fn bitcast_i32(self) -> i32 {
self.value() as i32
}
}

impl Zero for Checksum {
fn zero() -> Self {
const { Self::from_value(0) }
}

fn is_zero(&self) -> bool {
self.value() == 0
}
}

impl Add for Checksum {
type Output = Self;

#[inline]
fn add(self, rhs: Self) -> Self::Output {
Self(self.0 + rhs.0)
}
}

impl AddAssign for Checksum {
#[inline]
fn add_assign(&mut self, rhs: Self) {
self.0 += rhs.0
}
}

impl Sub for Checksum {
type Output = Self;

fn sub(self, rhs: Self) -> Self::Output {
Self(self.0 - rhs.0)
}
}

impl SubAssign for Checksum {
fn sub_assign(&mut self, rhs: Self) {
self.0 -= rhs.0;
}
}

impl From<u32> for Checksum {
fn from(value: u32) -> Self {
Self::from_value(value)
}
}

impl Display for Checksum {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
write!(f, "{:#010x}", self.value())
}
}

impl<'de> Deserialize<'de> for Checksum {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
struct MyVisitor;

impl<'de> Visitor<'de> for MyVisitor {
type Value = Checksum;

fn expecting(&self, formatter: &mut core::fmt::Formatter) -> core::fmt::Result {
write!(formatter, "a number to interpret as a checksum")
}

fn visit_u32<E>(self, v: u32) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
Ok(v.into())
}

fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
let as_u32: u32 = v.try_into().map_err(|_| {
E::invalid_value(serde::de::Unexpected::Unsigned(v), &"a 32-bit int")
})?;
Ok(as_u32.into())
}

fn visit_i32<E>(self, v: i32) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
Ok(Checksum::from_i32(v))
}

fn visit_i64<E>(self, v F438 : i64) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
// This is supposed to be an u32, but it could also be a i32 that we need to
// normalize.
let min: i64 = u32::MIN.into();
let max: i64 = u32::MAX.into();

if v >= min && v <= max {
return Ok(Checksum::from(v as u32));
}

let as_i32: i32 = v.try_into().map_err(|_| {
E::invalid_value(serde::de::Unexpected::Signed(v), &"a 32-bit int")
})?;
Ok(Checksum::from_i32(as_i32))
}

fn visit_f64<E>(self, v: f64) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
if !v.is_finite() || f64::trunc(v) != v {
return Err(E::invalid_value(
serde::de::Unexpected::Float(v),
&"a whole number",
));
}

self.visit_i64(v as i64)
}
}

deserializer.deserialize_u32(MyVisitor)
}
}

#[cfg(test)]
mod test {
use num_traits::Zero;

use super::Checksum;

#[test]
pub fn test_binary_representation() {
assert_eq!(Checksum::from_i32(-1).value(), u32::MAX);
assert_eq!(Checksum::from(u32::MAX).value(), u32::MAX);
assert_eq!(Checksum::from(u32::MAX).bitcast_i32(), -1);
}

fn deserialize(from: &str) -> Checksum {
serde_json::from_str(from).expect("should deserialize")
}

#[test]
pub fn test_deserialize() {
assert_eq!(deserialize("0").value(), 0);
assert_eq!(deserialize("-1").value(), u32::MAX);
assert_eq!(deserialize("-1.0").value(), u32::MAX);

assert_eq!(deserialize("3573495687").value(), 3573495687);
assert_eq!(deserialize("3573495687.0").value(), 3573495687);
assert_eq!(deserialize("-721471609.0").value(), 3573495687);
}

#[test]
pub fn test_arithmetic() {
assert_eq!(Checksum::from(3) + Checksum::from(7), Checksum::from(10));

// Checksums should always wrap around
assert_eq!(
Checksum::from(0xFFFFFFFF) + Checksum::from(1),
Checksum::zero()
);
assert_eq!(
Checksum::zero() - Checksum::from(1),
Checksum::from(0xFFFFFFFF)
);

let mut cs = Checksum::from(0x8FFFFFFF);
cs += Checksum::from(0x80000000);
assert_eq!(cs, Checksum::from(0x0FFFFFFF));

cs -= Checksum::from(0x80000001);
assert_eq!(cs, Checksum::from(0x8FFFFFFE));
}
}
5 changes: 5 additions & 0 deletions crates/core/src/sync/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
mod bucket_priority;
mod checksum;

pub use bucket_priority::BucketPriority;
pub use checksum::Checksum;
2 changes: 1 addition & 1 deletion crates/core/src/sync_local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use alloc::string::String;
use alloc::vec::Vec;
use serde::Deserialize;

use crate::bucket_priority::BucketPriority;
use crate::error::{PSResult, SQLiteError};
use crate::sync::BucketPriority;
use sqlite_nostd::{self as sqlite, Destructor, ManagedStmt, Value};
use sqlite_nostd::{ColumnType, Connection, ResultCode};

Expand Down
0