Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 0 additions & 71 deletions crates/core/src/checkpoint.rs

This file was deleted.

7 changes: 2 additions & 5 deletions crates/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use sqlite::ResultCode;
use crate::{error::PowerSyncError, state::DatabaseState};

mod bson;
mod checkpoint;
mod constants;
mod crud_vtab;
mod diff;
Expand All @@ -21,8 +20,7 @@ mod json_util;
mod kv;
mod macros;
mod migrations;
mod operations;
mod operations_vtab;
mod pre_close_vtab;
mod schema;
mod state;
mod sync;
Expand Down Expand Up @@ -73,14 +71,13 @@ fn init_extension(db: *mut sqlite::sqlite3) -> Result<(), PowerSyncError> {
crate::fix_data::register(db)?;
crate::json_util::register(db)?;
crate::view_admin::register(db, state.clone())?;
crate::checkpoint::register(db)?;
crate::kv::register(db)?;
crate::state::register(db, state.clone())?;
sync::register(db, state.clone())?;
update_hooks::register(db, state.clone())?;

crate::schema::register(db, state.clone())?;
crate::operations_vtab::register(db, state.clone())?;
crate::pre_close_vtab::register(db, state.clone())?;
crate::crud_vtab::register(db, state)?;

Ok(())
Expand Down
69 changes: 0 additions & 69 deletions crates/core/src/operations.rs

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,21 @@ use alloc::rc::Rc;
use core::ffi::{c_char, c_int, c_void};

use powersync_sqlite_nostd as sqlite;
use sqlite::{Connection, ResultCode, Value};
use sqlite::{Connection, ResultCode};

use crate::operations::{
clear_remove_ops, delete_bucket, delete_pending_buckets, insert_operation,
};
use crate::state::DatabaseState;
use crate::sync_local::sync_local;
use crate::vtab_util::*;

/// A virtual table hack to implement a "pre-close hook" for databases.
///
/// When `sqlite3_close` is called, SQLite invokes the disconnect callback of virtual tables
/// attached to the connection. This gives us an opportunity to call
/// [DatabaseState::release_resources], allowing us to close active sync clients and associated
/// prepared statements. Without this, our internal statements might lease resources.
#[repr(C)]
struct VirtualTable {
base: sqlite::vtab,
db: *mut sqlite::sqlite3,
state: Rc<DatabaseState>,

target_applied: bool,
target_validated: bool,
}

extern "C" fn connect(
Expand All @@ -32,9 +30,7 @@ extern "C" fn connect(
vtab: *mut *mut sqlite::vtab,
_err: *mut *mut c_char,
) -> c_int {
if let Err(rc) =
sqlite::declare_vtab(db, "CREATE TABLE powersync_operations(op TEXT, data TEXT);")
{
if let Err(rc) = sqlite::declare_vtab(db, "CREATE TABLE powersync_internal_close(_ TEXT);") {
return rc as c_int;
}

Expand All @@ -45,10 +41,7 @@ extern "C" fn connect(
pModule: core::ptr::null(),
zErrMsg: core::ptr::null_mut(),
},
db,
state: DatabaseState::clone_from(aux),
target_validated: false,
target_applied: false,
}));
*vtab = tab.cast::<sqlite::vtab>();
let _ = sqlite::vtab_config(db, 0);
Expand All @@ -69,57 +62,12 @@ extern "C" fn disconnect(vtab: *mut sqlite::vtab) -> c_int {
}

extern "C" fn update(
vtab: *mut sqlite::vtab,
argc: c_int,
argv: *mut *mut sqlite::value,
p_row_id: *mut sqlite::int64,
_vtab: *mut sqlite::vtab,
_argc: c_int,
_argv: *mut *mut sqlite::value,
_p_row_id: *mut sqlite::int64,
) -> c_int {
let args = sqlite::args!(argc, argv);

let rowid = args[0];

return if args.len() == 1 {
// DELETE
ResultCode::MISUSE as c_int
} else if rowid.value_type() == sqlite::ColumnType::Null {
// INSERT
let op = args[2].text();

let tab = unsafe { &mut *vtab.cast::<VirtualTable>() };
let db = tab.db;

if op == "save" {
let result = insert_operation(db, args[3].text());
vtab_result(vtab, result)
} else if op == "sync_local" {
let result = sync_local(&tab.state, db, &args[3]);
if let Ok(result_row) = result {
unsafe {
*p_row_id = result_row;
}
}
vtab_result(vtab, result)
} else if op == "clear_remove_ops" {
let result = clear_remove_ops(db, args[3].text());
vtab_result(vtab, result)
} else if op == "delete_pending_buckets" {
let result = delete_pending_buckets(db, args[3].text());
vtab_result(vtab, result)
} else if op == "delete_bucket" {
let result = delete_bucket(db, args[3].text());
vtab_result(vtab, result)
} else if op == "noop" {
// We call this to ensure the table is added to an active database, ensuring that
// the disconnect callback runs before the database is closed (allowing us to free
// resources from the client state).
ResultCode::OK as c_int
} else {
ResultCode::MISUSE as c_int
}
} else {
// UPDATE - not supported
ResultCode::MISUSE as c_int
} as c_int;
0
}

// Insert-only virtual table.
Expand Down Expand Up @@ -155,7 +103,7 @@ static MODULE: sqlite::module = sqlite::module {

pub fn register(db: *mut sqlite::sqlite3, state: Rc<DatabaseState>) -> Result<(), ResultCode> {
db.create_module_v2(
"powersync_operations",
"powersync_internal_close",
&MODULE,
Some(Rc::into_raw(state) as *mut c_void),
Some(DatabaseState::destroy_rc),
Expand Down
26 changes: 8 additions & 18 deletions crates/core/src/sync/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,24 +224,14 @@ pub fn register(db: *mut sqlite::sqlite3, state: Rc<DatabaseState>) -> Result<()

let op = op.text();
let event = match op {
"start" => {
// Ensure the operations vtab exists. It's not actually used by the sync client,
// but we rely on that vtab being destroyed as a pre-close hook for the database
// connection to free statements preserved across multiple powersync_control
// invocations.
db.exec_safe(
"insert into powersync_operations (op, data) VALUES ('noop', null);",
)?;

SyncControlRequest::StartSyncStream({
if payload.value_type() == ColumnType::Text {
serde_json::from_str(payload.text())
.map_err(PowerSyncError::as_argument_error)?
} else {
StartSyncStream::default()
}
})
}
"start" => SyncControlRequest::StartSyncStream({
if payload.value_type() == ColumnType::Text {
serde_json::from_str(payload.text())
.map_err(PowerSyncError::as_argument_error)?
} else {
StartSyncStream::default()
}
}),
"stop" => SyncControlRequest::StopSyncStream,
"line_text" => SyncControlRequest::SyncEvent(SyncEvent::TextLine {
data: if payload.value_type() == ColumnType::Text {
Expand Down
44 changes: 41 additions & 3 deletions crates/core/src/sync/storage_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use serde::Serialize;
use crate::{
error::{PSResult, PowerSyncError},
ext::SafeManagedStmt,
operations::delete_bucket,
schema::Schema,
state::DatabaseState,
sync::{
Expand Down Expand Up @@ -132,9 +131,48 @@ impl StorageAdapter {
&self,
buckets: impl IntoIterator<Item = &'a str>,
) -> Result<(), ResultCode> {
// Prepare statements lazily, this method may be called without any buckets to delete.
let mut delete_bucket_returning_id = None::<ManagedStmt>;
let mut mark_updated = None::<ManagedStmt>;
let mut delete_oplog = None::<ManagedStmt>;

for bucket in buckets {
// TODO: This is a neat opportunity to create the statements here and cache them
delete_bucket(self.db, bucket)?;
let delete_bucket_returning_id = match delete_bucket_returning_id {
Some(ref stmt) => stmt,
None => delete_bucket_returning_id.insert(
self.db
.prepare_v2("DELETE FROM ps_buckets WHERE name = ?1 RETURNING id")?,
),
};

delete_bucket_returning_id.bind_text(1, bucket, sqlite::Destructor::STATIC)?;
if let ResultCode::ROW = delete_bucket_returning_id.step()? {
let bucket_id = delete_bucket_returning_id.column_int64(0);

let mark_updated = match mark_updated {
Some(ref stmt) => stmt,
None => mark_updated.insert(self.db.prepare_v2(
"\
INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id)
SELECT row_type, row_id
FROM ps_oplog
WHERE bucket = ?1",
)?),
};
let delete_oplog = match delete_oplog {
Some(ref stmt) => stmt,
None => delete_oplog
.insert(self.db.prepare_v2("DELETE FROM ps_oplog WHERE bucket=?1")?),
};

mark_updated.bind_int64(1, bucket_id)?;
mark_updated.exec()?;

delete_oplog.bind_int64(1, bucket_id)?;
delete_oplog.exec()?;
}

delete_bucket_returning_id.reset()?;
}

Ok(())
Expand Down
Loading
Loading