From 54b2cb84ef5d24b25e662f43bdcc57666f3d7582 Mon Sep 17 00:00:00 2001
From: Simon Binder
Date: Wed, 29 Apr 2026 15:39:53 +0200
Subject: [PATCH] Remove helpers for legacy sync clients

---
 crates/core/src/checkpoint.rs                 |  71 ----
 crates/core/src/lib.rs                        |   7 +-
 crates/core/src/operations.rs                 |  69 ----
 .../{operations_vtab.rs => pre_close_vtab.rs} |  80 +----
 crates/core/src/sync/interface.rs             |  26 +-
 crates/core/src/sync/storage_adapter.rs       |  44 ++-
 crates/core/src/sync_local.rs                 |  50 +--
 crates/core/src/view_admin.rs                 |   5 +
 crates/sqlite_nostd/src/nostd.rs              |  14 +-
 dart/test/js_key_encoding_test.dart           |  44 +--
 dart/test/legacy_sync_test.dart               | 337 ------------------
 dart/test/migration_test.dart                 |  20 +-
 dart/test/sync_local_performance_test.dart    |  54 ++-
 dart/test/utils/fix_035_fixtures.dart         |   4 +-
 14 files changed, 158 insertions(+), 667 deletions(-)
 delete mode 100644 crates/core/src/checkpoint.rs
 delete mode 100644 crates/core/src/operations.rs
 rename crates/core/src/{operations_vtab.rs => pre_close_vtab.rs} (53%)
 delete mode 100644 dart/test/legacy_sync_test.dart

diff --git a/crates/core/src/checkpoint.rs b/crates/core/src/checkpoint.rs
deleted file mode 100644
index 30ce6ef3..00000000
--- a/crates/core/src/checkpoint.rs
+++ /dev/null
@@ -1,71 +0,0 @@
-extern crate alloc;
-
-use alloc::string::String;
-use alloc::vec::Vec;
-use core::ffi::c_int;
-
-use powersync_sqlite_nostd as sqlite;
-use powersync_sqlite_nostd::{Connection, Context, Value};
-use serde::Serialize;
-use serde_json as json;
-use sqlite::ResultCode;
-
-use crate::create_sqlite_text_fn;
-use crate::error::PowerSyncError;
-use crate::sync::checkpoint::{OwnedBucketChecksum, validate_checkpoint};
-use crate::sync::line::Checkpoint;
-
-#[derive(Serialize)]
-struct CheckpointResult {
-    valid: bool,
-    failed_buckets: Vec<String>,
-}
-
-fn powersync_validate_checkpoint_impl(
-    ctx: *mut sqlite::context,
-    args: &[*mut sqlite::value],
-) -> Result<String, PowerSyncError> {
-    let data = args[0].text();
-    let checkpoint: Checkpoint =
-        serde_json::from_str(data).map_err(PowerSyncError::as_argument_error)?;
-    let db = ctx.db_handle();
-    let buckets: Vec<OwnedBucketChecksum> = checkpoint
-        .buckets
-        .iter()
-        .map(OwnedBucketChecksum::from)
-        .collect();
-
-    let failures = validate_checkpoint(buckets.iter(), None, db)?;
-    let mut failed_buckets = Vec::<String>::with_capacity(failures.len());
-    for failure in failures {
-        failed_buckets.push(failure.bucket_name);
-    }
-
-    let result = CheckpointResult {
-        valid: failed_buckets.is_empty(),
-        failed_buckets: failed_buckets,
-    };
-
-    Ok(json::to_string(&result).map_err(PowerSyncError::internal)?)
-}
-
-create_sqlite_text_fn!(
-    powersync_validate_checkpoint,
-    powersync_validate_checkpoint_impl,
-    "powersync_validate_checkpoint"
-);
-
-pub fn register(db: *mut sqlite::sqlite3) -> Result<(), ResultCode> {
-    db.create_function_v2(
-        "powersync_validate_checkpoint",
-        1,
-        sqlite::UTF8 | sqlite::DETERMINISTIC,
-        None,
-        Some(powersync_validate_checkpoint),
-        None,
-        None,
-        None,
-    )?;
-
-    Ok(())
-}
diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs
index 9b173974..2a5c3450 100644
--- a/crates/core/src/lib.rs
+++ b/crates/core/src/lib.rs
@@ -10,7 +10,6 @@ use sqlite::ResultCode;
 use crate::{error::PowerSyncError, state::DatabaseState};
 
 mod bson;
-mod checkpoint;
 mod constants;
 mod crud_vtab;
 mod diff;
@@ -21,8 +20,7 @@ mod json_util;
 mod kv;
 mod macros;
 mod migrations;
-mod operations;
-mod operations_vtab;
+mod pre_close_vtab;
 mod schema;
 mod state;
 mod sync;
@@ -73,14 +71,13 @@ fn init_extension(db: *mut sqlite::sqlite3) -> Result<(), PowerSyncError> {
     crate::fix_data::register(db)?;
     crate::json_util::register(db)?;
     crate::view_admin::register(db, state.clone())?;
-    crate::checkpoint::register(db)?;
     crate::kv::register(db)?;
     crate::state::register(db, state.clone())?;
 
     sync::register(db, state.clone())?;
     update_hooks::register(db, state.clone())?;
 
     crate::schema::register(db, state.clone())?;
-    crate::operations_vtab::register(db, state.clone())?;
+    crate::pre_close_vtab::register(db, state.clone())?;
     crate::crud_vtab::register(db, state)?;
 
     Ok(())
diff --git a/crates/core/src/operations.rs b/crates/core/src/operations.rs
deleted file mode 100644
index adbeef03..00000000
--- a/crates/core/src/operations.rs
+++ /dev/null
@@ -1,69 +0,0 @@
-use crate::error::PowerSyncError;
-use crate::sync::line::DataLine;
-use crate::sync::operations::insert_bucket_operations;
-use crate::sync::storage_adapter::StorageAdapter;
-use alloc::vec::Vec;
-use powersync_sqlite_nostd as sqlite;
-use powersync_sqlite_nostd::{Connection, ResultCode};
-use serde::Deserialize;
-
-use crate::ext::SafeManagedStmt;
-
-// Run inside a transaction
-pub fn insert_operation(db: *mut sqlite::sqlite3, data: &str) -> Result<(), PowerSyncError> {
-    #[derive(Deserialize)]
-    struct BucketBatch<'a> {
-        #[serde(borrow)]
-        buckets: Vec<DataLine<'a>>,
-    }
-
-    let batch: BucketBatch =
-        serde_json::from_str(data).map_err(PowerSyncError::as_argument_error)?;
-    let adapter = StorageAdapter::new(db)?;
-
-    for line in &batch.buckets {
-        insert_bucket_operations(&adapter, &line, 0)?;
-    }
-
-    Ok(())
-}
-
-pub fn clear_remove_ops(_db: *mut sqlite::sqlite3, _data: &str) -> Result<(), ResultCode> {
-    // No-op
-
-    Ok(())
-}
-
-pub fn delete_pending_buckets(_db: *mut sqlite::sqlite3, _data: &str) -> Result<(), ResultCode> {
-    // No-op
-
-    Ok(())
-}
-
-pub fn delete_bucket(db: *mut sqlite::sqlite3, name: &str) -> Result<(), ResultCode> {
-    // language=SQLite
-    let statement = db.prepare_v2("DELETE FROM ps_buckets WHERE name = ?1 RETURNING id")?;
-    statement.bind_text(1, name, sqlite::Destructor::STATIC)?;
-
-    if statement.step()? == ResultCode::ROW {
-        let bucket_id = statement.column_int64(0);
-
-        // language=SQLite
-        let updated_statement = db.prepare_v2(
-            "\
-INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id)
-SELECT row_type, row_id
-FROM ps_oplog
-WHERE bucket = ?1",
-        )?;
-        updated_statement.bind_int64(1, bucket_id)?;
-        updated_statement.exec()?;
-
-        // language=SQLite
-        let delete_statement = db.prepare_v2("DELETE FROM ps_oplog WHERE bucket=?1")?;
-        delete_statement.bind_int64(1, bucket_id)?;
-        delete_statement.exec()?;
-    }
-
-    Ok(())
-}
diff --git a/crates/core/src/operations_vtab.rs b/crates/core/src/pre_close_vtab.rs
similarity index 53%
rename from crates/core/src/operations_vtab.rs
rename to crates/core/src/pre_close_vtab.rs
index 9c0e42f6..75def692 100644
--- a/crates/core/src/operations_vtab.rs
+++ b/crates/core/src/pre_close_vtab.rs
@@ -5,23 +5,21 @@ use alloc::rc::Rc;
 use core::ffi::{c_char, c_int, c_void};
 
 use powersync_sqlite_nostd as sqlite;
-use sqlite::{Connection, ResultCode, Value};
+use sqlite::{Connection, ResultCode};
 
-use crate::operations::{
-    clear_remove_ops, delete_bucket, delete_pending_buckets, insert_operation,
-};
 use crate::state::DatabaseState;
-use crate::sync_local::sync_local;
 use crate::vtab_util::*;
 
+/// A virtual table hack to implement a "pre-close hook" for databases.
+///
+/// When `sqlite3_close` is called, SQLite invokes the disconnect callback of virtual tables
+/// attached to the connection. This gives us an opportunity to call
+/// [DatabaseState::release_resources], allowing us to close active sync clients and associated
+/// prepared statements. Without this, our internal statements might leak resources.
 #[repr(C)]
 struct VirtualTable {
     base: sqlite::vtab,
-    db: *mut sqlite::sqlite3,
     state: Rc<DatabaseState>,
-
-    target_applied: bool,
-    target_validated: bool,
 }
 
 extern "C" fn connect(
@@ -32,9 +30,7 @@ extern "C" fn connect(
     vtab: *mut *mut sqlite::vtab,
     _err: *mut *mut c_char,
 ) -> c_int {
-    if let Err(rc) =
-        sqlite::declare_vtab(db, "CREATE TABLE powersync_operations(op TEXT, data TEXT);")
-    {
+    if let Err(rc) = sqlite::declare_vtab(db, "CREATE TABLE powersync_internal_close(_ TEXT);") {
         return rc as c_int;
     }
 
@@ -45,10 +41,7 @@ extern "C" fn connect(
             pModule: core::ptr::null(),
             zErrMsg: core::ptr::null_mut(),
         },
-        db,
         state: DatabaseState::clone_from(aux),
-        target_validated: false,
-        target_applied: false,
     }));
     *vtab = tab.cast::<sqlite::vtab>();
     let _ = sqlite::vtab_config(db, 0);
@@ -69,57 +62,12 @@ extern "C" fn disconnect(vtab: *mut sqlite::vtab) -> c_int {
 }
 
 extern "C" fn update(
-    vtab: *mut sqlite::vtab,
-    argc: c_int,
-    argv: *mut *mut sqlite::value,
-    p_row_id: *mut sqlite::int64,
+    _vtab: *mut sqlite::vtab,
+    _argc: c_int,
+    _argv: *mut *mut sqlite::value,
+    _p_row_id: *mut sqlite::int64,
 ) -> c_int {
-    let args = sqlite::args!(argc, argv);
-
-    let rowid = args[0];
-
-    return if args.len() == 1 {
-        // DELETE
-        ResultCode::MISUSE as c_int
-    } else if rowid.value_type() == sqlite::ColumnType::Null {
-        // INSERT
-        let op = args[2].text();
-
-        let tab = unsafe { &mut *vtab.cast::<VirtualTable>() };
-        let db = tab.db;
-
-        if op == "save" {
-            let result = insert_operation(db, args[3].text());
-            vtab_result(vtab, result)
-        } else if op == "sync_local" {
-            let result = sync_local(&tab.state, db, &args[3]);
-            if let Ok(result_row) = result {
-                unsafe {
-                    *p_row_id = result_row;
-                }
-            }
-            vtab_result(vtab, result)
-        } else if op == "clear_remove_ops" {
-            let result = clear_remove_ops(db, args[3].text());
-            vtab_result(vtab, result)
-        } else if op == "delete_pending_buckets" {
-            let result = delete_pending_buckets(db, args[3].text());
-            vtab_result(vtab, result)
-        } else if op == "delete_bucket" {
-            let result = delete_bucket(db, args[3].text());
-            vtab_result(vtab, result)
-        } else if op == "noop" {
-            // We call this to ensure the table is added to an active database, ensuring that
-            // the disconnect callback runs before the database is closed (allowing us to free
-            // resources from the client state).
-            ResultCode::OK as c_int
-        } else {
-            ResultCode::MISUSE as c_int
-        }
-    } else {
-        // UPDATE - not supported
-        ResultCode::MISUSE as c_int
-    } as c_int;
+    0
 }
 
 // Insert-only virtual table.
@@ -155,7 +103,7 @@ static MODULE: sqlite::module = sqlite::module {
 
 pub fn register(db: *mut sqlite::sqlite3, state: Rc<DatabaseState>) -> Result<(), ResultCode> {
     db.create_module_v2(
-        "powersync_operations",
+        "powersync_internal_close",
         &MODULE,
         Some(Rc::into_raw(state) as *mut c_void),
        Some(DatabaseState::destroy_rc),
diff --git a/crates/core/src/sync/interface.rs b/crates/core/src/sync/interface.rs
index 4bc43d13..3fce1368 100644
--- a/crates/core/src/sync/interface.rs
+++ b/crates/core/src/sync/interface.rs
@@ -224,24 +224,14 @@ pub fn register(db: *mut sqlite::sqlite3, state: Rc<DatabaseState>) -> Result<()
         let op = op.text();
 
         let event = match op {
-            "start" => {
-                // Ensure the operations vtab exists. It's not actually used by the sync client,
-                // but we rely on that vtab being destroyed as a pre-close hook for the database
-                // connection to free statements preserved across multiple powersync_control
-                // invocations.
-                db.exec_safe(
-                    "insert into powersync_operations (op, data) VALUES ('noop', null);",
-                )?;
-
-                SyncControlRequest::StartSyncStream({
-                    if payload.value_type() == ColumnType::Text {
-                        serde_json::from_str(payload.text())
-                            .map_err(PowerSyncError::as_argument_error)?
-                    } else {
-                        StartSyncStream::default()
-                    }
-                })
-            }
+            "start" => SyncControlRequest::StartSyncStream({
+                if payload.value_type() == ColumnType::Text {
+                    serde_json::from_str(payload.text())
+                        .map_err(PowerSyncError::as_argument_error)?
+                } else {
+                    StartSyncStream::default()
+                }
+            }),
             "stop" => SyncControlRequest::StopSyncStream,
             "line_text" => SyncControlRequest::SyncEvent(SyncEvent::TextLine {
                 data: if payload.value_type() == ColumnType::Text {
diff --git a/crates/core/src/sync/storage_adapter.rs b/crates/core/src/sync/storage_adapter.rs
index 025b1875..094904ac 100644
--- a/crates/core/src/sync/storage_adapter.rs
+++ b/crates/core/src/sync/storage_adapter.rs
@@ -7,7 +7,6 @@ use serde::Serialize;
 use crate::{
     error::{PSResult, PowerSyncError},
     ext::SafeManagedStmt,
-    operations::delete_bucket,
     schema::Schema,
     state::DatabaseState,
     sync::{
@@ -132,9 +131,48 @@ impl StorageAdapter {
         &self,
         buckets: impl IntoIterator<Item = &'a str>,
     ) -> Result<(), ResultCode> {
+        // Prepare statements lazily; this method may be called without any buckets to delete.
+        let mut delete_bucket_returning_id = None::<ManagedStmt>;
+        let mut mark_updated = None::<ManagedStmt>;
+        let mut delete_oplog = None::<ManagedStmt>;
+
         for bucket in buckets {
-            // TODO: This is a neat opportunity to create the statements here and cache them
-            delete_bucket(self.db, bucket)?;
+            let delete_bucket_returning_id = match delete_bucket_returning_id {
+                Some(ref stmt) => stmt,
+                None => delete_bucket_returning_id.insert(
+                    self.db
+                        .prepare_v2("DELETE FROM ps_buckets WHERE name = ?1 RETURNING id")?,
+                ),
+            };
+
+            delete_bucket_returning_id.bind_text(1, bucket, sqlite::Destructor::STATIC)?;
+            if let ResultCode::ROW = delete_bucket_returning_id.step()? {
+                let bucket_id = delete_bucket_returning_id.column_int64(0);
+
+                let mark_updated = match mark_updated {
+                    Some(ref stmt) => stmt,
+                    None => mark_updated.insert(self.db.prepare_v2(
+                        "\
+INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id)
+SELECT row_type, row_id
+FROM ps_oplog
+WHERE bucket = ?1",
+                    )?),
+                };
+                let delete_oplog = match delete_oplog {
+                    Some(ref stmt) => stmt,
+                    None => delete_oplog
+                        .insert(self.db.prepare_v2("DELETE FROM ps_oplog WHERE bucket=?1")?),
+                };
+
+                mark_updated.bind_int64(1, bucket_id)?;
+                mark_updated.exec()?;
+
+                delete_oplog.bind_int64(1, bucket_id)?;
+                delete_oplog.exec()?;
+            }
+
+            delete_bucket_returning_id.reset()?;
         }
 
         Ok(())
diff --git a/crates/core/src/sync_local.rs b/crates/core/src/sync_local.rs
index ea031cd1..874073c0 100644
--- a/crates/core/src/sync_local.rs
+++ b/crates/core/src/sync_local.rs
@@ -2,9 +2,8 @@ use alloc::collections::btree_map::BTreeMap;
 use alloc::format;
 use alloc::rc::Rc;
 use alloc::string::{String, ToString};
-use alloc::vec::Vec;
+use serde::Serialize;
 use serde::ser::SerializeMap;
-use serde::{Deserialize, Serialize};
 
 use crate::error::{PSResult, PowerSyncError};
 use crate::schema::inspection::ExistingTable;
@@ -14,21 +13,11 @@ use crate::schema::{
 use crate::state::DatabaseState;
 use crate::sync::BucketPriority;
 use crate::utils::SqlBuffer;
-use powersync_sqlite_nostd::{self as sqlite, Destructor, ManagedStmt, Value};
-use powersync_sqlite_nostd::{ColumnType, Connection, ResultCode};
+use powersync_sqlite_nostd::{self as sqlite, Destructor, ManagedStmt};
+use powersync_sqlite_nostd::{Connection, ResultCode};
 
 use crate::ext::SafeManagedStmt;
 
-pub fn sync_local<V: Value>(
-    state: &DatabaseState,
-    db: *mut sqlite::sqlite3,
-    data: &V,
-) -> Result<i64, PowerSyncError> {
-    let mut operation: SyncOperation<'_> =
-        SyncOperation::from_args(state, db, data).map_err(PowerSyncError::as_argument_error)?;
-    operation.apply()
-}
-
 pub struct PartialSyncOperation<'a> {
     /// The lowest priority part of the partial sync operation.
     pub priority: BucketPriority,
@@ -45,39 +34,6 @@ pub struct SyncOperation<'a> {
 }
 
 impl<'a> SyncOperation<'a> {
-    fn from_args<V: Value>(
-        state: &'a DatabaseState,
-        db: *mut sqlite::sqlite3,
-        data: &'a V,
-    ) -> Result<Self, serde_json::Error> {
-        Ok(Self::new(
-            state,
-            db,
-            match data.value_type() {
-                ColumnType::Text => {
-                    let text = data.text();
-                    if text.len() > 0 {
-                        #[derive(Deserialize)]
-                        struct PartialSyncLocalArguments {
-                            #[serde(rename = "buckets")]
-                            _buckets: Vec<String>,
-                            priority: BucketPriority,
-                        }
-
-                        let args: PartialSyncLocalArguments = serde_json::from_str(text)?;
-                        Some(PartialSyncOperation {
-                            priority: args.priority,
-                            args: text,
-                        })
-                    } else {
-                        None
-                    }
-                }
-                _ => None,
-            },
-        ))
-    }
-
     pub fn new(
         state: &'a DatabaseState,
         db: *mut sqlite::sqlite3,
diff --git a/crates/core/src/view_admin.rs b/crates/core/src/view_admin.rs
index c6e2b248..75163438 100644
--- a/crates/core/src/view_admin.rs
+++ b/crates/core/src/view_admin.rs
@@ -37,6 +37,11 @@ fn powersync_init_impl(
 ) -> Result<String, PowerSyncError> {
     powersync_migrate(ctx, LATEST_VERSION)?;
 
+    // Register the powersync_internal_close vtab to implement a "pre-close hook".
+    // See `pre_close_vtab.rs` for more details on how that works.
+    ctx.db_handle()
+        .exec(c"INSERT INTO powersync_internal_close(_) VALUES (null)")?;
+
     Ok(String::from(""))
 }
 
diff --git a/crates/sqlite_nostd/src/nostd.rs b/crates/sqlite_nostd/src/nostd.rs
index 72f8c65b..d49b9c07 100644
--- a/crates/sqlite_nostd/src/nostd.rs
+++ b/crates/sqlite_nostd/src/nostd.rs
@@ -285,11 +285,7 @@ pub trait Connection {
     fn errmsg(&self) -> Result<String, ResultCode>;
     fn error_offset(&self) -> Option<usize>;
 
-    /// sql should be a null terminated string! However you find is most efficient to craft those,
-    /// hence why we have no opinion on &str vs String vs CString vs whatever
-    /// todo: we should make some sort of opaque type to force null termination
-    /// this is inehritly unsafe
-    unsafe fn exec(&self, sql: *const c_char) -> Result<ResultCode, ResultCode>;
+    fn exec(&self, sql: &CStr) -> Result<ResultCode, ResultCode>;
 
     fn exec_safe(&self, sql: &str) -> Result<ResultCode, ResultCode>;
 
@@ -373,8 +369,8 @@ impl Connection for ManagedConnection {
     }
 
     #[inline]
-    unsafe fn exec(&self, sql: *const c_char) -> Result<ResultCode, ResultCode> {
-        unsafe { self.db.exec(sql) }
+    fn exec(&self, sql: &CStr) -> Result<ResultCode, ResultCode> {
+        self.db.exec(sql)
     }
 
     #[inline]
@@ -526,8 +522,8 @@ impl Connection for *mut sqlite3 {
     }
 
     #[inline]
-    unsafe fn exec(&self, sql: *const c_char) -> Result<ResultCode, ResultCode> {
-        convert_rc(exec(*self, sql))
+    fn exec(&self, sql: &CStr) -> Result<ResultCode, ResultCode> {
+        convert_rc(exec(*self, sql.as_ptr()))
     }
 
     #[inline]
diff --git a/dart/test/js_key_encoding_test.dart b/dart/test/js_key_encoding_test.dart
index 0b281162..17a773cf 100644
--- a/dart/test/js_key_encoding_test.dart
+++ b/dart/test/js_key_encoding_test.dart
@@ -15,32 +15,24 @@ void main() {
   });
 
   test('can fix JS key encoding', () {
-    db.execute('insert into powersync_operations (op, data) VALUES (?, ?);', [
-      'save',
-      json.encode({
-        'buckets': [
-          {
-            'bucket': 'a',
-            'data': [
-              {
-                'op_id': '1',
-                'op': 'PUT',
-                'object_type': 'items',
-                'object_id': '1',
-                'subkey': json.encode('subkey'),
-                'checksum': 0,
-                'data': json.encode({'col': 'a'}),
-              }
-            ],
-          }
-        ],
-      })
-    ]);
-
-    db.execute('INSERT INTO powersync_operations(op, data) VALUES (?, ?)',
-        ['sync_local', null]);
-    var [row] = db.select('select * from ps_oplog');
-    expect(row['key'], 'items/1/"subkey"');
+    var [row] = db
+        .select('INSERT INTO ps_buckets(name) VALUES (?) RETURNING id', ['a']);
+    final bucketId = row.columnAt(0) as int;
+
+    db.execute(
+      'INSERT INTO ps_oplog(bucket, op_id, key, row_type, row_id, data, hash) VALUES (?, ?, ?, ?, ?, ?, ?)',
+      [
+        bucketId,
+        '1',
+        // The JavaScript client used to insert keys like this (encoding the
+        // subkey part as JSON).
+        'items/1/"subkey"',
+        'items',
+        '1',
+        '{}',
+        0
+      ],
+    );
 
     // Apply migration
     db.execute(
diff --git a/dart/test/legacy_sync_test.dart b/dart/test/legacy_sync_test.dart
deleted file mode 100644
index f2484105..00000000
--- a/dart/test/legacy_sync_test.dart
+++ /dev/null
@@ -1,337 +0,0 @@
-import 'dart:convert';
-
-import 'package:fake_async/fake_async.dart';
-import 'package:file/local.dart';
-import 'package:sqlite3/common.dart';
-import 'package:sqlite3/sqlite3.dart';
-import 'package:sqlite3_test/sqlite3_test.dart';
-import 'package:test/test.dart';
-
-import 'utils/native_test_utils.dart';
-
-/// Tests that the older sync interfaces requiring clients to decode and handle
-/// sync lines still work.
-void main() {
-  final vfs = TestSqliteFileSystem(
-      fs: const LocalFileSystem(), name: 'legacy-sync-test');
-
-  setUpAll(() {
-    loadExtension();
-    sqlite3.registerVirtualFileSystem(vfs, makeDefault: false);
-  });
-  tearDownAll(() => sqlite3.unregisterVirtualFileSystem(vfs));
-
-  group('sync tests', () {
-    late CommonDatabase db;
-
-    setUp(() async {
-      db = openTestDatabase(vfs: vfs)
-        ..select('select powersync_init();')
-        ..select('select powersync_replace_schema(?)', [json.encode(_schema)]);
-    });
-
-    void pushSyncData(
-      String bucket,
-      String opId,
-      String rowId,
-      Object op,
-      Object? data, {
-      Object? descriptions = _bucketDescriptions,
-    }) {
-      final encoded = json.encode({
-        'buckets': [
-          {
-            'bucket': bucket,
-            'data': [
-              {
-                'op_id': opId,
-                'op': op,
-                'object_type': 'items',
-                'object_id': rowId,
-                'checksum': 0,
-                'data': json.encode(data),
-              }
-            ],
-          }
-        ],
-        if (descriptions != null) 'descriptions': descriptions,
-      });
-
-      db.execute('insert into powersync_operations (op, data) VALUES (?, ?);',
-          ['save', encoded]);
-    }
-
-    bool pushCheckpointComplete(
-        String lastOpId, String? writeCheckpoint, List<Object?> checksums,
-        {int? priority}) {
-      final [row] = db.select('select powersync_validate_checkpoint(?) as r;', [
-        json.encode({
-          'last_op_id': lastOpId,
-          'write_checkpoint': writeCheckpoint,
-          'buckets': [
-            for (final cs in checksums.cast<Map<String, dynamic>>())
-              if (priority == null || cs['priority'] <= priority) cs
-          ],
-          'priority': priority,
-        })
-      ]);
-
-      final decoded = json.decode(row['r']);
-      if (decoded['valid'] != true) {
-        fail(row['r']);
-      }
-
-      db.execute(
-        'UPDATE ps_buckets SET last_op = ? WHERE name IN (SELECT json_each.value FROM json_each(?))',
-        [
-          lastOpId,
-          json.encode(checksums.map((e) => (e as Map)['bucket']).toList())
-        ],
-      );
-
-      db.execute('INSERT INTO powersync_operations(op, data) VALUES (?, ?)', [
-        'sync_local',
-        priority != null
-            ? jsonEncode({
-                'priority': priority,
-                'buckets': [
-                  for (final cs in checksums.cast<Map<String, dynamic>>())
-                    if (cs['priority'] <= priority) cs['bucket']
-                ],
-              })
-            : null,
-      ]);
-      return db.lastInsertRowId == 1;
-    }
-
-    ResultSet fetchRows() {
-      return db.select('select * from items');
-    }
-
-    test('does not publish until reaching checkpoint', () {
-      expect(fetchRows(), isEmpty);
-      pushSyncData('prio1', '1', 'row-0', 'PUT', {'col': 'hi'});
-      expect(fetchRows(), isEmpty);
-
-      expect(
-          pushCheckpointComplete(
-              '1', null, [_bucketChecksum('prio1', 1, checksum: 0)]),
-          isTrue);
-      expect(fetchRows(), [
-        {'id': 'row-0', 'col': 'hi'}
-      ]);
-    });
-
-    test('does not publish with pending local data', () {
-      expect(fetchRows(), isEmpty);
-      db.execute("insert into items (id, col) values ('local', 'data');");
-      expect(fetchRows(), isNotEmpty);
-
-      pushSyncData('prio1', '1', 'row-0', 'PUT', {'col': 'hi'});
-      expect(
-          pushCheckpointComplete(
-              '1', null, [_bucketChecksum('prio1', 1, checksum: 0)]),
-          isFalse);
-      expect(fetchRows(), [
-        {'id': 'local', 'col': 'data'}
-      ]);
-    });
-
-    test('publishes with local data for prio=0 buckets', () {
-      expect(fetchRows(), isEmpty);
-      db.execute("insert into items (id, col) values ('local', 'data');");
-      expect(fetchRows(), isNotEmpty);
-
-      pushSyncData('prio0', '1', 'row-0', 'PUT', {'col': 'hi'});
-      expect(
-        pushCheckpointComplete(
-          '1',
-          null,
-          [_bucketChecksum('prio0', 0, checksum: 0)],
-          priority: 0,
-        ),
-        isTrue,
-      );
-      expect(fetchRows(), [
-        {'id': 'local', 'col': 'data'},
-        {'id': 'row-0', 'col': 'hi'},
-      ]);
-    });
-
-    test('can publish partial checkpoints under different priorities', () {
-      for (var i = 0; i < 4; i++) {
-        pushSyncData('prio$i', '1', 'row-$i', 'PUT', {'col': '$i'});
-      }
-      expect(fetchRows(), isEmpty);
-
-      // Simulate a partial checkpoint complete for each of the buckets.
-      for (var i = 0; i < 4; i++) {
-        expect(
-          pushCheckpointComplete(
-            '1',
-            null,
-            [
-              for (var j = 0; j <= 4; j++)
-                _bucketChecksum(
-                  'prio$j',
-                  j,
-                  // Give buckets outside of the current priority a wrong
-                  // checksum. They should not be validated yet.
-                  checksum: j <= i ? 0 : 1234,
-                ),
-            ],
-            priority: i,
-          ),
-          isTrue,
-        );
-
-        expect(fetchRows(), [
-          for (var j = 0; j <= i; j++) {'id': 'row-$j', 'col': '$j'},
-        ]);
-
-        expect(db.select('select 1 from ps_sync_state where priority = ?', [i]),
-            isNotEmpty);
-        // A sync at this priority includes all higher priorities too, so they
-        // should be cleared.
-        expect(db.select('select 1 from ps_sync_state where priority < ?', [i]),
-            isEmpty);
-      }
-    });
-
-    test('can sync multiple times', () {
-      fakeAsync((controller) {
-        for (var i = 0; i < 10; i++) {
-          for (var prio in const [1, 2, 3, null]) {
-            pushCheckpointComplete('1', null, [], priority: prio);
-
-            // Make sure there's only a single row in last_synced_at
-            expect(
-                db.select(
-                    "SELECT datetime(last_synced_at, 'localtime') AS last_synced_at FROM ps_sync_state WHERE priority = ?",
-                    [prio ??
2147483647]), - [ - {'last_synced_at': '2025-03-01 ${10 + i}:00:00'} - ], - ); - - if (prio == null) { - expect( - db.select( - "SELECT datetime(powersync_last_synced_at(), 'localtime') AS last_synced_at"), - [ - {'last_synced_at': '2025-03-01 ${10 + i}:00:00'} - ], - ); - } - } - - controller.elapse(const Duration(hours: 1)); - } - }, initialTime: DateTime(2025, 3, 1, 10)); - }); - - test('clearing database clears sync status', () { - pushSyncData('prio1', '1', 'row-0', 'PUT', {'col': 'hi'}); - - expect( - pushCheckpointComplete( - '1', null, [_bucketChecksum('prio1', 1, checksum: 0)]), - isTrue); - expect(db.select('SELECT powersync_last_synced_at() AS r').single, - {'r': isNotNull}); - expect(db.select('SELECT priority FROM ps_sync_state').single, - {'priority': 2147483647}); - - db.execute('SELECT powersync_clear(0)'); - expect(db.select('SELECT powersync_last_synced_at() AS r').single, - {'r': isNull}); - expect(db.select('SELECT * FROM ps_sync_state'), hasLength(0)); - }); - - test('tracks download progress', () { - const bucket = 'bkt'; - void expectProgress(int atLast, int sinceLast) { - final [row] = db.select( - 'SELECT count_at_last, count_since_last FROM ps_buckets WHERE name = ?', - [bucket], - ); - final [actualAtLast, actualSinceLast] = row.values; - - expect(actualAtLast, atLast, reason: 'count_at_last mismatch'); - expect(actualSinceLast, sinceLast, reason: 'count_since_last mismatch'); - } - - pushSyncData(bucket, '1', 'row-0', 'PUT', {'col': 'hi'}); - expectProgress(0, 1); - - pushSyncData(bucket, '2', 'row-1', 'PUT', {'col': 'hi'}); - expectProgress(0, 2); - - expect( - pushCheckpointComplete( - '2', - null, - [_bucketChecksum(bucket, 1, checksum: 0)], - priority: 1, - ), - isTrue, - ); - - // Running partial or complete checkpoints should not reset stats, client - // SDKs are responsible for that. - expectProgress(0, 2); - expect(db.select('SELECT * FROM items'), isNotEmpty); - - expect( - pushCheckpointComplete( - '2', - null, - [_bucketChecksum(bucket, 1, checksum: 0)], - ), - isTrue, - ); - expectProgress(0, 2); - - db.execute(''' -UPDATE ps_buckets SET count_since_last = 0, count_at_last = ?1->name - WHERE ?1->name IS NOT NULL -''', [ - json.encode({bucket: 2}), - ]); - expectProgress(2, 0); - - // Run another iteration of this - pushSyncData(bucket, '3', 'row-3', 'PUT', {'col': 'hi'}); - expectProgress(2, 1); - db.execute(''' -UPDATE ps_buckets SET count_since_last = 0, count_at_last = ?1->name - WHERE ?1->name IS NOT NULL -''', [ - json.encode({bucket: 3}), - ]); - expectProgress(3, 0); - }); - }); -} - -Object? 
_bucketChecksum(String bucket, int prio, {int checksum = 0}) { - return {'bucket': bucket, 'priority': prio, 'checksum': checksum}; -} - -const _schema = { - 'tables': [ - { - 'name': 'items', - 'columns': [ - {'name': 'col', 'type': 'text'} - ], - } - ] -}; - -const _bucketDescriptions = { - 'prio0': {'priority': 0}, - 'prio1': {'priority': 1}, - 'prio2': {'priority': 2}, - 'prio3': {'priority': 3}, -}; diff --git a/dart/test/migration_test.dart b/dart/test/migration_test.dart index 9455e8e4..d442edd1 100644 --- a/dart/test/migration_test.dart +++ b/dart/test/migration_test.dart @@ -7,6 +7,7 @@ import 'utils/native_test_utils.dart'; import 'utils/migration_fixtures.dart' as fixtures; import 'utils/fix_035_fixtures.dart' as fix035; import 'utils/schema.dart'; +import 'utils/test_utils.dart'; void main() { group('Migration Tests', () { @@ -201,8 +202,23 @@ void main() { final data = getData(db); expect(data, equals(fix035.dataMigrated.trim())); - db.select('insert into powersync_operations(op, data) values(?, ?)', - ['sync_local', '']); + { + void control(String op, [Object? payload]) { + db.execute('select powersync_control(?, ?)', [op, payload]); + } + + db.execute('begin'); + control('start'); + control( + 'line_text', + json.encode(checkpoint(lastOpId: 3, buckets: [ + bucketDescription('b1', checksum: 120), + bucketDescription('b2', checksum: 3), + ])), + ); + control('line_text', json.encode(checkpointComplete(lastOpId: '3'))); + db.execute('commit'); + } final data2 = getData(db); expect(data2, equals(fix035.dataFixed.trim())); diff --git a/dart/test/sync_local_performance_test.dart b/dart/test/sync_local_performance_test.dart index ec0f5ba3..9f68597c 100644 --- a/dart/test/sync_local_performance_test.dart +++ b/dart/test/sync_local_performance_test.dart @@ -8,6 +8,7 @@ import 'package:sqlite3/sqlite3.dart'; import 'package:test/test.dart'; import 'utils/native_test_utils.dart'; +import 'utils/test_utils.dart'; import 'utils/tracking_vfs.dart'; import './schema_test.dart' show schema; @@ -90,11 +91,47 @@ COMMIT; vfs.clearStats(); }); - test('sync_local (full)', () { + Stopwatch runSyncLocal(int? priority) { + db.execute('begin'); + void control(String op, [Object? payload]) { + db.execute('SELECT powersync_control(?, ?)', [op, payload]); + } + + // Start a fake sync client to apply the changes we've already written to + // ps_oplog + control('start'); + final lastOpid = + db.select('select max(op_id) from ps_oplog').single.columnAt(0) as int; + final allBuckets = db + .select('SELECT name FROM ps_buckets') + .map((r) => r.columnAt(0) as String); + const highPriorityBuckets = { + 'bucket0', + 'bucket3', + 'bucket4', + 'bucket5', + 'bucket6' + }; + control( + 'line_text', + json.encode(checkpoint(lastOpId: lastOpid, buckets: [ + for (final bucket in allBuckets) + bucketDescription(bucket, + priority: highPriorityBuckets.contains(bucket) ? 2 : 3), + ])), + ); + var timer = Stopwatch()..start(); - db.select('insert into powersync_operations(op, data) values(?, ?)', - ['sync_local', '']); + control('line_text', json.encode(checkpointComplete(priority: priority))); + db.execute('commit'); + + timer.stop(); print('${timer.elapsed.inMilliseconds}ms ${vfs.stats()}'); + return timer; + } + + test('sync_local (full)', () { + final timer = runSyncLocal(null); // These are fairly generous limits, to catch significant regressions only. 
expect(vfs.tempWrites, lessThan(count / 50)); @@ -103,15 +140,8 @@ COMMIT; }); test('sync_local (partial)', () { - var timer = Stopwatch()..start(); - db.select('insert into powersync_operations(op, data) values(?, ?)', [ - 'sync_local', - jsonEncode({ - 'buckets': ['bucket0', 'bucket3', 'bucket4', 'bucket5', 'bucket6'], - 'priority': 2 - }) - ]); - print('${timer.elapsed.inMilliseconds}ms ${vfs.stats()}'); + final timer = runSyncLocal(2); + expect(vfs.tempWrites, lessThan(count / 50)); expect(timer.elapsed, lessThan(Duration(milliseconds: 100 + (count / 50).round()))); diff --git a/dart/test/utils/fix_035_fixtures.dart b/dart/test/utils/fix_035_fixtures.dart index 5f7b3978..fa912e89 100644 --- a/dart/test/utils/fix_035_fixtures.dart +++ b/dart/test/utils/fix_035_fixtures.dart @@ -40,8 +40,8 @@ const dataMigrated = ''' /// Data after applying the migration fix and sync_local const dataFixed = ''' ;INSERT INTO ps_buckets(id, name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete, count_at_last, count_since_last, downloaded_size) VALUES - (1, 'b1', 0, 0, 0, 0, 120, 0, 0, 0, 0), - (2, 'b2', 0, 0, 0, 0, 3, 0, 0, 0, 0) + (1, 'b1', 3, 3, 0, 0, 120, 0, 1, 0, 0), + (2, 'b2', 3, 3, 0, 0, 3, 0, 1, 0, 0) ;INSERT INTO ps_oplog(bucket, op_id, row_type, row_id, key, data, hash) VALUES (1, 1, 'todos', 't1', '', '{}', 100), (1, 2, 'todos', 't2', '', '{}', 20),
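
Reviewer note, not part of the patch: with powersync_operations and
powersync_validate_checkpoint removed, bucket data can only be applied through
the powersync_control sync client, as the updated tests above do. The sketch
below shows that flow end to end. It is an illustration only: the helper name
applyCheckpoint is made up, and the JSON line shapes ("checkpoint" /
"checkpoint_complete") are assumed from the PowerSync sync protocol; the
checkpoint()/bucketDescription()/checkpointComplete() helpers that the updated
tests import from utils/test_utils.dart (not shown in this diff) are presumed
to produce equivalent payloads.

    import 'dart:convert';

    import 'package:sqlite3/common.dart';

    /// Pushes a trivial checkpoint for bucket 'a' through the sync client on
    /// [db], which must have the extension loaded and powersync_init() run.
    void applyCheckpoint(CommonDatabase db) {
      void control(String op, [Object? payload]) {
        db.execute('select powersync_control(?, ?)', [op, payload]);
      }

      db.execute('begin');
      // Starts the in-core sync client; this also replaces the old
      // "insert into powersync_operations (op, data) values ('noop', null)"
      // handshake, since powersync_init now registers the pre-close vtab.
      control('start');

      // A checkpoint line announcing which buckets (and checksums) to expect.
      control(
        'line_text',
        json.encode({
          'checkpoint': {
            'last_op_id': '1',
            'write_checkpoint': null,
            'buckets': [
              {'bucket': 'a', 'checksum': 0, 'priority': 3},
            ],
          }
        }),
      );

      // Completing the checkpoint validates checksums and runs sync_local
      // internally, replacing the old ('sync_local', ...) insert and the
      // powersync_validate_checkpoint function.
      control(
        'line_text',
        json.encode({
          'checkpoint_complete': {'last_op_id': '1'}
        }),
      );
      db.execute('commit');
    }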