Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions crates/core/src/host/module_host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ use spacetimedb_datastore::traits::{IsolationLevel, Program, TxData};
pub use spacetimedb_durability::{DurabilityExited, DurableOffset};
use spacetimedb_execution::pipelined::{PipelinedProject, ViewProject};
use spacetimedb_execution::RelValue;
use spacetimedb_expr::expr::CollectViews;
use spacetimedb_expr::expr::{BindEnv, CollectViews};
use spacetimedb_lib::db::raw_def::v9::Lifecycle;
use spacetimedb_lib::http::{Request as HttpRequest, Response as HttpResponse};
use spacetimedb_lib::identity::{AuthCtx, RequestId};
Expand Down Expand Up @@ -3272,13 +3272,14 @@ impl ModuleHost {
plans,
_,
table_name,
_,
requires_sender_binding,
) = compile_subscription(query, &schema_tx, auth)?;
let bind_env = BindEnv::for_sender_binding(requires_sender_binding, auth.caller());

// Optimize each fragment.
let optimized = plans
.into_iter()
.map(|plan| plan.optimize(auth))
.map(|plan| plan.optimize(auth).map(|plan| plan.bind_params(&bind_env)))
.collect::<Result<Vec<_>, _>>()?;

check_row_limit(
Expand Down
10 changes: 6 additions & 4 deletions crates/core/src/subscription/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::host::module_host::UpdatesRelValue;
use anyhow::Result;
use spacetimedb_data_structures::map::{HashCollectionExt as _, HashMap};
use spacetimedb_execution::{Datastore, DeltaStore, RelValue, Row};
use spacetimedb_expr::expr::BindEnv;
use spacetimedb_lib::metrics::ExecutionMetrics;
use spacetimedb_primitives::ColList;
use spacetimedb_sats::product_value::InvalidFieldError;
Expand All @@ -20,6 +21,7 @@ pub fn eval_delta<'a, Tx: Datastore + DeltaStore>(
tx: &'a Tx,
metrics: &mut ExecutionMetrics,
plan: &SubscriptionPlan,
bind_env: &BindEnv,
) -> Result<Option<UpdatesRelValue<'a>>> {
metrics.delta_queries_evaluated += 1;

