diff --git a/.github/workflows/android.yml b/.github/workflows/android.yml index 4b1231f..5c09328 100644 --- a/.github/workflows/android.yml +++ b/.github/workflows/android.yml @@ -22,8 +22,8 @@ jobs: - name: Setup run: | - rustup toolchain install nightly-2024-05-18-x86_64-unknown-linux-gnu - rustup component add rust-src --toolchain nightly-2024-05-18-x86_64-unknown-linux-gnu + rustup toolchain install nightly-2025-04-15-x86_64-unknown-linux-gnu + rustup component add rust-src --toolchain nightly-2025-04-15-x86_64-unknown-linux-gnu rustup target add \ aarch64-linux-android \ armv7-linux-androideabi \ diff --git a/.github/workflows/ios.yml b/.github/workflows/ios.yml index aebd68e..4c6057e 100644 --- a/.github/workflows/ios.yml +++ b/.github/workflows/ios.yml @@ -14,8 +14,8 @@ jobs: - name: Setup run: | - rustup toolchain install nightly-2024-05-18-aarch64-apple-darwin - rustup component add rust-src --toolchain nightly-2024-05-18-aarch64-apple-darwin + rustup toolchain install nightly-2025-04-15-aarch64-apple-darwin + rustup component add rust-src --toolchain nightly-2025-04-15-aarch64-apple-darwin rustup target add \ x86_64-apple-darwin \ aarch64-apple-darwin \ diff --git a/.github/workflows/linux.yml b/.github/workflows/linux.yml index a91d5bf..0e8d7cb 100644 --- a/.github/workflows/linux.yml +++ b/.github/workflows/linux.yml @@ -15,7 +15,7 @@ jobs: - name: Install Rust Nightly uses: dtolnay/rust-toolchain@stable with: - toolchain: nightly-2024-05-18 + toolchain: nightly-2025-04-15 components: rust-src - name: Build binaries @@ -33,7 +33,7 @@ jobs: - name: Install Rust Nightly uses: dtolnay/rust-toolchain@stable with: - toolchain: nightly-2024-05-18 + toolchain: nightly-2025-04-15 components: rust-src - name: Build binaries diff --git a/.github/workflows/macos.yml b/.github/workflows/macos.yml index 67d24fa..a59e0cd 100644 --- a/.github/workflows/macos.yml +++ b/.github/workflows/macos.yml @@ -15,7 +15,7 @@ jobs: - name: Install Rust Nightly uses: dtolnay/rust-toolchain@stable with: - toolchain: nightly-2024-05-18 + toolchain: nightly-2025-04-15 components: rust-src - name: Build binary @@ -33,7 +33,7 @@ jobs: - name: Install Rust Nightly uses: dtolnay/rust-toolchain@stable with: - toolchain: nightly-2024-05-18 + toolchain: nightly-2025-04-15 components: rust-src - name: Build binary diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 36028f3..9c9564b 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -52,8 +52,8 @@ jobs: - name: Setup run: | - rustup toolchain install nightly-2024-05-18-x86_64-unknown-linux-gnu - rustup component add rust-src --toolchain nightly-2024-05-18-x86_64-unknown-linux-gnu + rustup toolchain install nightly-2025-04-15-x86_64-unknown-linux-gnu + rustup component add rust-src --toolchain nightly-2025-04-15-x86_64-unknown-linux-gnu rustup target add \ aarch64-linux-android \ armv7-linux-androideabi \ @@ -84,8 +84,8 @@ jobs: - name: Setup run: | - rustup toolchain install nightly-2024-05-18-aarch64-apple-darwin - rustup component add rust-src --toolchain nightly-2024-05-18-aarch64-apple-darwin + rustup toolchain install nightly-2025-04-15-aarch64-apple-darwin + rustup component add rust-src --toolchain nightly-2025-04-15-aarch64-apple-darwin rustup target add \ x86_64-apple-darwin \ aarch64-apple-darwin \ @@ -153,7 +153,7 @@ jobs: - name: Install Rust Nightly uses: dtolnay/rust-toolchain@stable with: - toolchain: nightly-2024-05-18 + toolchain: nightly-2025-04-15 components: rust-src - name: Build 
binaries @@ -178,7 +178,7 @@ jobs: - name: Install Rust Nightly uses: dtolnay/rust-toolchain@stable with: - toolchain: nightly-2024-05-18 + toolchain: nightly-2025-04-15 components: rust-src - name: Build binaries @@ -203,7 +203,7 @@ jobs: - name: Install Rust Nightly uses: dtolnay/rust-toolchain@stable with: - toolchain: nightly-2024-05-18 + toolchain: nightly-2025-04-15 components: rust-src - name: Build binary @@ -228,7 +228,7 @@ jobs: - name: Install Rust Nightly uses: dtolnay/rust-toolchain@stable with: - toolchain: nightly-2024-05-18 + toolchain: nightly-2025-04-15 components: rust-src - name: Build binary @@ -253,7 +253,7 @@ jobs: - name: Install Rust Nightly uses: dtolnay/rust-toolchain@stable with: - toolchain: nightly-2024-05-18 + toolchain: nightly-2025-04-15 components: rust-src - name: Build binary @@ -278,7 +278,7 @@ jobs: - name: Install Rust Nightly uses: dtolnay/rust-toolchain@stable with: - toolchain: nightly-2024-05-18 + toolchain: nightly-2025-04-15 components: rust-src - name: Setup emsdk diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 20ec38d..3d771bd 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -27,7 +27,7 @@ jobs: - name: Install Rust Nightly uses: dtolnay/rust-toolchain@stable with: - toolchain: nightly-2024-05-18 + toolchain: nightly-2025-04-15 components: rust-src - name: Build diff --git a/.github/workflows/wasm.yml b/.github/workflows/wasm.yml index eab0977..a37d3ab 100644 --- a/.github/workflows/wasm.yml +++ b/.github/workflows/wasm.yml @@ -8,20 +8,20 @@ jobs: if: github.event_name == 'push' || (github.event_name == 'pull_request' && github.event.pull_request.head.repo.full_name != github.repository) runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 with: submodules: true - name: Install Rust Nightly uses: dtolnay/rust-toolchain@stable with: - toolchain: nightly-2024-05-18 + toolchain: nightly-2025-04-15 components: rust-src - name: Setup emsdk uses: mymindstorm/setup-emsdk@v14 with: - version: 3.1.68 + version: 4.0.7 - name: Build WASM run: ./tool/build_wasm.sh diff --git a/.github/workflows/windows.yml b/.github/workflows/windows.yml index fa13aab..5ac33a3 100644 --- a/.github/workflows/windows.yml +++ b/.github/workflows/windows.yml @@ -15,7 +15,7 @@ jobs: - name: Install Rust Nightly uses: dtolnay/rust-toolchain@stable with: - toolchain: nightly-2024-05-18 + toolchain: nightly-2025-04-15 components: rust-src - name: Build binary diff --git a/Cargo.lock b/Cargo.lock index 0f3c7d5..6d69fcb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -246,7 +246,6 @@ dependencies = [ "serde", "serde_json", "sqlite_nostd", - "streaming-iterator", "uuid", ] @@ -404,12 +403,6 @@ dependencies = [ "sqlite3_capi", ] -[[package]] -name = "streaming-iterator" -version = "0.1.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2b2231b7c3057d5e4ad0156fb3dc807d900806020c5ffa3ee6ff2c8c76fb8520" - [[package]] name = "syn" version = "1.0.109" diff --git a/Cargo.toml b/Cargo.toml index 3506783..eb7eb75 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,7 +9,6 @@ default-members = ["crates/shell", "crates/sqlite"] [profile.dev] panic = "abort" -strip = true [profile.release] panic = "abort" diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 9aef67e..419e484 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -19,7 +19,6 @@ num-traits = { version = "0.2.15", default-features = false } num-derive = "0.3" serde_json = { version = "1.0", 
default-features = false, features = ["alloc"] } serde = { version = "1.0", default-features = false, features = ["alloc", "derive"] } -streaming-iterator = { version = "0.1.9", default-features = false, features = ["alloc"] } const_format = "0.2.34" [dependencies.uuid] diff --git a/crates/core/src/checkpoint.rs b/crates/core/src/checkpoint.rs index 1e6527d..7e96272 100644 --- a/crates/core/src/checkpoint.rs +++ b/crates/core/src/checkpoint.rs @@ -1,11 +1,10 @@ extern crate alloc; -use alloc::format; use alloc::string::String; use alloc::vec::Vec; use core::ffi::c_int; -use serde::{Deserialize, Serialize}; +use serde::Serialize; use serde_json as json; use sqlite::ResultCode; use sqlite_nostd as sqlite; @@ -13,9 +12,10 @@ use sqlite_nostd::{Connection, Context, Value}; use crate::create_sqlite_text_fn; use crate::error::SQLiteError; -use crate::sync_types::Checkpoint; +use crate::sync::checkpoint::{validate_checkpoint, OwnedBucketChecksum}; +use crate::sync::line::Checkpoint; -#[derive(Serialize, Deserialize)] +#[derive(Serialize)] struct CheckpointResult { valid: bool, failed_buckets: Vec, @@ -26,53 +26,23 @@ fn powersync_validate_checkpoint_impl( args: &[*mut sqlite::value], ) -> Result { let data = args[0].text(); - - let _checkpoint: Checkpoint = serde_json::from_str(data)?; - + let checkpoint: Checkpoint = serde_json::from_str(data)?; let db = ctx.db_handle(); - - // language=SQLite - let statement = db.prepare_v2( - "WITH -bucket_list(bucket, checksum) AS ( - SELECT - json_extract(json_each.value, '$.bucket') as bucket, - json_extract(json_each.value, '$.checksum') as checksum - FROM json_each(json_extract(?1, '$.buckets')) -) -SELECT - bucket_list.bucket as bucket, - IFNULL(buckets.add_checksum, 0) as add_checksum, - IFNULL(buckets.op_checksum, 0) as oplog_checksum, - bucket_list.checksum as expected_checksum -FROM bucket_list - LEFT OUTER JOIN ps_buckets AS buckets ON - buckets.name = bucket_list.bucket -GROUP BY bucket_list.bucket", - )?; - - statement.bind_text(1, data, sqlite::Destructor::STATIC)?; - - let mut failures: Vec = alloc::vec![]; - - while statement.step()? == ResultCode::ROW { - let name = statement.column_text(0)?; - // checksums with column_int are wrapped to i32 by SQLite - let add_checksum = statement.column_int(1); - let oplog_checksum = statement.column_int(2); - let expected_checksum = statement.column_int(3); - - // wrapping add is like +, but safely overflows - let checksum = oplog_checksum.wrapping_add(add_checksum); - - if checksum != expected_checksum { - failures.push(String::from(name)); - } + let buckets: Vec = checkpoint + .buckets + .iter() + .map(OwnedBucketChecksum::from) + .collect(); + + let failures = validate_checkpoint(buckets.iter(), None, db)?; + let mut failed_buckets = Vec::::with_capacity(failures.len()); + for failure in failures { + failed_buckets.push(failure.bucket_name); } let result = CheckpointResult { - valid: failures.is_empty(), - failed_buckets: failures, + valid: failed_buckets.is_empty(), + failed_buckets: failed_buckets, }; Ok(json::to_string(&result)?) 
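For reference, the checkpoint message parsed above is a plain JSON object, and with this change its validation reduces to comparing each bucket's expected checksum against the wrapping 32-bit sum of the add_checksum and op_checksum stored in ps_buckets. Below is a minimal standalone sketch of that comparison, assuming serde (with derive) and serde_json as dependencies; the Checkpoint/BucketChecksum mirrors, the failed_buckets helper, and the local closure are illustrative stand-ins, not the extension's API.

// Illustrative mirror of the checkpoint validation introduced in sync/checkpoint.rs.
use serde::Deserialize;
use std::num::Wrapping;

#[derive(Deserialize)]
#[allow(dead_code)]
struct Checkpoint {
    last_op_id: String,              // the sync service sends op ids as strings
    #[serde(default)]
    write_checkpoint: Option<String>,
    buckets: Vec<BucketChecksum>,
}

#[derive(Deserialize)]
#[allow(dead_code)]
struct BucketChecksum {
    bucket: String,
    checksum: i64,                   // may arrive as a signed or unsigned 32-bit number
    #[serde(default)]
    priority: Option<i32>,
}

/// Returns the buckets whose local checksums do not match the checkpoint.
/// `local` stands in for the ps_buckets lookup: (add_checksum, op_checksum), (0, 0) if absent.
fn failed_buckets(checkpoint: &Checkpoint, local: impl Fn(&str) -> (u32, u32)) -> Vec<String> {
    let mut failures = Vec::new();
    for bucket in &checkpoint.buckets {
        let (add, op) = local(&bucket.bucket);
        let actual = Wrapping(add) + Wrapping(op); // wrapping add, as in the extension
        if actual.0 != bucket.checksum as u32 {    // truncate to 32 bits to normalize e.g. -1
            failures.push(bucket.bucket.clone());
        }
    }
    failures
}

fn main() {
    let checkpoint: Checkpoint = serde_json::from_str(
        r#"{"last_op_id": "10", "buckets": [{"bucket": "a", "checksum": -1}]}"#,
    )
    .unwrap();
    // 0x7FFFFFFF + 0x80000000 wraps to 0xFFFFFFFF, which matches the signed -1 checksum.
    assert!(failed_buckets(&checkpoint, |_| (0x7FFFFFFF, 0x80000000)).is_empty());
}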
diff --git a/crates/core/src/diff.rs b/crates/core/src/diff.rs index 7e5ad18..fd37a05 100644 --- a/crates/core/src/diff.rs +++ b/crates/core/src/diff.rs @@ -1,6 +1,5 @@ extern crate alloc; -use alloc::format; use alloc::string::{String, ToString}; use core::ffi::c_int; diff --git a/crates/core/src/error.rs b/crates/core/src/error.rs index 3aa5dcf..e73fdd8 100644 --- a/crates/core/src/error.rs +++ b/crates/core/src/error.rs @@ -1,6 +1,9 @@ -use alloc::string::{String, ToString}; +use alloc::{ + format, + string::{String, ToString}, +}; use core::error::Error; -use sqlite_nostd::{sqlite3, Connection, ResultCode}; +use sqlite_nostd::{context, sqlite3, Connection, Context, ResultCode}; #[derive(Debug)] pub struct SQLiteError(pub ResultCode, pub Option); @@ -11,6 +14,24 @@ impl core::fmt::Display for SQLiteError { } } +impl SQLiteError { + pub fn apply_to_ctx(self, description: &str, ctx: *mut context) { + let SQLiteError(code, message) = self; + + if message.is_some() { + ctx.result_error(&format!("{:} {:}", description, message.unwrap())); + } else { + let error = ctx.db_handle().errmsg().unwrap(); + if error == "not an error" { + ctx.result_error(&format!("{:}", description)); + } else { + ctx.result_error(&format!("{:} {:}", description, error)); + } + } + ctx.result_error_code(code); + } +} + impl Error for SQLiteError {} pub trait PSResult { @@ -45,3 +66,9 @@ impl From for SQLiteError { SQLiteError(ResultCode::ABORT, Some(value.to_string())) } } + +impl From for SQLiteError { + fn from(value: core::fmt::Error) -> Self { + SQLiteError(ResultCode::INTERNAL, Some(format!("{}", value))) + } +} diff --git a/crates/core/src/fix035.rs b/crates/core/src/fix035.rs deleted file mode 100644 index f90cb6c..0000000 --- a/crates/core/src/fix035.rs +++ /dev/null @@ -1,47 +0,0 @@ -use alloc::format; - -use crate::error::{PSResult, SQLiteError}; -use sqlite_nostd as sqlite; -use sqlite_nostd::{Connection, ResultCode}; - -use crate::ext::SafeManagedStmt; -use crate::util::quote_identifier; - -// Apply a data migration to fix any existing data affected by the issue -// fixed in v0.3.5. -// -// The issue was that the `ps_updated_rows` table was not being populated -// with remove operations in some cases. This causes the rows to be removed -// from ps_oplog, but not from the ps_data__tables, resulting in dangling rows. -// -// The fix here is to find these dangling rows, and add them to ps_updated_rows. -// The next time the sync_local operation is run, these rows will be removed. -pub fn apply_v035_fix(db: *mut sqlite::sqlite3) -> Result { - // language=SQLite - let statement = db - .prepare_v2("SELECT name, powersync_external_table_name(name) FROM sqlite_master WHERE type='table' AND name GLOB 'ps_data__*'") - .into_db_result(db)?; - - while statement.step()? 
== ResultCode::ROW { - let full_name = statement.column_text(0)?; - let short_name = statement.column_text(1)?; - let quoted = quote_identifier(full_name); - - // language=SQLite - let statement = db.prepare_v2(&format!( - " -INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) -SELECT ?1, id FROM {} - WHERE NOT EXISTS ( - SELECT 1 FROM ps_oplog - WHERE row_type = ?1 AND row_id = {}.id - );", - quoted, quoted - ))?; - statement.bind_text(1, short_name, sqlite::Destructor::STATIC)?; - - statement.exec()?; - } - - Ok(1) -} diff --git a/crates/core/src/fix_data.rs b/crates/core/src/fix_data.rs new file mode 100644 index 0000000..8dcab1b --- /dev/null +++ b/crates/core/src/fix_data.rs @@ -0,0 +1,200 @@ +use core::ffi::c_int; + +use alloc::format; +use alloc::string::String; + +use crate::create_sqlite_optional_text_fn; +use crate::error::{PSResult, SQLiteError}; +use sqlite_nostd::{self as sqlite, ColumnType, Value}; +use sqlite_nostd::{Connection, Context, ResultCode}; + +use crate::ext::SafeManagedStmt; +use crate::util::quote_identifier; + +// Apply a data migration to fix any existing data affected by the issue +// fixed in v0.3.5. +// +// The issue was that the `ps_updated_rows` table was not being populated +// with remove operations in some cases. This causes the rows to be removed +// from ps_oplog, but not from the ps_data__tables, resulting in dangling rows. +// +// The fix here is to find these dangling rows, and add them to ps_updated_rows. +// The next time the sync_local operation is run, these rows will be removed. +pub fn apply_v035_fix(db: *mut sqlite::sqlite3) -> Result { + // language=SQLite + let statement = db + .prepare_v2("SELECT name, powersync_external_table_name(name) FROM sqlite_master WHERE type='table' AND name GLOB 'ps_data__*'") + .into_db_result(db)?; + + while statement.step()? == ResultCode::ROW { + let full_name = statement.column_text(0)?; + let short_name = statement.column_text(1)?; + let quoted = quote_identifier(full_name); + + // language=SQLite + let statement = db.prepare_v2(&format!( + " +INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) +SELECT ?1, id FROM {} + WHERE NOT EXISTS ( + SELECT 1 FROM ps_oplog + WHERE row_type = ?1 AND row_id = {}.id + );", + quoted, quoted + ))?; + statement.bind_text(1, short_name, sqlite::Destructor::STATIC)?; + + statement.exec()?; + } + + Ok(1) +} + +/// Older versions of the JavaScript SDK for PowerSync used to encode the subkey in oplog data +/// entries as JSON. +/// +/// It wasn't supposed to do that, since the keys are regular strings already. To make databases +/// created with those SDKs compatible with other SDKs or the sync client implemented in the core +/// extensions, a migration is necessary. Since this migration is only relevant for the JS SDK, it +/// is mostly implemented there. However, the helper function to remove the key encoding is +/// implemented here because user-defined functions are expensive on JavaScript. +fn remove_duplicate_key_encoding(key: &str) -> Option { + // Acceptable format: // + // Inacceptable format: //"" + // This is a bit of a tricky conversion because both type and id can contain slashes and quotes. + // However, the subkey is either a UUID value or a `/UUID` value - so we know it can't + // end in a quote unless the improper encoding was used. + if !key.ends_with('"') { + return None; + } + + // Since the subkey is JSON-encoded, find the start quote by going backwards. 
+ let mut chars = key.char_indices(); + chars.next_back()?; // Skip the quote ending the string + + enum FoundStartingQuote { + HasQuote { index: usize }, + HasBackslachThenQuote { quote_index: usize }, + } + let mut state: Option = None; + let found_starting_quote = loop { + if let Some((i, char)) = chars.next_back() { + state = match state { + Some(FoundStartingQuote::HasQuote { index }) => { + if char == '\\' { + // We've seen a \" pattern, not the start of the string + Some(FoundStartingQuote::HasBackslachThenQuote { quote_index: index }) + } else { + break Some(index); + } + } + Some(FoundStartingQuote::HasBackslachThenQuote { quote_index }) => { + if char == '\\' { + // \\" pattern, the quote is unescaped + break Some(quote_index); + } else { + None + } + } + None => { + if char == '"' { + Some(FoundStartingQuote::HasQuote { index: i }) + } else { + None + } + } + } + } else { + break None; + } + }?; + + let before_json = &key[..found_starting_quote]; + let mut result: String = serde_json::from_str(&key[found_starting_quote..]).ok()?; + + result.insert_str(0, before_json); + Some(result) +} + +fn powersync_remove_duplicate_key_encoding_impl( + _ctx: *mut sqlite::context, + args: &[*mut sqlite::value], +) -> Result, SQLiteError> { + let arg = args.get(0).ok_or(ResultCode::MISUSE)?; + + if arg.value_type() != ColumnType::Text { + return Err(ResultCode::MISMATCH.into()); + } + + return Ok(remove_duplicate_key_encoding(arg.text())); +} + +create_sqlite_optional_text_fn!( + powersync_remove_duplicate_key_encoding, + powersync_remove_duplicate_key_encoding_impl, + "powersync_remove_duplicate_key_encoding" +); + +pub fn register(db: *mut sqlite::sqlite3) -> Result<(), ResultCode> { + db.create_function_v2( + "powersync_remove_duplicate_key_encoding", + 1, + sqlite::UTF8 | sqlite::DETERMINISTIC, + None, + Some(powersync_remove_duplicate_key_encoding), + None, + None, + None, + )?; + Ok(()) +} + +#[cfg(test)] +mod test { + use core::assert_matches::assert_matches; + + use super::remove_duplicate_key_encoding; + + fn assert_unaffected(source: &str) { + assert_matches!(remove_duplicate_key_encoding(source), None); + } + + #[test] + fn does_not_change_unaffected_keys() { + assert_unaffected("object_type/object_id/subkey"); + assert_unaffected("object_type/object_id/null"); + + // Object type and ID could technically contain quotes and forward slashes + assert_unaffected(r#""object"/"type"/subkey"#); + assert_unaffected("object\"/type/object\"/id/subkey"); + + // Invalid key, but we shouldn't crash + assert_unaffected("\"key\""); + } + + #[test] + fn removes_quotes() { + assert_eq!( + remove_duplicate_key_encoding("foo/bar/\"baz\"").unwrap(), + "foo/bar/baz", + ); + + assert_eq!( + remove_duplicate_key_encoding(r#"foo/bar/"nested/subkey""#).unwrap(), + "foo/bar/nested/subkey" + ); + + assert_eq!( + remove_duplicate_key_encoding(r#"foo/bar/"escaped\"key""#).unwrap(), + "foo/bar/escaped\"key" + ); + assert_eq!( + remove_duplicate_key_encoding(r#"foo/bar/"escaped\\key""#).unwrap(), + "foo/bar/escaped\\key" + ); + assert_eq!( + remove_duplicate_key_encoding(r#"foo/bar/"/\\"subkey""#).unwrap(), + "foo/bar/\"/\\\\subkey" + ); + } +} diff --git a/crates/core/src/json_merge.rs b/crates/core/src/json_merge.rs index 80c1687..cb31479 100644 --- a/crates/core/src/json_merge.rs +++ b/crates/core/src/json_merge.rs @@ -1,6 +1,5 @@ extern crate alloc; -use alloc::format; use alloc::string::{String, ToString}; use core::ffi::c_int; diff --git a/crates/core/src/kv.rs b/crates/core/src/kv.rs index 811409d..c8d9848 
100644 --- a/crates/core/src/kv.rs +++ b/crates/core/src/kv.rs @@ -8,10 +8,10 @@ use sqlite::ResultCode; use sqlite_nostd as sqlite; use sqlite_nostd::{Connection, Context}; -use crate::bucket_priority::BucketPriority; use crate::create_sqlite_optional_text_fn; use crate::create_sqlite_text_fn; use crate::error::SQLiteError; +use crate::sync::BucketPriority; fn powersync_client_id_impl( ctx: *mut sqlite::context, diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index b191fd0..cd50ea3 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -2,7 +2,6 @@ #![feature(vec_into_raw_parts)] #![allow(internal_features)] #![feature(core_intrinsics)] -#![feature(error_in_core)] #![feature(assert_matches)] extern crate alloc; @@ -12,13 +11,12 @@ use core::ffi::{c_char, c_int}; use sqlite::ResultCode; use sqlite_nostd as sqlite; -mod bucket_priority; mod checkpoint; mod crud_vtab; mod diff; mod error; mod ext; -mod fix035; +mod fix_data; mod json_merge; mod kv; mod macros; @@ -26,8 +24,8 @@ mod migrations; mod operations; mod operations_vtab; mod schema; +mod sync; mod sync_local; -mod sync_types; mod util; mod uuid; mod version; @@ -57,6 +55,7 @@ fn init_extension(db: *mut sqlite::sqlite3) -> Result<(), ResultCode> { crate::views::register(db)?; crate::uuid::register(db)?; crate::diff::register(db)?; + crate::fix_data::register(db)?; crate::json_merge::register(db)?; crate::view_admin::register(db)?; crate::checkpoint::register(db)?; diff --git a/crates/core/src/macros.rs b/crates/core/src/macros.rs index aeb1f1a..459a5ab 100644 --- a/crates/core/src/macros.rs +++ b/crates/core/src/macros.rs @@ -11,18 +11,7 @@ macro_rules! create_sqlite_text_fn { let result = $fn_impl_name(ctx, args); if let Err(err) = result { - let SQLiteError(code, message) = SQLiteError::from(err); - if message.is_some() { - ctx.result_error(&format!("{:} {:}", $description, message.unwrap())); - } else { - let error = ctx.db_handle().errmsg().unwrap(); - if error == "not an error" { - ctx.result_error(&format!("{:}", $description)); - } else { - ctx.result_error(&format!("{:} {:}", $description, error)); - } - } - ctx.result_error_code(code); + SQLiteError::from(err).apply_to_ctx($description, ctx); } else if let Ok(r) = result { ctx.result_text_transient(&r); } @@ -43,18 +32,7 @@ macro_rules! 
create_sqlite_optional_text_fn { let result = $fn_impl_name(ctx, args); if let Err(err) = result { - let SQLiteError(code, message) = SQLiteError::from(err); - if message.is_some() { - ctx.result_error(&format!("{:} {:}", $description, message.unwrap())); - } else { - let error = ctx.db_handle().errmsg().unwrap(); - if error == "not an error" { - ctx.result_error(&format!("{:}", $description)); - } else { - ctx.result_error(&format!("{:} {:}", $description, error)); - } - } - ctx.result_error_code(code); + SQLiteError::from(err).apply_to_ctx($description, ctx); } else if let Ok(r) = result { if let Some(s) = r { ctx.result_text_transient(&s); diff --git a/crates/core/src/migrations.rs b/crates/core/src/migrations.rs index 7241957..8fd5357 100644 --- a/crates/core/src/migrations.rs +++ b/crates/core/src/migrations.rs @@ -8,9 +8,9 @@ use sqlite::ResultCode; use sqlite_nostd as sqlite; use sqlite_nostd::{Connection, Context}; -use crate::bucket_priority::BucketPriority; use crate::error::{PSResult, SQLiteError}; -use crate::fix035::apply_v035_fix; +use crate::fix_data::apply_v035_fix; +use crate::sync::BucketPriority; pub const LATEST_VERSION: i32 = 9; diff --git a/crates/core/src/operations.rs b/crates/core/src/operations.rs index c0b7622..74e7cd3 100644 --- a/crates/core/src/operations.rs +++ b/crates/core/src/operations.rs @@ -1,7 +1,13 @@ use alloc::format; use alloc::string::String; +use alloc::vec::Vec; +use num_traits::Zero; +use serde::Deserialize; use crate::error::{PSResult, SQLiteError}; +use crate::sync::line::DataLine; +use crate::sync::operations::insert_bucket_operations; +use crate::sync::Checksum; use sqlite_nostd as sqlite; use sqlite_nostd::{Connection, ResultCode}; @@ -9,246 +15,15 @@ use crate::ext::SafeManagedStmt; // Run inside a transaction pub fn insert_operation(db: *mut sqlite::sqlite3, data: &str) -> Result<(), SQLiteError> { - // language=SQLite - let statement = db.prepare_v2( - "\ -SELECT - json_extract(e.value, '$.bucket') as bucket, - json_extract(e.value, '$.data') as data, - json_extract(e.value, '$.has_more') as has_more, - json_extract(e.value, '$.after') as after, - json_extract(e.value, '$.next_after') as next_after -FROM json_each(json_extract(?1, '$.buckets')) e", - )?; - statement.bind_text(1, data, sqlite::Destructor::STATIC)?; - - while statement.step()? == ResultCode::ROW { - let bucket = statement.column_text(0)?; - let data = statement.column_text(1)?; - // let _has_more = statement.column_int(2)? != 0; - // let _after = statement.column_text(3)?; - // let _next_after = statement.column_text(4)?; - - insert_bucket_operations(db, bucket, data)?; - } - - Ok(()) -} - -pub fn insert_bucket_operations( - db: *mut sqlite::sqlite3, - bucket: &str, - data: &str, -) -> Result<(), SQLiteError> { - // Statement to insert new operations (only for PUT and REMOVE). - // language=SQLite - let iterate_statement = db.prepare_v2( - "\ -SELECT - json_extract(e.value, '$.op_id') as op_id, - json_extract(e.value, '$.op') as op, - json_extract(e.value, '$.object_type') as object_type, - json_extract(e.value, '$.object_id') as object_id, - json_extract(e.value, '$.checksum') as checksum, - json_extract(e.value, '$.data') as data, - json_extract(e.value, '$.subkey') as subkey -FROM json_each(?) e", - )?; - iterate_statement.bind_text(1, data, sqlite::Destructor::STATIC)?; - - // We do an ON CONFLICT UPDATE simply so that the RETURNING bit works for existing rows. - // We can consider splitting this into separate SELECT and INSERT statements. 
- // language=SQLite - let bucket_statement = db.prepare_v2( - "INSERT INTO ps_buckets(name) - VALUES(?) - ON CONFLICT DO UPDATE - SET last_applied_op = last_applied_op - RETURNING id, last_applied_op", - )?; - bucket_statement.bind_text(1, bucket, sqlite::Destructor::STATIC)?; - bucket_statement.step()?; - - let bucket_id = bucket_statement.column_int64(0); - - // This is an optimization for initial sync - we can avoid persisting individual REMOVE - // operations when last_applied_op = 0. - // We do still need to do the "supersede_statement" step for this case, since a REMOVE - // operation can supersede another PUT operation we're syncing at the same time. - let mut is_empty = bucket_statement.column_int64(1) == 0; - - // Statement to supersede (replace) operations with the same key. - // language=SQLite - let supersede_statement = db.prepare_v2( - "\ -DELETE FROM ps_oplog - WHERE unlikely(ps_oplog.bucket = ?1) - AND ps_oplog.key = ?2 -RETURNING op_id, hash", - )?; - supersede_statement.bind_int64(1, bucket_id)?; - - // language=SQLite - let insert_statement = db.prepare_v2("\ -INSERT INTO ps_oplog(bucket, op_id, key, row_type, row_id, data, hash) VALUES (?, ?, ?, ?, ?, ?, ?)")?; - insert_statement.bind_int64(1, bucket_id)?; - - let updated_row_statement = db.prepare_v2( - "\ -INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) VALUES(?1, ?2)", - )?; - - bucket_statement.reset()?; - - let mut last_op: Option = None; - let mut add_checksum: i32 = 0; - let mut op_checksum: i32 = 0; - let mut added_ops: i32 = 0; - - while iterate_statement.step()? == ResultCode::ROW { - let op_id = iterate_statement.column_int64(0); - let op = iterate_statement.column_text(1)?; - let object_type = iterate_statement.column_text(2); - let object_id = iterate_statement.column_text(3); - let checksum = iterate_statement.column_int(4); - let op_data = iterate_statement.column_text(5); - - last_op = Some(op_id); - added_ops += 1; - - if op == "PUT" || op == "REMOVE" { - let key: String; - if let (Ok(object_type), Ok(object_id)) = (object_type.as_ref(), object_id.as_ref()) { - let subkey = iterate_statement.column_text(6).unwrap_or("null"); - key = format!("{}/{}/{}", &object_type, &object_id, subkey); - } else { - key = String::from(""); - } - - supersede_statement.bind_text(2, &key, sqlite::Destructor::STATIC)?; - - let mut superseded = false; - - while supersede_statement.step()? == ResultCode::ROW { - // Superseded (deleted) a previous operation, add the checksum - let supersede_checksum = supersede_statement.column_int(1); - add_checksum = add_checksum.wrapping_add(supersede_checksum); - op_checksum = op_checksum.wrapping_sub(supersede_checksum); - - // Superseded an operation, only skip if the bucket was empty - // Previously this checked "superseded_op <= last_applied_op". - // However, that would not account for a case where a previous - // PUT operation superseded the original PUT operation in this - // same batch, in which case superseded_op is not accurate for this. 
- if !is_empty { - superseded = true; - } - } - supersede_statement.reset()?; - - if op == "REMOVE" { - let should_skip_remove = !superseded; - - add_checksum = add_checksum.wrapping_add(checksum); - - if !should_skip_remove { - if let (Ok(object_type), Ok(object_id)) = (object_type, object_id) { - updated_row_statement.bind_text( - 1, - object_type, - sqlite::Destructor::STATIC, - )?; - updated_row_statement.bind_text( - 2, - object_id, - sqlite::Destructor::STATIC, - )?; - updated_row_statement.exec()?; - } - } - - continue; - } - - insert_statement.bind_int64(2, op_id)?; - if key != "" { - insert_statement.bind_text(3, &key, sqlite::Destructor::STATIC)?; - } else { - insert_statement.bind_null(3)?; - } - - if let (Ok(object_type), Ok(object_id)) = (object_type, object_id) { - insert_statement.bind_text(4, object_type, sqlite::Destructor::STATIC)?; - insert_statement.bind_text(5, object_id, sqlite::Destructor::STATIC)?; - } else { - insert_statement.bind_null(4)?; - insert_statement.bind_null(5)?; - } - if let Ok(data) = op_data { - insert_statement.bind_text(6, data, sqlite::Destructor::STATIC)?; - } else { - insert_statement.bind_null(6)?; - } - - insert_statement.bind_int(7, checksum)?; - insert_statement.exec()?; - - op_checksum = op_checksum.wrapping_add(checksum); - } else if op == "MOVE" { - add_checksum = add_checksum.wrapping_add(checksum); - } else if op == "CLEAR" { - // Any remaining PUT operations should get an implicit REMOVE - // language=SQLite - let clear_statement1 = db - .prepare_v2( - "INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) -SELECT row_type, row_id -FROM ps_oplog -WHERE bucket = ?1", - ) - .into_db_result(db)?; - clear_statement1.bind_int64(1, bucket_id)?; - clear_statement1.exec()?; - - let clear_statement2 = db - .prepare_v2("DELETE FROM ps_oplog WHERE bucket = ?1") - .into_db_result(db)?; - clear_statement2.bind_int64(1, bucket_id)?; - clear_statement2.exec()?; - - // And we need to re-apply all of those. - // We also replace the checksum with the checksum of the CLEAR op. 
- // language=SQLite - let clear_statement2 = db.prepare_v2( - "UPDATE ps_buckets SET last_applied_op = 0, add_checksum = ?1, op_checksum = 0 WHERE id = ?2", - )?; - clear_statement2.bind_int64(2, bucket_id)?; - clear_statement2.bind_int(1, checksum)?; - clear_statement2.exec()?; - - add_checksum = 0; - is_empty = true; - op_checksum = 0; - } + #[derive(Deserialize)] + struct BucketBatch<'a> { + #[serde(borrow)] + buckets: Vec>, } - if let Some(last_op) = &last_op { - // language=SQLite - let statement = db.prepare_v2( - "UPDATE ps_buckets - SET last_op = ?2, - add_checksum = (add_checksum + ?3) & 0xffffffff, - op_checksum = (op_checksum + ?4) & 0xffffffff, - count_since_last = count_since_last + ?5 - WHERE id = ?1", - )?; - statement.bind_int64(1, bucket_id)?; - statement.bind_int64(2, *last_op)?; - statement.bind_int(3, add_checksum)?; - statement.bind_int(4, op_checksum)?; - statement.bind_int(5, added_ops)?; - - statement.exec()?; + let batch: BucketBatch = serde_json::from_str(data)?; + for line in &batch.buckets { + insert_bucket_operations(db, line)?; } Ok(()) diff --git a/crates/core/src/schema/management.rs b/crates/core/src/schema/management.rs index 6f66d03..bccba3f 100644 --- a/crates/core/src/schema/management.rs +++ b/crates/core/src/schema/management.rs @@ -1,8 +1,8 @@ extern crate alloc; -use alloc::format; use alloc::string::String; use alloc::vec::Vec; +use alloc::{format, vec}; use core::ffi::c_int; use sqlite::{Connection, ResultCode, Value}; @@ -14,6 +14,8 @@ use crate::ext::ExtendedDatabase; use crate::util::{quote_identifier, quote_json_path}; use crate::{create_auto_tx_function, create_sqlite_text_fn}; +use super::Schema; + fn update_tables(db: *mut sqlite::sqlite3, schema: &str) -> Result<(), SQLiteError> { { // In a block so that the statement is finalized before dropping tables @@ -138,87 +140,83 @@ SELECT name, internal_name, local_only FROM powersync_tables WHERE name NOT IN ( fn update_indexes(db: *mut sqlite::sqlite3, schema: &str) -> Result<(), SQLiteError> { let mut statements: Vec = alloc::vec![]; + let schema = serde_json::from_str::(schema)?; + let mut expected_index_names: Vec = vec![]; { // In a block so that the statement is finalized before dropping indexes // language=SQLite - let statement = db.prepare_v2("\ -SELECT - powersync_internal_table_name(tables.value) as table_name, - (powersync_internal_table_name(tables.value) || '__' || json_extract(indexes.value, '$.name')) as index_name, - json_extract(indexes.value, '$.columns') as index_columns, - ifnull(sqlite_master.sql, '') as sql - FROM json_each(json_extract(?, '$.tables')) tables - CROSS JOIN json_each(json_extract(tables.value, '$.indexes')) indexes - LEFT JOIN sqlite_master ON sqlite_master.name = index_name AND sqlite_master.type = 'index' - ").into_db_result(db)?; - statement.bind_text(1, schema, sqlite::Destructor::STATIC)?; + let find_index = + db.prepare_v2("SELECT sql FROM sqlite_master WHERE name = ? AND type = 'index'")?; - while statement.step().into_db_result(db)? == ResultCode::ROW { - let table_name = statement.column_text(0)?; - let index_name = statement.column_text(1)?; - let columns = statement.column_text(2)?; - let existing_sql = statement.column_text(3)?; - - // language=SQLite - let stmt2 = db.prepare_v2("select json_extract(e.value, '$.name') as name, json_extract(e.value, '$.type') as type, json_extract(e.value, '$.ascending') as ascending from json_each(?) 
e")?; - stmt2.bind_text(1, columns, sqlite::Destructor::STATIC)?; - - let mut column_values: Vec = alloc::vec![]; - while stmt2.step()? == ResultCode::ROW { - let name = stmt2.column_text(0)?; - let type_name = stmt2.column_text(1)?; - let ascending = stmt2.column_int(2) != 0; - - if ascending { - let value = format!( + for table in &schema.tables { + let table_name = table.internal_name(); + + for index in &table.indexes { + let index_name = format!("{}__{}", table_name, &index.name); + + let existing_sql = { + find_index.reset()?; + find_index.bind_text(1, &index_name, sqlite::Destructor::STATIC)?; + + let result = if let ResultCode::ROW = find_index.step()? { + Some(find_index.column_text(0)?) + } else { + None + }; + + result + }; + + let mut column_values: Vec = alloc::vec![]; + for indexed_column in &index.columns { + let mut value = format!( "CAST(json_extract(data, {:}) as {:})", - quote_json_path(name), - type_name - ); - column_values.push(value); - } else { - let value = format!( - "CAST(json_extract(data, {:}) as {:}) DESC", - quote_json_path(name), - type_name + quote_json_path(&indexed_column.name), + &indexed_column.type_name ); + + if !indexed_column.ascending { + value += " DESC"; + } + column_values.push(value); } - } - let sql = format!( - "CREATE INDEX {} ON {}({})", - quote_identifier(index_name), - quote_identifier(table_name), - column_values.join(", ") - ); - if existing_sql == "" { - statements.push(sql); - } else if existing_sql != sql { - statements.push(format!("DROP INDEX {}", quote_identifier(index_name))); - statements.push(sql); + let sql = format!( + "CREATE INDEX {} ON {}({})", + quote_identifier(&index_name), + quote_identifier(&table_name), + column_values.join(", ") + ); + + if existing_sql.is_none() { + statements.push(sql); + } else if existing_sql != Some(&sql) { + statements.push(format!("DROP INDEX {}", quote_identifier(&index_name))); + statements.push(sql); + } + + expected_index_names.push(index_name); } } // In a block so that the statement is finalized before dropping indexes // language=SQLite - let statement = db.prepare_v2("\ -WITH schema_indexes AS ( -SELECT - powersync_internal_table_name(tables.value) as table_name, - (powersync_internal_table_name(tables.value) || '__' || json_extract(indexes.value, '$.name')) as index_name - FROM json_each(json_extract(?1, '$.tables')) tables - CROSS JOIN json_each(json_extract(tables.value, '$.indexes')) indexes -) + let statement = db + .prepare_v2( + "\ SELECT sqlite_master.name as index_name FROM sqlite_master WHERE sqlite_master.type = 'index' AND sqlite_master.name GLOB 'ps_data_*' - AND sqlite_master.name NOT IN (SELECT index_name FROM schema_indexes) -").into_db_result(db)?; - statement.bind_text(1, schema, sqlite::Destructor::STATIC)?; + AND sqlite_master.name NOT IN (SELECT value FROM json_each(?)) +", + ) + .into_db_result(db)?; + let json_names = serde_json::to_string(&expected_index_names)?; + statement.bind_text(1, &json_names, sqlite::Destructor::STATIC)?; while statement.step()? 
== ResultCode::ROW { let name = statement.column_text(0)?; diff --git a/crates/core/src/schema/mod.rs b/crates/core/src/schema/mod.rs index a0a277e..76a8c4a 100644 --- a/crates/core/src/schema/mod.rs +++ b/crates/core/src/schema/mod.rs @@ -1,11 +1,16 @@ mod management; mod table_info; +use alloc::vec::Vec; +use serde::Deserialize; use sqlite::ResultCode; use sqlite_nostd as sqlite; -pub use table_info::{ - ColumnInfo, ColumnNameAndTypeStatement, DiffIncludeOld, TableInfo, TableInfoFlags, -}; +pub use table_info::{Column, DiffIncludeOld, Table, TableInfoFlags}; + +#[derive(Deserialize)] +pub struct Schema { + tables: Vec, +} pub fn register(db: *mut sqlite::sqlite3) -> Result<(), ResultCode> { management::register(db) diff --git a/crates/core/src/schema/table_info.rs b/crates/core/src/schema/table_info.rs index 0bfbfa5..4224221 100644 --- a/crates/core/src/schema/table_info.rs +++ b/crates/core/src/schema/table_info.rs @@ -1,103 +1,127 @@ -use core::marker::PhantomData; +use alloc::{format, string::String, vec, vec::Vec}; +use serde::{de::Visitor, Deserialize}; -use alloc::{ - string::{String, ToString}, - vec::Vec, -}; -use streaming_iterator::StreamingIterator; - -use crate::error::SQLiteError; -use sqlite::{Connection, ResultCode}; -use sqlite_nostd::{self as sqlite, ManagedStmt}; - -pub struct TableInfo { +#[derive(Deserialize)] +pub struct Table { pub name: String, - pub view_name: String, + #[serde(rename = "view_name")] + pub view_name_override: Option, + pub columns: Vec, + #[serde(default)] + pub indexes: Vec, + #[serde( + default, + rename = "include_old", + deserialize_with = "deserialize_include_old" + )] pub diff_include_old: Option, + #[serde(flatten)] pub flags: TableInfoFlags, } -impl TableInfo { - pub fn parse_from(db: *mut sqlite::sqlite3, data: &str) -> Result { - // language=SQLite - let statement = db.prepare_v2( - "SELECT - json_extract(?1, '$.name'), - ifnull(json_extract(?1, '$.view_name'), json_extract(?1, '$.name')), - json_extract(?1, '$.local_only'), - json_extract(?1, '$.insert_only'), - json_extract(?1, '$.include_old'), - json_extract(?1, '$.include_metadata'), - json_extract(?1, '$.include_old_only_when_changed'), - json_extract(?1, '$.ignore_empty_update')", - )?; - statement.bind_text(1, data, sqlite::Destructor::STATIC)?; - - let step_result = statement.step()?; - if step_result != ResultCode::ROW { - return Err(SQLiteError::from(ResultCode::SCHEMA)); - } +impl Table { + pub fn from_json(text: &str) -> Result { + serde_json::from_str(text) + } - let name = statement.column_text(0)?.to_string(); - let view_name = statement.column_text(1)?.to_string(); - let flags = { - let local_only = statement.column_int(2) != 0; - let insert_only = statement.column_int(3) != 0; - let include_metadata = statement.column_int(5) != 0; - let include_old_only_when_changed = statement.column_int(6) != 0; - let ignore_empty_update = statement.column_int(7) != 0; - - let mut flags = TableInfoFlags::default(); - flags = flags.set_flag(TableInfoFlags::LOCAL_ONLY, local_only); - flags = flags.set_flag(TableInfoFlags::INSERT_ONLY, insert_only); - flags = flags.set_flag(TableInfoFlags::INCLUDE_METADATA, include_metadata); - flags = flags.set_flag( - TableInfoFlags::INCLUDE_OLD_ONLY_WHEN_CHANGED, - include_old_only_when_changed, - ); - flags = flags.set_flag(TableInfoFlags::IGNORE_EMPTY_UPDATE, ignore_empty_update); - flags - }; - - let include_old = match statement.column_type(4)? 
{ - sqlite_nostd::ColumnType::Text => { - let columns: Vec = serde_json::from_str(statement.column_text(4)?)?; - Some(DiffIncludeOld::OnlyForColumns { columns }) - } + pub fn view_name(&self) -> &str { + self.view_name_override + .as_deref() + .unwrap_or(self.name.as_str()) + } - sqlite_nostd::ColumnType::Integer => { - if statement.column_int(4) != 0 { - Some(DiffIncludeOld::ForAllColumns) - } else { - None - } - } - _ => None, - }; - - // Don't allow include_metadata for local_only tables, it breaks our trigger setup and makes - // no sense because these changes are never inserted into ps_crud. - if flags.include_metadata() && flags.local_only() { - return Err(SQLiteError( - ResultCode::ERROR, - Some("include_metadata and local_only are incompatible".to_string()), - )); + pub fn internal_name(&self) -> String { + if self.flags.local_only() { + format!("ps_data_local__{:}", self.name) + } else { + format!("ps_data__{:}", self.name) } + } - return Ok(TableInfo { - name, - view_name, - diff_include_old: include_old, - flags, - }); + pub fn column_names(&self) -> impl Iterator { + self.columns.iter().map(|c| c.name.as_str()) } } +#[derive(Deserialize)] +pub struct Column { + pub name: String, + #[serde(rename = "type")] + pub type_name: String, +} + +#[derive(Deserialize)] +pub struct Index { + pub name: String, + pub columns: Vec, +} + +#[derive(Deserialize)] +pub struct IndexedColumn { + pub name: String, + pub ascending: bool, + #[serde(rename = "type")] + pub type_name: String, +} + pub enum DiffIncludeOld { OnlyForColumns { columns: Vec }, ForAllColumns, } +fn deserialize_include_old<'de, D: serde::Deserializer<'de>>( + deserializer: D, +) -> Result, D::Error> { + struct IncludeOldVisitor; + + impl<'de> Visitor<'de> for IncludeOldVisitor { + type Value = Option; + + fn expecting(&self, formatter: &mut core::fmt::Formatter) -> core::fmt::Result { + write!(formatter, "an array of columns, or true") + } + + fn visit_some(self, deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + deserializer.deserialize_any(self) + } + + fn visit_none(self) -> Result + where + E: serde::de::Error, + { + return Ok(None); + } + + fn visit_bool(self, v: bool) -> Result + where + E: serde::de::Error, + { + Ok(if v { + Some(DiffIncludeOld::ForAllColumns) + } else { + None + }) + } + + fn visit_seq(self, mut seq: A) -> Result + where + A: serde::de::SeqAccess<'de>, + { + let mut elements: Vec = vec![]; + while let Some(next) = seq.next_element::()? 
{ + elements.push(next); + } + + Ok(Some(DiffIncludeOld::OnlyForColumns { columns: elements })) + } + } + + deserializer.deserialize_option(IncludeOldVisitor) +} + #[derive(Clone, Copy)] #[repr(transparent)] pub struct TableInfoFlags(pub u32); @@ -148,53 +172,56 @@ impl Default for TableInfoFlags { } } -pub struct ColumnNameAndTypeStatement<'a> { - pub stmt: ManagedStmt, - table: PhantomData<&'a str>, -} +impl<'de> Deserialize<'de> for TableInfoFlags { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + struct FlagsVisitor; -impl ColumnNameAndTypeStatement<'_> { - pub fn new(db: *mut sqlite::sqlite3, table: &str) -> Result { - let stmt = db.prepare_v2("select json_extract(e.value, '$.name'), json_extract(e.value, '$.type') from json_each(json_extract(?, '$.columns')) e")?; - stmt.bind_text(1, table, sqlite::Destructor::STATIC)?; + impl<'de> Visitor<'de> for FlagsVisitor { + type Value = TableInfoFlags; - Ok(Self { - stmt, - table: PhantomData, - }) - } + fn expecting(&self, formatter: &mut core::fmt::Formatter) -> core::fmt::Result { + write!(formatter, "an object with table flags") + } - fn step(stmt: &ManagedStmt) -> Result, ResultCode> { - if stmt.step()? == ResultCode::ROW { - let name = stmt.column_text(0)?; - let type_name = stmt.column_text(1)?; + fn visit_map(self, mut map: A) -> Result + where + A: serde::de::MapAccess<'de>, + { + let mut flags = TableInfoFlags::default(); + + while let Some((key, value)) = map.next_entry::<&'de str, bool>()? { + flags = flags.set_flag( + match key { + "local_only" => TableInfoFlags::LOCAL_ONLY, + "insert_only" => TableInfoFlags::INSERT_ONLY, + "include_metadata" => TableInfoFlags::INCLUDE_METADATA, + "include_old_only_when_changed" => { + TableInfoFlags::INCLUDE_OLD_ONLY_WHEN_CHANGED + } + "ignore_empty_update" => TableInfoFlags::IGNORE_EMPTY_UPDATE, + _ => continue, + }, + value, + ); + } - return Ok(Some(ColumnInfo { name, type_name })); + Ok(flags) + } } - Ok(None) + deserializer.deserialize_struct( + "TableInfoFlags", + &[ + "local_only", + "insert_only", + "include_metadata", + "include_old_only_when_changed", + "ignore_empty_update", + ], + FlagsVisitor, + ) } - - pub fn streaming_iter( - &mut self, - ) -> impl StreamingIterator> { - streaming_iterator::from_fn(|| match Self::step(&self.stmt) { - Err(e) => Some(Err(e)), - Ok(Some(other)) => Some(Ok(other)), - Ok(None) => None, - }) - } - - pub fn names_iter(&mut self) -> impl StreamingIterator> { - self.streaming_iter().map(|item| match item { - Ok(row) => Ok(row.name), - Err(e) => Err(*e), - }) - } -} - -#[derive(Clone)] -pub struct ColumnInfo<'a> { - pub name: &'a str, - pub type_name: &'a str, } diff --git a/crates/core/src/bucket_priority.rs b/crates/core/src/sync/bucket_priority.rs similarity index 92% rename from crates/core/src/bucket_priority.rs rename to crates/core/src/sync/bucket_priority.rs index 454f1fe..bd685f7 100644 --- a/crates/core/src/bucket_priority.rs +++ b/crates/core/src/sync/bucket_priority.rs @@ -4,7 +4,7 @@ use sqlite_nostd::ResultCode; use crate::error::SQLiteError; #[repr(transparent)] -#[derive(Clone, Copy, PartialEq, Eq)] +#[derive(Clone, Copy, PartialEq, Eq, Debug)] pub struct BucketPriority { pub number: i32, } @@ -14,6 +14,8 @@ impl BucketPriority { self == BucketPriority::HIGHEST } + /// The priority to use when the sync service doesn't attach priorities in checkpoints. 
+ pub const FALLBACK: BucketPriority = BucketPriority { number: 3 }; pub const HIGHEST: BucketPriority = BucketPriority { number: 0 }; /// A low priority used to represent fully-completed sync operations across all priorities. diff --git a/crates/core/src/sync/checkpoint.rs b/crates/core/src/sync/checkpoint.rs new file mode 100644 index 0000000..57c9b7c --- /dev/null +++ b/crates/core/src/sync/checkpoint.rs @@ -0,0 +1,91 @@ +use alloc::{string::String, vec::Vec}; +use num_traits::Zero; + +use crate::{ + error::SQLiteError, + sync::{line::BucketChecksum, BucketPriority, Checksum}, +}; +use sqlite_nostd::{self as sqlite, Connection, ResultCode}; + +/// A structure cloned from [BucketChecksum]s with an owned bucket name instead of one borrowed from +/// a sync line. +#[derive(Debug, Clone)] +pub struct OwnedBucketChecksum { + pub bucket: String, + pub checksum: Checksum, + pub priority: BucketPriority, + pub count: Option, +} + +impl OwnedBucketChecksum { + pub fn is_in_priority(&self, prio: Option) -> bool { + match prio { + None => true, + Some(prio) => self.priority >= prio, + } + } +} + +impl From<&'_ BucketChecksum<'_>> for OwnedBucketChecksum { + fn from(value: &'_ BucketChecksum<'_>) -> Self { + Self { + bucket: value.bucket.clone().into_owned(), + checksum: value.checksum, + priority: value.priority.unwrap_or(BucketPriority::FALLBACK), + count: value.count, + } + } +} + +pub struct ChecksumMismatch { + pub bucket_name: String, + pub expected_checksum: Checksum, + pub actual_op_checksum: Checksum, + pub actual_add_checksum: Checksum, +} + +pub fn validate_checkpoint<'a>( + buckets: impl Iterator, + priority: Option, + db: *mut sqlite::sqlite3, +) -> Result, SQLiteError> { + // language=SQLite + let statement = db.prepare_v2( + " +SELECT + ps_buckets.add_checksum as add_checksum, + ps_buckets.op_checksum as oplog_checksum +FROM ps_buckets WHERE name = ?;", + )?; + + let mut failures: Vec = Vec::new(); + for bucket in buckets { + if bucket.is_in_priority(priority) { + statement.bind_text(1, &bucket.bucket, sqlite_nostd::Destructor::STATIC)?; + + let (add_checksum, oplog_checksum) = match statement.step()? { + ResultCode::ROW => { + let add_checksum = Checksum::from_i32(statement.column_int(0)); + let oplog_checksum = Checksum::from_i32(statement.column_int(1)); + (add_checksum, oplog_checksum) + } + _ => (Checksum::zero(), Checksum::zero()), + }; + + let actual = add_checksum + oplog_checksum; + + if actual != bucket.checksum { + failures.push(ChecksumMismatch { + bucket_name: bucket.bucket.clone(), + expected_checksum: bucket.checksum, + actual_add_checksum: add_checksum, + actual_op_checksum: oplog_checksum, + }); + } + + statement.reset()?; + } + } + + Ok(failures) +} diff --git a/crates/core/src/sync/checksum.rs b/crates/core/src/sync/checksum.rs new file mode 100644 index 0000000..c6f2bc6 --- /dev/null +++ b/crates/core/src/sync/checksum.rs @@ -0,0 +1,214 @@ +use core::{ + fmt::Display, + num::Wrapping, + ops::{Add, AddAssign, Sub, SubAssign}, +}; + +use num_traits::float::FloatCore; +use num_traits::Zero; +use serde::{de::Visitor, Deserialize, Serialize}; + +/// A checksum as received from the sync service. +/// +/// Conceptually, we use unsigned 32 bit integers to represent checksums, and adding checksums +/// should be a wrapping add. 
+#[repr(transparent)] +#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize)] +pub struct Checksum(Wrapping); + +impl Checksum { + pub const fn value(self) -> u32 { + self.0 .0 + } + + pub const fn from_value(value: u32) -> Self { + Self(Wrapping(value)) + } + + pub const fn from_i32(value: i32) -> Self { + Self::from_value(value as u32) + } + + pub const fn bitcast_i32(self) -> i32 { + self.value() as i32 + } +} + +impl Zero for Checksum { + fn zero() -> Self { + const { Self::from_value(0) } + } + + fn is_zero(&self) -> bool { + self.value() == 0 + } +} + +impl Add for Checksum { + type Output = Self; + + #[inline] + fn add(self, rhs: Self) -> Self::Output { + Self(self.0 + rhs.0) + } +} + +impl AddAssign for Checksum { + #[inline] + fn add_assign(&mut self, rhs: Self) { + self.0 += rhs.0 + } +} + +impl Sub for Checksum { + type Output = Self; + + fn sub(self, rhs: Self) -> Self::Output { + Self(self.0 - rhs.0) + } +} + +impl SubAssign for Checksum { + fn sub_assign(&mut self, rhs: Self) { + self.0 -= rhs.0; + } +} + +impl From for Checksum { + fn from(value: u32) -> Self { + Self::from_value(value) + } +} + +impl Display for Checksum { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + write!(f, "{:#010x}", self.value()) + } +} + +impl<'de> Deserialize<'de> for Checksum { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + struct MyVisitor; + + impl<'de> Visitor<'de> for MyVisitor { + type Value = Checksum; + + fn expecting(&self, formatter: &mut core::fmt::Formatter) -> core::fmt::Result { + write!(formatter, "a number to interpret as a checksum") + } + + fn visit_u32(self, v: u32) -> Result + where + E: serde::de::Error, + { + Ok(v.into()) + } + + fn visit_u64(self, v: u64) -> Result + where + E: serde::de::Error, + { + let as_u32: u32 = v.try_into().map_err(|_| { + E::invalid_value(serde::de::Unexpected::Unsigned(v), &"a 32-bit int") + })?; + Ok(as_u32.into()) + } + + fn visit_i32(self, v: i32) -> Result + where + E: serde::de::Error, + { + Ok(Checksum::from_i32(v)) + } + + fn visit_i64(self, v: i64) -> Result + where + E: serde::de::Error, + { + // This is supposed to be an u32, but it could also be a i32 that we need to + // normalize. 
+ let min: i64 = u32::MIN.into(); + let max: i64 = u32::MAX.into(); + + if v >= min && v <= max { + return Ok(Checksum::from(v as u32)); + } + + let as_i32: i32 = v.try_into().map_err(|_| { + E::invalid_value(serde::de::Unexpected::Signed(v), &"a 32-bit int") + })?; + Ok(Checksum::from_i32(as_i32)) + } + + fn visit_f64(self, v: f64) -> Result + where + E: serde::de::Error, + { + if !v.is_finite() || f64::trunc(v) != v { + return Err(E::invalid_value( + serde::de::Unexpected::Float(v), + &"a whole number", + )); + } + + self.visit_i64(v as i64) + } + } + + deserializer.deserialize_u32(MyVisitor) + } +} + +#[cfg(test)] +mod test { + use num_traits::Zero; + + use super::Checksum; + + #[test] + pub fn test_binary_representation() { + assert_eq!(Checksum::from_i32(-1).value(), u32::MAX); + assert_eq!(Checksum::from(u32::MAX).value(), u32::MAX); + assert_eq!(Checksum::from(u32::MAX).bitcast_i32(), -1); + } + + fn deserialize(from: &str) -> Checksum { + serde_json::from_str(from).expect("should deserialize") + } + + #[test] + pub fn test_deserialize() { + assert_eq!(deserialize("0").value(), 0); + assert_eq!(deserialize("-1").value(), u32::MAX); + assert_eq!(deserialize("-1.0").value(), u32::MAX); + + assert_eq!(deserialize("3573495687").value(), 3573495687); + assert_eq!(deserialize("3573495687.0").value(), 3573495687); + assert_eq!(deserialize("-721471609.0").value(), 3573495687); + } + + #[test] + pub fn test_arithmetic() { + assert_eq!(Checksum::from(3) + Checksum::from(7), Checksum::from(10)); + + // Checksums should always wrap around + assert_eq!( + Checksum::from(0xFFFFFFFF) + Checksum::from(1), + Checksum::zero() + ); + assert_eq!( + Checksum::zero() - Checksum::from(1), + Checksum::from(0xFFFFFFFF) + ); + + let mut cs = Checksum::from(0x8FFFFFFF); + cs += Checksum::from(0x80000000); + assert_eq!(cs, Checksum::from(0x0FFFFFFF)); + + cs -= Checksum::from(0x80000001); + assert_eq!(cs, Checksum::from(0x8FFFFFFE)); + } +} diff --git a/crates/core/src/sync/line.rs b/crates/core/src/sync/line.rs new file mode 100644 index 0000000..771b6b9 --- /dev/null +++ b/crates/core/src/sync/line.rs @@ -0,0 +1,97 @@ +use alloc::borrow::Cow; +use alloc::vec::Vec; +use serde::Deserialize; + +use super::BucketPriority; +use super::Checksum; + +use crate::util::{deserialize_optional_string_to_i64, deserialize_string_to_i64}; + +/// While we would like to always borrow strings for efficiency, that's not consistently possible. +/// With the JSON decoder, borrowing from input data is only possible when the string contains no +/// escape sequences (otherwise, the string is not a direct view of input data and we need an +/// internal copy). 
+type SyncLineStr<'a> = Cow<'a, str>; + +#[derive(Deserialize, Debug)] +pub struct Checkpoint<'a> { + #[serde(deserialize_with = "deserialize_string_to_i64")] + pub last_op_id: i64, + #[serde(default)] + #[serde(deserialize_with = "deserialize_optional_string_to_i64")] + pub write_checkpoint: Option, + #[serde(borrow)] + pub buckets: Vec>, +} + +#[derive(Deserialize, Debug)] +pub struct BucketChecksum<'a> { + #[serde(borrow)] + pub bucket: SyncLineStr<'a>, + pub checksum: Checksum, + #[serde(default)] + pub priority: Option, + #[serde(default)] + pub count: Option, + // #[serde(default)] + // #[serde(deserialize_with = "deserialize_optional_string_to_i64")] + // pub last_op_id: Option, +} + +#[derive(Deserialize, Debug)] +pub struct DataLine<'a> { + #[serde(borrow)] + pub bucket: SyncLineStr<'a>, + pub data: Vec>, + // #[serde(default)] + // pub has_more: bool, + // #[serde(default, borrow)] + // pub after: Option>, + // #[serde(default, borrow)] + // pub next_after: Option>, +} + +#[derive(Deserialize, Debug)] +pub struct OplogEntry<'a> { + pub checksum: Checksum, + #[serde(deserialize_with = "deserialize_string_to_i64")] + pub op_id: i64, + pub op: OpType, + #[serde(default, borrow)] + pub object_id: Option>, + #[serde(default, borrow)] + pub object_type: Option>, + #[serde(default, borrow)] + pub subkey: Option>, + #[serde(default, borrow)] + pub data: Option>, +} + +#[derive(Debug)] +pub enum OplogData<'a> { + /// A string encoding a well-formed JSON object representing values of the row. + Json { data: Cow<'a, str> }, + // BsonDocument { data: Cow<'a, [u8]> }, +} + +#[derive(Deserialize, Debug, Clone, Copy, PartialEq, Eq)] +pub enum OpType { + CLEAR, + MOVE, + PUT, + REMOVE, +} + +impl<'a, 'de: 'a> Deserialize<'de> for OplogData<'a> { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + // For now, we will always get oplog data as a string. In the future, there may be the + // option of the sync service sending BSON-encoded data lines too, but that's not relevant + // for now. + return Ok(OplogData::Json { + data: Deserialize::deserialize(deserializer)?, + }); + } +} diff --git a/crates/core/src/sync/mod.rs b/crates/core/src/sync/mod.rs new file mode 100644 index 0000000..afe3393 --- /dev/null +++ b/crates/core/src/sync/mod.rs @@ -0,0 +1,8 @@ +mod bucket_priority; +pub mod checkpoint; +mod checksum; +pub mod line; +pub mod operations; + +pub use bucket_priority::BucketPriority; +pub use checksum::Checksum; diff --git a/crates/core/src/sync/operations.rs b/crates/core/src/sync/operations.rs new file mode 100644 index 0000000..7e7499a --- /dev/null +++ b/crates/core/src/sync/operations.rs @@ -0,0 +1,218 @@ +use alloc::format; +use alloc::string::String; +use num_traits::Zero; +use sqlite_nostd::Connection; +use sqlite_nostd::{self as sqlite, ResultCode}; + +use crate::{ + error::{PSResult, SQLiteError}, + ext::SafeManagedStmt, +}; + +use super::line::OplogData; +use super::line::{DataLine, OpType}; +use super::Checksum; + +pub fn insert_bucket_operations( + db: *mut sqlite::sqlite3, + data: &DataLine, +) -> Result<(), SQLiteError> { + // We do an ON CONFLICT UPDATE simply so that the RETURNING bit works for existing rows. + // We can consider splitting this into separate SELECT and INSERT statements. + // language=SQLite + let bucket_statement = db.prepare_v2( + "INSERT INTO ps_buckets(name) + VALUES(?) 
+ ON CONFLICT DO UPDATE + SET last_applied_op = last_applied_op + RETURNING id, last_applied_op", + )?; + bucket_statement.bind_text(1, &data.bucket, sqlite::Destructor::STATIC)?; + bucket_statement.step()?; + + let bucket_id = bucket_statement.column_int64(0); + + // This is an optimization for initial sync - we can avoid persisting individual REMOVE + // operations when last_applied_op = 0. + // We do still need to do the "supersede_statement" step for this case, since a REMOVE + // operation can supersede another PUT operation we're syncing at the same time. + let mut is_empty = bucket_statement.column_int64(1) == 0; + + // Statement to supersede (replace) operations with the same key. + // language=SQLite + let supersede_statement = db.prepare_v2( + "\ +DELETE FROM ps_oplog + WHERE unlikely(ps_oplog.bucket = ?1) + AND ps_oplog.key = ?2 +RETURNING op_id, hash", + )?; + supersede_statement.bind_int64(1, bucket_id)?; + + // language=SQLite + let insert_statement = db.prepare_v2("\ +INSERT INTO ps_oplog(bucket, op_id, key, row_type, row_id, data, hash) VALUES (?, ?, ?, ?, ?, ?, ?)")?; + insert_statement.bind_int64(1, bucket_id)?; + + let updated_row_statement = db.prepare_v2( + "\ +INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) VALUES(?1, ?2)", + )?; + + bucket_statement.reset()?; + + let mut last_op: Option<i64> = None; + let mut add_checksum = Checksum::zero(); + let mut op_checksum = Checksum::zero(); + let mut added_ops: i32 = 0; + + for line in &data.data { + let op_id = line.op_id; + let op = line.op; + let object_type = line.object_type.as_ref(); + let object_id = line.object_id.as_ref(); + let checksum = line.checksum; + let op_data = line.data.as_ref(); + + last_op = Some(op_id); + added_ops += 1; + + if op == OpType::PUT || op == OpType::REMOVE { + let key: String; + if let (Some(object_type), Some(object_id)) = (object_type, object_id) { + let subkey = line.subkey.as_ref().map(|i| &**i).unwrap_or("null"); + key = format!("{}/{}/{}", &object_type, &object_id, subkey); + } else { + key = String::from(""); + } + + supersede_statement.bind_text(2, &key, sqlite::Destructor::STATIC)?; + + let mut superseded = false; + + while supersede_statement.step()? == ResultCode::ROW { + // Superseded (deleted) a previous operation, add the checksum + let supersede_checksum = Checksum::from_i32(supersede_statement.column_int(1)); + add_checksum += supersede_checksum; + op_checksum -= supersede_checksum; + + // Superseded an operation, only skip if the bucket was empty + // Previously this checked "superseded_op <= last_applied_op". + // However, that would not account for a case where a previous + // PUT operation superseded the original PUT operation in this + // same batch, in which case superseded_op is not accurate for this.
+ if !is_empty { + superseded = true; + } + } + supersede_statement.reset()?; + + if op == OpType::REMOVE { + let should_skip_remove = !superseded; + + add_checksum += checksum; + + if !should_skip_remove { + if let (Some(object_type), Some(object_id)) = (object_type, object_id) { + updated_row_statement.bind_text( + 1, + object_type, + sqlite::Destructor::STATIC, + )?; + updated_row_statement.bind_text( + 2, + object_id, + sqlite::Destructor::STATIC, + )?; + updated_row_statement.exec()?; + } + } + + continue; + } + + insert_statement.bind_int64(2, op_id)?; + if key != "" { + insert_statement.bind_text(3, &key, sqlite::Destructor::STATIC)?; + } else { + insert_statement.bind_null(3)?; + } + + if let (Some(object_type), Some(object_id)) = (object_type, object_id) { + insert_statement.bind_text(4, object_type, sqlite::Destructor::STATIC)?; + insert_statement.bind_text(5, object_id, sqlite::Destructor::STATIC)?; + } else { + insert_statement.bind_null(4)?; + insert_statement.bind_null(5)?; + } + if let Some(data) = op_data { + let OplogData::Json { data } = data; + + insert_statement.bind_text(6, data, sqlite::Destructor::STATIC)?; + } else { + insert_statement.bind_null(6)?; + } + + insert_statement.bind_int(7, checksum.bitcast_i32())?; + insert_statement.exec()?; + + op_checksum += checksum; + } else if op == OpType::MOVE { + add_checksum += checksum; + } else if op == OpType::CLEAR { + // Any remaining PUT operations should get an implicit REMOVE + // language=SQLite + let clear_statement1 = db + .prepare_v2( + "INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) +SELECT row_type, row_id +FROM ps_oplog +WHERE bucket = ?1", + ) + .into_db_result(db)?; + clear_statement1.bind_int64(1, bucket_id)?; + clear_statement1.exec()?; + + let clear_statement2 = db + .prepare_v2("DELETE FROM ps_oplog WHERE bucket = ?1") + .into_db_result(db)?; + clear_statement2.bind_int64(1, bucket_id)?; + clear_statement2.exec()?; + + // And we need to re-apply all of those. + // We also replace the checksum with the checksum of the CLEAR op. 
+ // language=SQLite + let clear_statement2 = db.prepare_v2( + "UPDATE ps_buckets SET last_applied_op = 0, add_checksum = ?1, op_checksum = 0 WHERE id = ?2", + )?; + clear_statement2.bind_int64(2, bucket_id)?; + clear_statement2.bind_int(1, checksum.bitcast_i32())?; + clear_statement2.exec()?; + + add_checksum = Checksum::zero(); + is_empty = true; + op_checksum = Checksum::zero(); + } + } + + if let Some(last_op) = &last_op { + // language=SQLite + let statement = db.prepare_v2( + "UPDATE ps_buckets + SET last_op = ?2, + add_checksum = (add_checksum + ?3) & 0xffffffff, + op_checksum = (op_checksum + ?4) & 0xffffffff, + count_since_last = count_since_last + ?5 + WHERE id = ?1", + )?; + statement.bind_int64(1, bucket_id)?; + statement.bind_int64(2, *last_op)?; + statement.bind_int(3, add_checksum.bitcast_i32())?; + statement.bind_int(4, op_checksum.bitcast_i32())?; + statement.bind_int(5, added_ops)?; + + statement.exec()?; + } + + Ok(()) +} diff --git a/crates/core/src/sync_local.rs b/crates/core/src/sync_local.rs index 40ddcd5..225796b 100644 --- a/crates/core/src/sync_local.rs +++ b/crates/core/src/sync_local.rs @@ -4,8 +4,8 @@ use alloc::string::String; use alloc::vec::Vec; use serde::Deserialize; -use crate::bucket_priority::BucketPriority; use crate::error::{PSResult, SQLiteError}; +use crate::sync::BucketPriority; use sqlite_nostd::{self as sqlite, Destructor, ManagedStmt, Value}; use sqlite_nostd::{ColumnType, Connection, ResultCode}; @@ -103,11 +103,20 @@ impl<'a> SyncOperation<'a> { self.collect_tables()?; let statement = self.collect_full_operations()?; - // TODO: cache statements + + // We cache the last insert and delete statements for each row + let mut last_insert_table: Option<String> = None; + let mut last_insert_statement: Option<ManagedStmt> = None; + + let mut last_delete_table: Option<String> = None; + let mut last_delete_statement: Option<ManagedStmt> = None; + + let mut untyped_delete_statement: Option<ManagedStmt> = None; + let mut untyped_insert_statement: Option<ManagedStmt> = None; + while statement.step().into_db_result(self.db)? == ResultCode::ROW { let type_name = statement.column_text(0)?; let id = statement.column_text(1)?; - let buckets = statement.column_int(3); let data = statement.column_text(2); let table_name = internal_table_name(type_name); @@ -115,42 +124,74 @@ impl<'a> SyncOperation<'a> { if self.data_tables.contains(&table_name) { let quoted = quote_internal_name(type_name, false); - if buckets == 0 { + // is_err() is essentially a NULL check here. + // NULL data means no PUT operations found, so we delete the row.
+ if data.is_err() { // DELETE - let delete_statement = self - .db - .prepare_v2(&format!("DELETE FROM {} WHERE id = ?", quoted)) - .into_db_result(self.db)?; + if last_delete_table.as_deref() != Some(&quoted) { + // Prepare statement when the table changed + last_delete_statement = Some( + self.db + .prepare_v2(&format!("DELETE FROM {} WHERE id = ?", quoted)) + .into_db_result(self.db)?, + ); + last_delete_table = Some(quoted.clone()); + } + let delete_statement = last_delete_statement.as_mut().unwrap(); + + delete_statement.reset()?; delete_statement.bind_text(1, id, sqlite::Destructor::STATIC)?; delete_statement.exec()?; } else { // INSERT/UPDATE - let insert_statement = self - .db - .prepare_v2(&format!("REPLACE INTO {}(id, data) VALUES(?, ?)", quoted)) - .into_db_result(self.db)?; + if last_insert_table.as_deref() != Some(&quoted) { + // Prepare statement when the table changed + last_insert_statement = Some( + self.db + .prepare_v2(&format!( + "REPLACE INTO {}(id, data) VALUES(?, ?)", + quoted + )) + .into_db_result(self.db)?, + ); + last_insert_table = Some(quoted.clone()); + } + let insert_statement = last_insert_statement.as_mut().unwrap(); + insert_statement.reset()?; insert_statement.bind_text(1, id, sqlite::Destructor::STATIC)?; insert_statement.bind_text(2, data?, sqlite::Destructor::STATIC)?; insert_statement.exec()?; } } else { - if buckets == 0 { + if data.is_err() { // DELETE - // language=SQLite - let delete_statement = self - .db - .prepare_v2("DELETE FROM ps_untyped WHERE type = ? AND id = ?") - .into_db_result(self.db)?; + if untyped_delete_statement.is_none() { + // Prepare statement on first use + untyped_delete_statement = Some( + self.db + .prepare_v2("DELETE FROM ps_untyped WHERE type = ? AND id = ?") + .into_db_result(self.db)?, + ); + } + let delete_statement = untyped_delete_statement.as_mut().unwrap(); + delete_statement.reset()?; delete_statement.bind_text(1, type_name, sqlite::Destructor::STATIC)?; delete_statement.bind_text(2, id, sqlite::Destructor::STATIC)?; delete_statement.exec()?; } else { // INSERT/UPDATE - // language=SQLite - let insert_statement = self - .db - .prepare_v2("REPLACE INTO ps_untyped(type, id, data) VALUES(?, ?, ?)") - .into_db_result(self.db)?; + if untyped_insert_statement.is_none() { + // Prepare statement on first use + untyped_insert_statement = Some( + self.db + .prepare_v2( + "REPLACE INTO ps_untyped(type, id, data) VALUES(?, ?, ?)", + ) + .into_db_result(self.db)?, + ); + } + let insert_statement = untyped_insert_statement.as_mut().unwrap(); + insert_statement.reset()?; insert_statement.bind_text(1, type_name, sqlite::Destructor::STATIC)?; insert_statement.bind_text(2, id, sqlite::Destructor::STATIC)?; insert_statement.bind_text(3, data?, sqlite::Destructor::STATIC)?; @@ -185,32 +226,29 @@ impl<'a> SyncOperation<'a> { Ok(match &self.partial { None => { // Complete sync + // See dart/test/sync_local_performance_test.dart for an annotated version of this query. self.db .prepare_v2( "\ --- 1. Filter oplog by the ops added but not applied yet (oplog b). --- SELECT DISTINCT / UNION is important for cases with many duplicate ids.
WITH updated_rows AS ( - SELECT DISTINCT b.row_type, b.row_id FROM ps_buckets AS buckets - CROSS JOIN ps_oplog AS b ON b.bucket = buckets.id - AND (b.op_id > buckets.last_applied_op) - UNION SELECT row_type, row_id FROM ps_updated_rows + SELECT b.row_type, b.row_id FROM ps_buckets AS buckets + CROSS JOIN ps_oplog AS b ON b.bucket = buckets.id + AND (b.op_id > buckets.last_applied_op) + UNION ALL SELECT row_type, row_id FROM ps_updated_rows ) --- 3. Group the objects from different buckets together into a single one (ops). -SELECT b.row_type as type, - b.row_id as id, - r.data as data, - count(r.bucket) as buckets, - /* max() affects which row is used for 'data' */ - max(r.op_id) as op_id --- 2. Find *all* current ops over different buckets for those objects (oplog r). -FROM updated_rows b - LEFT OUTER JOIN ps_oplog AS r - ON r.row_type = b.row_type - AND r.row_id = b.row_id --- Group for (3) -GROUP BY b.row_type, b.row_id", +SELECT + b.row_type, + b.row_id, + ( + SELECT iif(max(r.op_id), r.data, null) + FROM ps_oplog r + WHERE r.row_type = b.row_type + AND r.row_id = b.row_id + + ) as data + FROM updated_rows b + GROUP BY b.row_type, b.row_id;", ) .into_db_result(self.db)? } @@ -220,33 +258,38 @@ GROUP BY b.row_type, b.row_id", .prepare_v2( "\ -- 1. Filter oplog by the ops added but not applied yet (oplog b). --- SELECT DISTINCT / UNION is important for cases with many duplicate ids. +-- We do not do any DISTINCT operation here, since that introduces a temp b-tree. +-- We filter out duplicates using the GROUP BY below. WITH involved_buckets (id) AS MATERIALIZED ( SELECT id FROM ps_buckets WHERE ?1 IS NULL OR name IN (SELECT value FROM json_each(json_extract(?1, '$.buckets'))) ), updated_rows AS ( - SELECT DISTINCT FALSE as local, b.row_type, b.row_id FROM ps_buckets AS buckets - CROSS JOIN ps_oplog AS b ON b.bucket = buckets.id AND (b.op_id > buckets.last_applied_op) - WHERE buckets.id IN (SELECT id FROM involved_buckets) + SELECT b.row_type, b.row_id FROM ps_buckets AS buckets + CROSS JOIN ps_oplog AS b ON b.bucket = buckets.id + AND (b.op_id > buckets.last_applied_op) + WHERE buckets.id IN (SELECT id FROM involved_buckets) ) --- 3. Group the objects from different buckets together into a single one (ops). -SELECT b.row_type as type, - b.row_id as id, - r.data as data, - count(r.bucket) as buckets, - /* max() affects which row is used for 'data' */ - max(r.op_id) as op_id -- 2. Find *all* current ops over different buckets for those objects (oplog r). -FROM updated_rows b - LEFT OUTER JOIN ps_oplog AS r - ON r.row_type = b.row_type - AND r.row_id = b.row_id - AND r.bucket IN (SELECT id FROM involved_buckets) --- Group for (3) -GROUP BY b.row_type, b.row_id", +SELECT + b.row_type, + b.row_id, + ( + -- 3. For each unique row, select the data from the latest oplog entry. + -- The max(r.op_id) clause is used to select the latest oplog entry. + -- The iif is to avoid the max(r.op_id) column ending up in the results. 
+ SELECT iif(max(r.op_id), r.data, null) + FROM ps_oplog r + WHERE r.row_type = b.row_type + AND r.row_id = b.row_id + AND r.bucket IN (SELECT id FROM involved_buckets) + + ) as data + FROM updated_rows b + -- Group for (2) + GROUP BY b.row_type, b.row_id;", ) .into_db_result(self.db)?; stmt.bind_text(1, partial.args, Destructor::STATIC)?; diff --git a/crates/core/src/sync_types.rs b/crates/core/src/sync_types.rs deleted file mode 100644 index 060dd25..0000000 --- a/crates/core/src/sync_types.rs +++ /dev/null @@ -1,22 +0,0 @@ -use alloc::string::String; -use alloc::vec::Vec; -use serde::{Deserialize, Serialize}; - -use crate::util::{deserialize_optional_string_to_i64, deserialize_string_to_i64}; - -#[derive(Serialize, Deserialize, Debug)] -pub struct Checkpoint { - #[serde(deserialize_with = "deserialize_string_to_i64")] - pub last_op_id: i64, - #[serde(default)] - #[serde(deserialize_with = "deserialize_optional_string_to_i64")] - pub write_checkpoint: Option<i64>, - pub buckets: Vec<BucketChecksum>, -} - -#[derive(Serialize, Deserialize, Debug)] -pub struct BucketChecksum { - pub bucket: String, - pub checksum: i32, - pub priority: Option<i32>, -} diff --git a/crates/core/src/util.rs b/crates/core/src/util.rs index 2e50951..a9e0842 100644 --- a/crates/core/src/util.rs +++ b/crates/core/src/util.rs @@ -3,11 +3,9 @@ extern crate alloc; use alloc::format; use alloc::string::String; -use serde::Deserialize; -use serde_json as json; - #[cfg(not(feature = "getrandom"))] use crate::sqlite; +use serde::de::Visitor; use uuid::Uuid; @@ -46,25 +44,56 @@ pub fn deserialize_string_to_i64<'de, D>(deserializer: D) -> Result<i64, D::Error> where D: serde::Deserializer<'de>, { - let value = json::Value::deserialize(deserializer)?; + struct ValueVisitor; + + impl<'de> Visitor<'de> for ValueVisitor { + type Value = i64; + + fn expecting(&self, formatter: &mut core::fmt::Formatter) -> core::fmt::Result { + formatter.write_str("a string representation of a number") + } - match value { - json::Value::String(s) => s.parse::<i64>().map_err(serde::de::Error::custom), - _ => Err(serde::de::Error::custom("Expected a string.")), + fn visit_str<E>(self, v: &str) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + v.parse::<i64>().map_err(serde::de::Error::custom) + } } + + // Using a custom visitor here to avoid an intermediate string allocation + deserializer.deserialize_str(ValueVisitor) } pub fn deserialize_optional_string_to_i64<'de, D>(deserializer: D) -> Result<Option<i64>, D::Error> where D: serde::Deserializer<'de>, { - let value = json::Value::deserialize(deserializer)?; - - match value { - json::Value::Null => Ok(None), - json::Value::String(s) => s.parse::<i64>().map(Some).map_err(serde::de::Error::custom), - _ => Err(serde::de::Error::custom("Expected a string or null.")), + struct ValueVisitor; + + impl<'de> Visitor<'de> for ValueVisitor { + type Value = Option<i64>; + + fn expecting(&self, formatter: &mut core::fmt::Formatter) -> core::fmt::Result { + formatter.write_str("a string or null") + } + + fn visit_none<E>(self) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(None) + } + + fn visit_some<D>(self, deserializer: D) -> Result<Self::Value, D::Error> + where + D: serde::Deserializer<'de>, + { + Ok(Some(deserialize_string_to_i64(deserializer)?)) + } } + + deserializer.deserialize_option(ValueVisitor) } // Use getrandom crate to generate UUID.
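(Editor's aside, not part of the patch: the rewritten helpers above parse JSON strings such as "42" straight into an i64 through a custom Visitor, instead of first buffering a serde_json::Value. A small, hypothetical usage sketch, assuming the helper is imported from the util module shown above:

// Illustration only; the struct and field are invented for this example.
use serde::Deserialize;

use crate::util::deserialize_string_to_i64;

#[derive(Deserialize)]
struct Example {
    // A JSON string like "42" is parsed directly, with no intermediate allocation.
    #[serde(deserialize_with = "deserialize_string_to_i64")]
    last_op_id: i64,
}

fn parse_example() {
    let parsed: Example = serde_json::from_str(r#"{"last_op_id": "42"}"#).unwrap();
    assert_eq!(parsed.last_op_id, 42);
}

This mirrors how Checkpoint in crates/core/src/sync/line.rs applies the same attribute to its last_op_id field.)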
diff --git a/crates/core/src/uuid.rs b/crates/core/src/uuid.rs index db617f9..82d9046 100644 --- a/crates/core/src/uuid.rs +++ b/crates/core/src/uuid.rs @@ -1,6 +1,5 @@ extern crate alloc; -use alloc::format; use alloc::string::String; use alloc::string::ToString; use core::ffi::c_int; diff --git a/crates/core/src/views.rs b/crates/core/src/views.rs index dea0b2b..03cbdd8 100644 --- a/crates/core/src/views.rs +++ b/crates/core/src/views.rs @@ -6,49 +6,41 @@ use alloc::string::String; use alloc::vec::Vec; use core::ffi::c_int; use core::fmt::Write; -use streaming_iterator::StreamingIterator; use sqlite::{Connection, Context, ResultCode, Value}; use sqlite_nostd::{self as sqlite}; use crate::create_sqlite_text_fn; use crate::error::SQLiteError; -use crate::schema::{ColumnInfo, ColumnNameAndTypeStatement, DiffIncludeOld, TableInfo}; +use crate::schema::{DiffIncludeOld, Table}; use crate::util::*; fn powersync_view_sql_impl( - ctx: *mut sqlite::context, + _ctx: *mut sqlite::context, args: &[*mut sqlite::value], ) -> Result<String, SQLiteError> { - let db = ctx.db_handle(); - let table = args[0].text(); - let table_info = TableInfo::parse_from(db, table)?; + let table_info = Table::from_json(args[0].text())?; let name = &table_info.name; - let view_name = &table_info.view_name; + let view_name = &table_info.view_name(); let local_only = table_info.flags.local_only(); let include_metadata = table_info.flags.include_metadata(); let quoted_name = quote_identifier(view_name); let internal_name = quote_internal_name(name, local_only); - let mut columns = ColumnNameAndTypeStatement::new(db, table)?; - let mut iter = columns.streaming_iter(); - let mut column_names_quoted: Vec<String> = alloc::vec![]; let mut column_values: Vec<String> = alloc::vec![]; column_names_quoted.push(quote_identifier("id")); column_values.push(String::from("id")); - while let Some(row) = iter.next() { - let ColumnInfo { name, type_name } = row.clone()?; - column_names_quoted.push(quote_identifier(name)); + for column in &table_info.columns { + column_names_quoted.push(quote_identifier(&column.name)); - let foo = format!( + column_values.push(format!( "CAST(json_extract(data, {:}) as {:})", - quote_json_path(name), - type_name - ); - column_values.push(foo); + quote_json_path(&column.name), + &column.type_name + )); } if include_metadata { @@ -77,14 +69,13 @@ create_sqlite_text_fn!( ); fn powersync_trigger_delete_sql_impl( - ctx: *mut sqlite::context, + _ctx: *mut sqlite::context, args: &[*mut sqlite::value], ) -> Result<String, SQLiteError> { - let table = args[0].text(); - let table_info = TableInfo::parse_from(ctx.db_handle(), table)?; + let table_info = Table::from_json(args[0].text())?; let name = &table_info.name; - let view_name = &table_info.view_name; + let view_name = &table_info.view_name(); let local_only = table_info.flags.local_only(); let insert_only = table_info.flags.insert_only(); @@ -93,23 +84,14 @@ fn powersync_trigger_delete_sql_impl( let trigger_name = quote_identifier_prefixed("ps_view_delete_", view_name); let type_string = quote_string(name); - let db = ctx.db_handle(); - let old_fragment: Cow<'static, str> = match table_info.diff_include_old { + let old_fragment: Cow<'static, str> = match &table_info.diff_include_old { Some(include_old) => { - let mut columns = ColumnNameAndTypeStatement::new(db, table)?; - let json = match include_old { DiffIncludeOld::OnlyForColumns { columns } => { - let mut iterator = columns.iter(); - let mut columns = - streaming_iterator::from_fn(|| -> Option<Result<&str, ResultCode>> { - Some(Ok(iterator.next()?.as_str())) - }); - -
json_object_fragment("OLD", &mut columns) + json_object_fragment("OLD", &mut columns.iter().map(|c| c.as_str())) } DiffIncludeOld::ForAllColumns => { - json_object_fragment("OLD", &mut columns.names_iter()) + json_object_fragment("OLD", &mut table_info.column_names()) } }?; @@ -179,15 +161,13 @@ create_sqlite_text_fn!( ); fn powersync_trigger_insert_sql_impl( - ctx: *mut sqlite::context, + _ctx: *mut sqlite::context, args: &[*mut sqlite::value], ) -> Result { - let table = args[0].text(); - - let table_info = TableInfo::parse_from(ctx.db_handle(), table)?; + let table_info = Table::from_json(args[0].text())?; let name = &table_info.name; - let view_name = &table_info.view_name; + let view_name = &table_info.view_name(); let local_only = table_info.flags.local_only(); let insert_only = table_info.flags.insert_only(); @@ -196,10 +176,7 @@ fn powersync_trigger_insert_sql_impl( let trigger_name = quote_identifier_prefixed("ps_view_insert_", view_name); let type_string = quote_string(name); - let local_db = ctx.db_handle(); - - let mut columns = ColumnNameAndTypeStatement::new(local_db, table)?; - let json_fragment = json_object_fragment("NEW", &mut columns.names_iter())?; + let json_fragment = json_object_fragment("NEW", &mut table_info.column_names())?; let metadata_fragment = if table_info.flags.include_metadata() { ", 'metadata', NEW._metadata" @@ -258,15 +235,13 @@ create_sqlite_text_fn!( ); fn powersync_trigger_update_sql_impl( - ctx: *mut sqlite::context, + _ctx: *mut sqlite::context, args: &[*mut sqlite::value], ) -> Result { - let table = args[0].text(); - - let table_info = TableInfo::parse_from(ctx.db_handle(), table)?; + let table_info = Table::from_json(args[0].text())?; let name = &table_info.name; - let view_name = &table_info.view_name; + let view_name = &table_info.view_name(); let insert_only = table_info.flags.insert_only(); let local_only = table_info.flags.local_only(); @@ -275,23 +250,16 @@ fn powersync_trigger_update_sql_impl( let trigger_name = quote_identifier_prefixed("ps_view_update_", view_name); let type_string = quote_string(name); - let db = ctx.db_handle(); - let mut columns = ColumnNameAndTypeStatement::new(db, table)?; - let json_fragment_new = json_object_fragment("NEW", &mut columns.names_iter())?; - let json_fragment_old = json_object_fragment("OLD", &mut columns.names_iter())?; + let json_fragment_new = json_object_fragment("NEW", &mut table_info.column_names())?; + let json_fragment_old = json_object_fragment("OLD", &mut table_info.column_names())?; let mut old_values_fragment = match &table_info.diff_include_old { None => None, Some(DiffIncludeOld::ForAllColumns) => Some(json_fragment_old.clone()), - Some(DiffIncludeOld::OnlyForColumns { columns }) => { - let mut iterator = columns.iter(); - let mut columns = - streaming_iterator::from_fn(|| -> Option> { - Some(Ok(iterator.next()?.as_str())) - }); - - Some(json_object_fragment("OLD", &mut columns)?) - } + Some(DiffIncludeOld::OnlyForColumns { columns }) => Some(json_object_fragment( + "OLD", + &mut columns.iter().map(|c| c.as_str()), + )?), }; if table_info.flags.include_old_only_when_changed() { @@ -301,15 +269,9 @@ fn powersync_trigger_update_sql_impl( let filtered_new_fragment = match &table_info.diff_include_old { // When include_old_only_when_changed is combined with a column filter, make sure we // only include the powersync_diff of columns matched by the filter. 
- Some(DiffIncludeOld::OnlyForColumns { columns }) => { - let mut iterator = columns.iter(); - let mut columns = - streaming_iterator::from_fn(|| -> Option<Result<&str, ResultCode>> { - Some(Ok(iterator.next()?.as_str())) - }); - - Cow::Owned(json_object_fragment("NEW", &mut columns)?) - } + Some(DiffIncludeOld::OnlyForColumns { columns }) => Cow::Owned( + json_object_fragment("NEW", &mut columns.iter().map(|c| c.as_str()))?, + ), _ => Cow::Borrowed(json_fragment_new.as_str()), }; @@ -444,7 +406,7 @@ pub fn register(db: *mut sqlite::sqlite3) -> Result<(), ResultCode> { /// Example output with prefix "NEW": "json_object('id', NEW.id, 'name', NEW.name, 'age', NEW.age)". fn json_object_fragment<'a>( prefix: &str, - name_results: &mut dyn StreamingIterator<Item = Result<&'a str, ResultCode>>, + name_results: &mut dyn Iterator<Item = &'a str>, ) -> Result<String, SQLiteError> { // floor(SQLITE_MAX_FUNCTION_ARG / 2). // To keep databases portable, we use the default limit of 100 args for this, @@ -452,9 +414,7 @@ const MAX_ARG_COUNT: usize = 50; let mut column_names_quoted: Vec<String> = alloc::vec![]; - while let Some(row) = name_results.next() { - let name = (*row)?; - + while let Some(name) = name_results.next() { let quoted: String = format!( "{:}, {:}.{:}", quote_string(name), diff --git a/crates/loadable/src/lib.rs b/crates/loadable/src/lib.rs index c6ca649..9e5a9de 100644 --- a/crates/loadable/src/lib.rs +++ b/crates/loadable/src/lib.rs @@ -3,7 +3,6 @@ #![feature(core_intrinsics)] #![allow(internal_features)] #![feature(lang_items)] -#![feature(error_in_core)] extern crate alloc; diff --git a/dart/pubspec.lock b/dart/pubspec.lock index 915c28e..2731635 100644 --- a/dart/pubspec.lock +++ b/dart/pubspec.lock @@ -293,10 +293,10 @@ packages: dependency: "direct main" description: name: sqlite3 - sha256: "310af39c40dd0bb2058538333c9d9840a2725ae0b9f77e4fd09ad6696aa8f66e" + sha256: c0503c69b44d5714e6abbf4c1f51a3c3cc42b75ce785f44404765e4635481d38 url: "https://pub.dev" source: hosted - version: "2.7.5" + version: "2.7.6" sqlite3_test: dependency: "direct dev" description: diff --git a/dart/pubspec.yaml b/dart/pubspec.yaml index 1867319..836e9ad 100644 --- a/dart/pubspec.yaml +++ b/dart/pubspec.yaml @@ -5,9 +5,9 @@ description: Tests for powersync-sqlite-core environment: sdk: ^3.4.0 dependencies: - sqlite3: ^2.4.5 + sqlite3: ^2.7.6 dev_dependencies: test: ^1.25.0 file: ^7.0.1 sqlite3_test: ^0.1.1 - fake_async: ^1.3.3 + fake_async: ^1.3.3 \ No newline at end of file diff --git a/dart/test/js_key_encoding_test.dart b/dart/test/js_key_encoding_test.dart new file mode 100644 index 0000000..dd86e06 --- /dev/null +++ b/dart/test/js_key_encoding_test.dart @@ -0,0 +1,79 @@ +import 'dart:convert'; + +import 'package:file/local.dart'; +import 'package:sqlite3/common.dart'; +import 'package:sqlite3/sqlite3.dart'; +import 'package:sqlite3_test/sqlite3_test.dart'; +import 'package:test/test.dart'; + +import 'utils/native_test_utils.dart'; + +void main() { + // Needs a unique name per test file to avoid concurrency issues + final vfs = TestSqliteFileSystem( + fs: const LocalFileSystem(), name: 'js-key-encoding-test-vfs'); + late CommonDatabase db; + + setUpAll(() { + loadExtension(); + sqlite3.registerVirtualFileSystem(vfs, makeDefault: false); + }); + tearDownAll(() => sqlite3.unregisterVirtualFileSystem(vfs)); + + setUp(() async { + db = openTestDatabase(vfs: vfs) + ..select('select powersync_init();') + ..select('select powersync_replace_schema(?)', [json.encode(_schema)]); + }); + + tearDown(() { + db.dispose(); + }); + + test('can fix JS key encoding', () { + db.execute('insert
into powersync_operations (op, data) VALUES (?, ?);', [ + 'save', + json.encode({ + 'buckets': [ + { + 'bucket': 'a', + 'data': [ + { + 'op_id': '1', + 'op': 'PUT', + 'object_type': 'items', + 'object_id': '1', + 'subkey': json.encode('subkey'), + 'checksum': 0, + 'data': json.encode({'col': 'a'}), + } + ], + } + ], + }) + ]); + + db.execute('INSERT INTO powersync_operations(op, data) VALUES (?, ?)', + ['sync_local', null]); + var [row] = db.select('select * from ps_oplog'); + expect(row['key'], 'items/1/"subkey"'); + + // Apply migration + db.execute( + 'UPDATE ps_oplog SET key = powersync_remove_duplicate_key_encoding(key);'); + + [row] = db.select('select * from ps_oplog'); + expect(row['key'], 'items/1/subkey'); + }); +} + +const _schema = { + 'tables': [ + { + 'name': 'items', + 'columns': [ + {'name': 'col', 'type': 'text'} + ], + } + ] +}; diff --git a/dart/test/sync_local_performance_test.dart b/dart/test/sync_local_performance_test.dart new file mode 100644 index 0000000..9441138 --- /dev/null +++ b/dart/test/sync_local_performance_test.dart @@ -0,0 +1,327 @@ +import 'dart:convert'; + +import 'package:sqlite3/common.dart'; +import 'package:sqlite3/sqlite3.dart'; +import 'package:test/test.dart'; + +import 'utils/native_test_utils.dart'; +import 'utils/tracking_vfs.dart'; +import './schema_test.dart' show schema; + +// These tests measure how many filesystem reads and writes are performed during sync_local. +// The real-world performance of filesystem operations depends a lot on the specific system. +// For example, on native desktop systems, the performance of temporary filesystem storage could +// be close to memory performance. However, on web and mobile, (temporary) filesystem operations +// could drastically slow down performance. So rather than only testing the real time for these +// queries, we count the number of filesystem operations. +void testFilesystemOperations( + {bool unique = true, + int count = 200000, + int alreadyApplied = 10000, + int buckets = 10, + bool rawQueries = false}) { + late TrackingFileSystem vfs; + late CommonDatabase db; + final skip = rawQueries == false ? 'For manual query testing only' : null; + + setUpAll(() { + loadExtension(); + }); + + setUp(() async { + // Needs a unique name per test file to avoid concurrency issues + vfs = new TrackingFileSystem( + parent: new InMemoryFileSystem(), name: 'perf-test-vfs'); + sqlite3.registerVirtualFileSystem(vfs, makeDefault: false); + db = openTestDatabase(vfs: vfs, fileName: 'test.db'); + }); + + tearDown(() { + db.dispose(); + sqlite3.unregisterVirtualFileSystem(vfs); + }); + + setUp(() { + // Optional: set a custom cache size - it affects the number of filesystem operations. + // db.execute('PRAGMA cache_size=-50000'); + db.execute('SELECT powersync_replace_schema(?)', [json.encode(schema)]); + // Generate dummy data + // We can replace this with actual simulated download operations later + db.execute(''' +BEGIN TRANSACTION; + +WITH RECURSIVE generate_rows(n) AS ( + SELECT 1 + UNION ALL + SELECT n + 1 FROM generate_rows WHERE n < $count +) +INSERT INTO ps_oplog (bucket, op_id, row_type, row_id, key, data, hash) +SELECT + (n % $buckets), -- Generate n different buckets + n, + 'assets', + ${unique ? 'uuid()' : "'duplicated_id'"}, + uuid(), + '{"description": "' || n || '", "make": "test", "model": "this is just filler data. this is just filler data. this is just filler data. this is just filler data. this is just filler data. this is just filler data. this is just filler data.
"}', + (n * 17) % 1000000000 -- Some pseudo-random hash + +FROM generate_rows; + +WITH RECURSIVE generate_bucket_rows(n) AS ( + SELECT 1 + UNION ALL + SELECT n + 1 FROM generate_bucket_rows WHERE n < $buckets +) +INSERT INTO ps_buckets (id, name, last_applied_op) +SELECT + (n % $buckets), + 'bucket' || n, + $alreadyApplied -- simulate a percentage of operations previously applied + +FROM generate_bucket_rows; + +COMMIT; +'''); + // Enable this to see stats for initial data generation + // print('init stats: ${vfs.stats()}'); + + vfs.clearStats(); + }); + + test('sync_local (full)', () { + var timer = Stopwatch()..start(); + db.select('insert into powersync_operations(op, data) values(?, ?)', + ['sync_local', '']); + print('${timer.elapsed.inMilliseconds}ms ${vfs.stats()}'); + + // These are fairly generous limits, to catch significant regressions only. + expect(vfs.tempWrites, lessThan(count / 50)); + expect(timer.elapsed, + lessThan(Duration(milliseconds: 100 + (count / 50).round()))); + }); + + test('sync_local (partial)', () { + var timer = Stopwatch()..start(); + db.select('insert into powersync_operations(op, data) values(?, ?)', [ + 'sync_local', + jsonEncode({ + 'buckets': ['bucket0', 'bucket3', 'bucket4', 'bucket5', 'bucket6'], + 'priority': 2 + }) + ]); + print('${timer.elapsed.inMilliseconds}ms ${vfs.stats()}'); + expect(vfs.tempWrites, lessThan(count / 50)); + expect(timer.elapsed, + lessThan(Duration(milliseconds: 100 + (count / 50).round()))); + }); + + // The tests below are for comparing different queries, not run as part of the + // standard test suite. + + test('sync_local new query', () { + // This is the query we're using now. + // This query only uses a single TEMP B-TREE for the GROUP BY operation, + // leading to fairly efficient execution. + + // QUERY PLAN + // |--CO-ROUTINE updated_rows + // | `--COMPOUND QUERY + // | |--LEFT-MOST SUBQUERY + // | | |--SCAN buckets + // | | `--SEARCH b USING INDEX ps_oplog_opid (bucket=? AND op_id>?) + // | `--UNION ALL + // | `--SCAN ps_updated_rows + // |--SCAN b + // |--USE TEMP B-TREE FOR GROUP BY + // `--CORRELATED SCALAR SUBQUERY 3 + // `--SEARCH r USING INDEX ps_oplog_row (row_type=? AND row_id=?) + // + // For details on the max(r.op_id) clause, see: + // https://sqlite.org/lang_select.html#bare_columns_in_an_aggregate_query + // > If there is exactly one min() or max() aggregate in the query, then all bare columns in the result + // > set take values from an input row which also contains the minimum or maximum. + + var timer = Stopwatch()..start(); + final q = ''' +-- 1. Filter oplog by the ops added but not applied yet (oplog b). +-- We do not do any DISTINCT operation here, since that introduces a temp b-tree. +-- We filter out duplicates using the GROUP BY below. +WITH updated_rows AS ( + SELECT b.row_type, b.row_id FROM ps_buckets AS buckets + CROSS JOIN ps_oplog AS b ON b.bucket = buckets.id + AND (b.op_id > buckets.last_applied_op) + UNION ALL SELECT row_type, row_id FROM ps_updated_rows +) + +-- 2. Find *all* current ops over different buckets for those objects (oplog r). +SELECT + b.row_type, + b.row_id, + ( + -- 3. For each unique row, select the data from the latest oplog entry. + -- The max(r.op_id) clause is used to select the latest oplog entry. + -- The iif is to avoid the max(r.op_id) column ending up in the results. 
+ SELECT iif(max(r.op_id), r.data, null) + FROM ps_oplog r + WHERE r.row_type = b.row_type + AND r.row_id = b.row_id + + ) as data + FROM updated_rows b + -- Group for (2) + GROUP BY b.row_type, b.row_id; +'''; + db.select(q); + print('${timer.elapsed.inMilliseconds}ms ${vfs.stats()}'); + }, skip: skip); + + test('old query', () { + // This query used a TEMP B-TREE for the first part of finding unique updated rows, + // then another TEMP B-TREE for the second GROUP BY. This redundant B-TREE causes + // a lot of temporary storage overhead. + + // QUERY PLAN + // |--CO-ROUTINE updated_rows + // | `--COMPOUND QUERY + // | |--LEFT-MOST SUBQUERY + // | | |--SCAN buckets + // | | `--SEARCH b USING INDEX ps_oplog_opid (bucket=? AND op_id>?) + // | `--UNION USING TEMP B-TREE + // | `--SCAN ps_updated_rows + // |--SCAN b + // |--SEARCH r USING INDEX ps_oplog_row (row_type=? AND row_id=?) LEFT-JOIN + // `--USE TEMP B-TREE FOR GROUP BY + + var timer = Stopwatch()..start(); + final q = ''' +WITH updated_rows AS ( + SELECT DISTINCT b.row_type, b.row_id FROM ps_buckets AS buckets + CROSS JOIN ps_oplog AS b ON b.bucket = buckets.id + AND (b.op_id > buckets.last_applied_op) + UNION SELECT row_type, row_id FROM ps_updated_rows +) +SELECT b.row_type as type, + b.row_id as id, + r.data as data, + count(r.bucket) as buckets, + max(r.op_id) as op_id +FROM updated_rows b + LEFT OUTER JOIN ps_oplog AS r + ON r.row_type = b.row_type + AND r.row_id = b.row_id +GROUP BY b.row_type, b.row_id; +'''; + db.select(q); + print('${timer.elapsed.inMilliseconds}ms ${vfs.stats()}'); + }, skip: skip); + + test('group_by query', () { + // This is similar to the new query, but uses a GROUP BY .. LIMIT 1 clause instead of the max(op_id) hack. + // It is similar in the number of filesystem operations, but slightly slower in real time. + + // QUERY PLAN + // |--CO-ROUTINE updated_rows + // | `--COMPOUND QUERY + // | |--LEFT-MOST SUBQUERY + // | | |--SCAN buckets + // | | `--SEARCH b USING INDEX ps_oplog_opid (bucket=? AND op_id>?) + // | `--UNION ALL + // | `--SCAN ps_updated_rows + // |--SCAN b + // |--USE TEMP B-TREE FOR GROUP BY + // `--CORRELATED SCALAR SUBQUERY 3 + // |--SEARCH r USING INDEX ps_oplog_row (row_type=? AND row_id=?) + // `--USE TEMP B-TREE FOR ORDER BY + + var timer = Stopwatch()..start(); + final q = ''' +WITH updated_rows AS ( + SELECT b.row_type, b.row_id FROM ps_buckets AS buckets + CROSS JOIN ps_oplog AS b ON b.bucket = buckets.id + AND (b.op_id > buckets.last_applied_op) + UNION ALL SELECT row_type, row_id FROM ps_updated_rows +) + +SELECT + b.row_type, + b.row_id, + ( + SELECT r.data FROM ps_oplog r + WHERE r.row_type = b.row_type + AND r.row_id = b.row_id + ORDER BY r.op_id DESC + LIMIT 1 + + ) as data + FROM updated_rows b + GROUP BY b.row_type, b.row_id; +'''; + db.select(q); + print('${timer.elapsed.inMilliseconds}ms ${vfs.stats()}'); + }, skip: skip); + + test('full scan query', () { + // This is a nice alternative for initial sync or resyncing large amounts of data. + // This is very efficient for reading all data, but not for incremental updates. + + // QUERY PLAN + // |--SCAN r USING INDEX ps_oplog_row + // |--CORRELATED SCALAR SUBQUERY 1 + // | `--SEARCH ps_buckets USING INTEGER PRIMARY KEY (rowid=?) + // `--CORRELATED SCALAR SUBQUERY 1 + // `--SEARCH ps_buckets USING INTEGER PRIMARY KEY (rowid=?) 
+ + var timer = Stopwatch()..start(); + final q = ''' +SELECT r.row_type as type, + r.row_id as id, + r.data as data, + max(r.op_id) as op_id, + sum((select 1 from ps_buckets where ps_buckets.id = r.bucket and r.op_id > ps_buckets.last_applied_op)) as buckets + +FROM ps_oplog r +GROUP BY r.row_type, r.row_id +HAVING buckets > 0; +'''; + db.select(q); + print('${timer.elapsed.inMilliseconds}ms ${vfs.stats()}'); + }, skip: skip); +} + +main() { + group('test filesystem operations with unique ids', () { + testFilesystemOperations( + unique: true, + count: 500000, + alreadyApplied: 10000, + buckets: 10, + rawQueries: false); + }); + group('test filesystem operations with duplicate ids', () { + // If this takes more than a couple of milliseconds to complete, there is a performance bug + testFilesystemOperations( + unique: false, + count: 500000, + alreadyApplied: 1000, + buckets: 10, + rawQueries: false); + }); + + group('test filesystem operations with a small number of changes', () { + testFilesystemOperations( + unique: true, + count: 100000, + alreadyApplied: 95000, + buckets: 10, + rawQueries: false); + }); + + group('test filesystem operations with a large number of buckets', () { + testFilesystemOperations( + unique: true, + count: 100000, + alreadyApplied: 10000, + buckets: 1000, + rawQueries: false); + }); +} diff --git a/dart/test/sync_test.dart b/dart/test/sync_test.dart index 39b2bb7..8eb832e 100644 --- a/dart/test/sync_test.dart +++ b/dart/test/sync_test.dart @@ -10,7 +10,9 @@ import 'package:test/test.dart'; import 'utils/native_test_utils.dart'; void main() { - final vfs = TestSqliteFileSystem(fs: const LocalFileSystem()); + // Needs a unique name per test file to avoid concurrency issues + final vfs = + TestSqliteFileSystem(fs: const LocalFileSystem(), name: 'sync-test-vfs'); setUpAll(() { loadExtension(); @@ -22,7 +24,7 @@ void main() { late CommonDatabase db; setUp(() async { - db = openTestDatabase(vfs) + db = openTestDatabase(vfs: vfs) ..select('select powersync_init();') ..select('select powersync_replace_schema(?)', [json.encode(_schema)]); }); @@ -50,7 +52,7 @@ void main() { 'object_type': 'items', 'object_id': rowId, 'checksum': 0, - 'data': data, + 'data': json.encode(data), } ], } diff --git a/dart/test/utils/native_test_utils.dart b/dart/test/utils/native_test_utils.dart index a6ec244..e65c753 100644 --- a/dart/test/utils/native_test_utils.dart +++ b/dart/test/utils/native_test_utils.dart @@ -26,13 +26,14 @@ void applyOpenOverride() { }); } -CommonDatabase openTestDatabase([VirtualFileSystem? vfs]) { +CommonDatabase openTestDatabase( + {VirtualFileSystem?
vfs, String fileName = ':memory:'}) { applyOpenOverride(); if (!didLoadExtension) { loadExtension(); } - return sqlite3.open(':memory:', vfs: vfs?.name); + return sqlite3.open(fileName, vfs: vfs?.name); } void loadExtension() { diff --git a/dart/test/utils/tracking_vfs.dart b/dart/test/utils/tracking_vfs.dart new file mode 100644 index 0000000..86c2707 --- /dev/null +++ b/dart/test/utils/tracking_vfs.dart @@ -0,0 +1,118 @@ +import 'dart:typed_data'; + +import 'package:sqlite3/sqlite3.dart'; + +final class TrackingFileSystem extends BaseVirtualFileSystem { + BaseVirtualFileSystem parent; + int tempReads = 0; + int tempWrites = 0; + int dataReads = 0; + int dataWrites = 0; + + TrackingFileSystem({super.name = 'tracking', required this.parent}); + + @override + int xAccess(String path, int flags) { + return parent.xAccess(path, flags); + } + + @override + void xDelete(String path, int syncDir) { + parent.xDelete(path, syncDir); + } + + @override + String xFullPathName(String path) { + return parent.xFullPathName(path); + } + + @override + XOpenResult xOpen(Sqlite3Filename path, int flags) { + final result = parent.xOpen(path, flags); + return ( + outFlags: result.outFlags, + file: TrackingFile( + result.file, this, flags & SqlFlag.SQLITE_OPEN_DELETEONCLOSE != 0), + ); + } + + @override + void xSleep(Duration duration) {} + + String stats() { + return "Reads: $dataReads + $tempReads | Writes: $dataWrites + $tempWrites"; + } + + void clearStats() { + tempReads = 0; + tempWrites = 0; + dataReads = 0; + dataWrites = 0; + } +} + +class TrackingFile implements VirtualFileSystemFile { + final TrackingFileSystem vfs; + final VirtualFileSystemFile parentFile; + final bool deleteOnClose; + + TrackingFile(this.parentFile, this.vfs, this.deleteOnClose); + + @override + void xWrite(Uint8List buffer, int fileOffset) { + if (deleteOnClose) { + vfs.tempWrites++; + } else { + vfs.dataWrites++; + } + parentFile.xWrite(buffer, fileOffset); + } + + @override + void xRead(Uint8List buffer, int offset) { + if (deleteOnClose) { + vfs.tempReads++; + } else { + vfs.dataReads++; + } + parentFile.xRead(buffer, offset); + } + + @override + int xCheckReservedLock() { + return parentFile.xCheckReservedLock(); + } + + @override + void xClose() { + return parentFile.xClose(); + } + + @override + int xFileSize() { + return parentFile.xFileSize(); + } + + @override + void xLock(int mode) { + return parentFile.xLock(mode); + } + + @override + void xSync(int flags) { + return parentFile.xSync(flags); + } + + @override + void xTruncate(int size) { + return parentFile.xTruncate(size); + } + + @override + void xUnlock(int mode) { + return parentFile.xUnlock(mode); + } + + @override + int get xDeviceCharacteristics => parentFile.xDeviceCharacteristics; +} diff --git a/rust-toolchain.toml b/rust-toolchain.toml index e418b22..5d54722 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -1,2 +1,2 @@ [toolchain] -channel = "nightly-2024-05-18" +channel = "nightly-2025-04-15" diff --git a/tool/build_wasm.sh b/tool/build_wasm.sh index da505e7..f40d8ed 100755 --- a/tool/build_wasm.sh +++ b/tool/build_wasm.sh @@ -1,5 +1,6 @@ #!/bin/bash set -e +emcc --version # Normal build # target/wasm32-unknown-emscripten/wasm/powersync.wasm @@ -31,13 +32,13 @@ cp "target/wasm32-unknown-emscripten/wasm_asyncify/powersync.wasm" "libpowersync # Static lib. # Works for both sync and asyncify builds. # Works for both emscripten and wasi. 
-# target/wasm32-wasi/wasm/libpowersync.a +# target/wasm32-wasip1/wasm/libpowersync.a cargo build \ -p powersync_loadable \ --profile wasm \ --no-default-features \ --features "powersync_core/static powersync_core/omit_load_extension sqlite_nostd/omit_load_extension" \ -Z build-std=panic_abort,core,alloc \ - --target wasm32-wasi + --target wasm32-wasip1 -cp "target/wasm32-wasi/wasm/libpowersync.a" "libpowersync-wasm.a" +cp "target/wasm32-wasip1/wasm/libpowersync.a" "libpowersync-wasm.a"
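(Editor's note, not part of the patch: with the wasm32-wasi target renamed to wasm32-wasip1 above, the static-library step still builds core and alloc from source via -Z build-std, so a local build only needs the pinned nightly plus the rust-src component. A hedged sketch of the local setup, assuming rustup and emsdk are already installed:

rustup toolchain install nightly-2025-04-15
rustup component add rust-src --toolchain nightly-2025-04-15
./tool/build_wasm.sh
)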