Expand All @@ -42,12 +44,12 @@ pub fn eval_delta<'a, Tx: Datastore + DeltaStore>(
if !plan.is_join() {
// Single table plans will never return redundant rows,
// so there's no need to track row counts.
plan.for_each_insert(tx, metrics, &mut |row| {
plan.for_each_insert(bind_env, tx, metrics, &mut |row| {
inserts.push(maybe_project(row)?);
Ok(())
})?;

plan.for_each_delete(tx, metrics, &mut |row| {
plan.for_each_delete(bind_env, tx, metrics, &mut |row| {
deletes.push(maybe_project(row)?);
Ok(())
})?;
Expand All @@ -57,7 +59,7 @@ pub fn eval_delta<'a, Tx: Datastore + DeltaStore>(
let mut insert_counts = HashMap::new();
let mut delete_counts = HashMap::new();

plan.for_each_insert(tx, metrics, &mut |row| {
plan.for_each_insert(bind_env, tx, metrics, &mut |row| {
let row = maybe_project(row)?;
let n = insert_counts.entry(row).or_default();
if *n > 0 {
Expand All @@ -67,7 +69,7 @@ pub fn eval_delta<'a, Tx: Datastore + DeltaStore>(
Ok(())
})?;

plan.for_each_delete(tx, metrics, &mut |row| {
plan.for_each_delete(bind_env, tx, metrics, &mut |row| {
let row = maybe_project(row)?;
match insert_counts.get_mut(&row) {
// We have not seen an insert for this row.
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/subscription/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ fn extract_columns(
extract_columns(expr, schema, columns);
}
}
PhysicalExpr::Value(_) => {}
PhysicalExpr::Value(_) | PhysicalExpr::Param(..) => {}
}
}

Expand Down
21 changes: 17 additions & 4 deletions crates/core/src/subscription/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,14 +256,27 @@ pub fn execute_plans<F: BuildableWebsocketFormat>(
) -> Result<(ws_v1::DatabaseUpdate<F>, ExecutionMetrics, Vec<QueryMetrics>), DBError> {
plans
.par_iter()
.flat_map_iter(|plan| plan.plans_fragments().map(|fragment| (plan.sql(), fragment)))
.filter(|(_, plan)| {
.flat_map_iter(|plan| {
plan.plans_fragments()
.map(|fragment| (plan.sql(), plan.bind_env(), fragment))
})
.filter(|(_, _, plan)| {
// Since subscriptions only support selects and inner joins,
// we filter out any plans that read from an empty table.
plan.table_ids().all(|table_id| tx.row_count(table_id) > 0)
})
.map(|(sql, plan)| (sql, plan, plan.subscribed_table_id(), plan.subscribed_table_name()))
.map(|(sql, plan, table_id, table_name)| (sql, plan.optimized_physical_plan().clone(), table_id, table_name))
.map(|(sql, bind_env, plan)| {
(
sql,
bind_env,
plan,
plan.subscribed_table_id(),
plan.subscribed_table_name(),
)
})
.map(|(sql, bind_env, plan, table_id, table_name)| {
(sql, plan.bound_optimized_physical_plan(bind_env), table_id, table_name)
})
.map(|(sql, plan, table_id, table_name)| (sql, plan.optimize(auth), table_id, table_name))
.map(|(sql, plan, table_id, table_name)| {
plan.and_then(|plan| {
Expand Down
15 changes: 10 additions & 5 deletions crates/core/src/subscription/module_subscription_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,9 @@ impl ModuleSubscriptions {
tx,
|plan, tx| {
plan.plans_fragments()
.map(|plan_fragment| estimate_rows_scanned(tx, plan_fragment.optimized_physical_plan()))
.map(|plan_fragment| {
estimate_rows_scanned(tx, &plan_fragment.bound_optimized_physical_plan(plan.bind_env()))
})
.fold(0, |acc, rows_scanned| acc.saturating_add(rows_scanned))
},
auth,
Expand All @@ -438,8 +440,7 @@ impl ModuleSubscriptions {

let plans = query
.plans_fragments()
.map(|fragment| fragment.optimized_physical_plan())
.cloned()
.map(|fragment| fragment.bound_optimized_physical_plan(query.bind_env()))
.map(|plan| plan.optimize(auth))
.collect::<Result<Vec<_>, _>>()?;

Expand Down Expand Up @@ -533,7 +534,9 @@ impl ModuleSubscriptions {
tx,
|plan, tx| {
plan.plans_fragments()
.map(|plan_fragment| estimate_rows_scanned(tx, plan_fragment.optimized_physical_plan()))
.map(|plan_fragment| {
estimate_rows_scanned(tx, &plan_fragment.bound_optimized_physical_plan(plan.bind_env()))
})
.fold(0, |acc, rows_scanned| acc.saturating_add(rows_scanned))
},
auth,
Expand Down Expand Up @@ -1538,7 +1541,9 @@ impl ModuleSubscriptions {
&tx,
|plan, tx| {
plan.plans_fragments()
.map(|plan_fragment| estimate_rows_scanned(tx, plan_fragment.optimized_physical_plan()))
.map(|plan_fragment| {
estimate_rows_scanned(tx, &plan_fragment.bound_optimized_physical_plan(plan.bind_env()))
})
.fold(0, |acc, rows_scanned| acc.saturating_add(rows_scanned))
},
&auth,
Expand Down
103 changes: 76 additions & 27 deletions crates/core/src/subscription/module_subscription_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use spacetimedb_data_structures::map::{
};
use spacetimedb_datastore::locking_tx_datastore::state_view::StateView;
use spacetimedb_durability::TxOffset;
use spacetimedb_expr::expr::CollectViews;
use spacetimedb_expr::expr::{BindEnv, CollectViews};
use spacetimedb_lib::metrics::ExecutionMetrics;
use spacetimedb_lib::{AlgebraicValue, ConnectionId, Identity, ProductValue};
use spacetimedb_primitives::{ColId, IndexId, TableId, ViewId};
Expand All @@ -45,7 +45,7 @@ use tokio::sync::{mpsc, oneshot};
/// Identity is insufficient because different ConnectionIds can use the same Identity.
/// TODO: Determine if ConnectionId is sufficient for uniquely identifying a client.
type ClientId = (Identity, ConnectionId);
type Query = Arc<Plan>;
type Query = Arc<PlanInstance>;
type Client = Arc<ClientConnectionSender>;
type SwitchedTableUpdate =
ws_v1::FormatSwitch<ws_v1::TableUpdate<ws_v1::BsatnFormat>, ws_v1::TableUpdate<ws_v1::JsonFormat>>;
Expand All @@ -60,25 +60,61 @@ type ClientQueryId = ws_v1::QueryId;
type SubscriptionId = (ClientId, ClientQueryId);
type SubscriptionIdV2 = (ClientId, ClientQuerySetId);

/// The unbound, reusable subscription query plan.
///
/// Runtime values such as `:sender` are represented as formal parameters inside these plans.
/// TODO: Intern and share `PlanTemplate`s across bound [`PlanInstance`]s with the same template hash.
/// Today, [`PlanInstance::new`] still allocates a fresh template for each cached subscription instance.
#[derive(Debug)]
pub struct Plan {
hash: QueryHash,
pub struct PlanTemplate {
sql: String,
plans: Vec<SubscriptionPlan>,
}

impl CollectViews for Plan {
impl CollectViews for PlanTemplate {
fn collect_views(&self, views: &mut HashSet<ViewId>) {
for plan in &self.plans {
plan.collect_views(views);
}
}
}

impl Plan {
/// Create a new subscription plan to be cached
pub fn new(plans: Vec<SubscriptionPlan>, hash: QueryHash, text: String) -> Self {
Self { plans, hash, sql: text }
impl PlanTemplate {
pub fn new(plans: Vec<SubscriptionPlan>, text: String) -> Self {
Self { sql: text, plans }
}
}

/// A concrete subscription query instance with runtime parameter bindings.
#[derive(Debug)]
pub struct PlanInstance {
hash: QueryHash,
template: Arc<PlanTemplate>,
bind_env: BindEnv,
}

/// A subscription plan tracked by the subscription manager.
pub type Plan = PlanInstance;

impl CollectViews for PlanInstance {
fn collect_views(&self, views: &mut HashSet<ViewId>) {
self.template.collect_views(views);
}
}

impl PlanInstance {
/// Create a new subscription plan instance to be cached.
pub fn new(plans: Vec<SubscriptionPlan>, hash: QueryHash, text: String, bind_env: BindEnv) -> Self {
Self::from_template(Arc::new(PlanTemplate::new(plans, text)), hash, bind_env)
}

/// Create a new subscription plan instance from an existing plan template.
pub fn from_template(template: Arc<PlanTemplate>, hash: QueryHash, bind_env: BindEnv) -> Self {
Self {
hash,
template,
bind_env,
}
}

/// Returns the query hash for this subscription
Expand All @@ -89,18 +125,19 @@ impl Plan {
/// A subscription query return rows from a single table.
/// This method returns the id of that table.
pub fn subscribed_table_id(&self) -> TableId {
self.plans[0].subscribed_table_id()
self.template.plans[0].subscribed_table_id()
}

/// A subscription query return rows from a single table.
/// This method returns the name of that table.
pub fn subscribed_table_name(&self) -> &TableName {
self.plans[0].subscribed_table_name()
self.template.plans[0].subscribed_table_name()
}

/// Returns the index ids from which this subscription reads
pub fn index_ids(&self) -> impl Iterator<Item = (TableId, IndexId)> + use<> {
self.plans
self.template
.plans
.iter()
.flat_map(|plan| plan.index_ids())
.collect::<HashSet<_>>()
Expand All @@ -109,7 +146,8 @@ impl Plan {

/// Returns the table ids from which this subscription reads
pub fn table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
self.plans
self.template
.plans
.iter()
.flat_map(|plan| plan.table_ids())
.collect::<HashSet<_>>()
Expand All @@ -120,9 +158,10 @@ impl Plan {
fn search_args(&self) -> impl Iterator<Item = (TableId, ColId, AlgebraicValue)> + use<> {
let mut args = HashSet::new();
for arg in self
.template
.plans
.iter()
.flat_map(|subscription| subscription.optimized_physical_plan().search_args())
.flat_map(|subscription| subscription.bound_optimized_physical_plan(&self.bind_env).search_args())
{
args.insert(arg);
}
Expand All @@ -132,22 +171,30 @@ impl Plan {
/// Returns the plan fragments that comprise this subscription.
/// Will only return one element unless there is a table with multiple RLS rules.
pub fn plans_fragments(&self) -> impl Iterator<Item = &SubscriptionPlan> + '_ {
self.plans.iter()
self.template.plans.iter()
}

/// Returns the join edges for this plan, if any.
pub fn join_edges(&self) -> impl Iterator<Item = (JoinEdge, AlgebraicValue)> + '_ {
self.plans.iter().filter_map(|plan| plan.join_edge())
self.template
.plans
.iter()
.filter_map(|plan| plan.bound_join_edge(&self.bind_env))
}

/// The `SQL` text of this subscription.
pub fn sql(&self) -> &str {
&self.sql
&self.template.sql
}

/// Runtime parameter bindings for this subscription instance.
pub fn bind_env(&self) -> &BindEnv {
&self.bind_env
}

/// Does this plan return rows from an event table?
pub fn returns_event_table(&self) -> bool {
self.plans.iter().any(|p| p.returns_event_table())
self.template.plans.iter().any(|p| p.returns_event_table())
}
}

Expand Down Expand Up @@ -1480,15 +1527,15 @@ impl SubscriptionManager {
})
.fold(FoldState::default(), |mut acc, (qstate, plan, _hash)| {
let table_name = plan.subscribed_table_name().clone();
match eval_delta(tx, &mut acc.metrics, plan) {
match eval_delta(tx, &mut acc.metrics, plan, qstate.query.bind_env()) {
Err(err) => {
tracing::error!(
message = "Query errored during tx update",
sql = qstate.query.sql,
sql = qstate.query.sql(),
reason = ?err,
);
let err = DBError::WithSql {
sql: qstate.query.sql.as_str().into(),
sql: qstate.query.sql().into(),
error: Box::new(err.into()),
}
.to_string()
Expand Down Expand Up @@ -1637,15 +1684,15 @@ impl SubscriptionManager {

let clients_for_query = qstate.all_v1_clients();

match eval_delta(tx, &mut acc.metrics, plan) {
match eval_delta(tx, &mut acc.metrics, plan, qstate.query.bind_env()) {
Err(err) => {
tracing::error!(
message = "Query errored during tx update",
sql = qstate.query.sql,
sql = qstate.query.sql(),
reason = ?err,
);
let err = DBError::WithSql {
sql: qstate.query.sql.as_str().into(),
sql: qstate.query.sql().into(),
error: Box::new(err.into()),
}
.to_string()
Expand Down Expand Up @@ -2191,6 +2238,7 @@ mod tests {
use std::{sync::Arc, time::Duration};

use spacetimedb_client_api_messages::websocket::{v1 as ws_v1, v2 as ws_v2};
use spacetimedb_expr::expr::BindEnv;
use spacetimedb_lib::AlgebraicValue;
use spacetimedb_lib::{error::ResultTest, identity::AuthCtx, AlgebraicType, ConnectionId, Identity, Timestamp};
use spacetimedb_primitives::{ColId, TableId};
Expand Down Expand Up @@ -2231,9 +2279,10 @@ mod tests {
fn compile_plan_with_auth(db: &RelationalDB, sql: &str, auth: AuthCtx) -> ResultTest<Arc<Plan>> {
with_read_only(db, |tx| {
let tx = SchemaViewer::new(&*tx, &auth);
let (plans, has_param) = SubscriptionPlan::compile(sql, &tx, &auth).unwrap();
let hash = QueryHash::from_string(sql, auth.caller(), has_param);
Ok(Arc::new(Plan::new(plans, hash, sql.into())))
let (plans, requires_sender_binding) = SubscriptionPlan::compile(sql, &tx, &auth).unwrap();
let hash = QueryHash::from_string(sql, auth.caller(), requires_sender_binding);
let bind_env = BindEnv::for_sender_binding(requires_sender_binding, auth.caller());
Ok(Arc::new(Plan::new(plans, hash, sql.into(), bind_env)))
})
}

Expand Down
Loading
Loading