diff --git a/crates/core/src/host/instance_env.rs b/crates/core/src/host/instance_env.rs index e69c2540ddd..9192ef7587a 100644 --- a/crates/core/src/host/instance_env.rs +++ b/crates/core/src/host/instance_env.rs @@ -52,8 +52,31 @@ pub struct InstanceEnv { procedure_last_tx_offset: Option, } +/// `InstanceEnv` needs to be `Send` because it is created on the host thread +/// and moved to module threads for execution (see [`ModuleHost::with_instance`]). +/// +/// `TxSlot` must be `None` whenever `InstanceEnv` is moved across threads, which is +/// not enforced at compile time but seems to be upheld in practice. +/// +/// In the future, we may push to use `InstanceEnv` only within a module thread, +/// but this still helps prevent a set of bugs that occurred due to `MutTxId` being `Send`, +/// such as: +/// https://github.com/clockworklabs/SpacetimeDB/pull/3938 and +/// https://github.com/clockworklabs/SpacetimeDB/pull/3968. +/// +/// # Safety +/// +/// `InstanceEnv` doesn't auto-derive `Send` because it may hold a `MutTxId`, +/// which we've manually made `!Send` to preserve logical invariants. +/// As described above, sending an `InstanceEnv` while it holds a `MutTxId` will violate logical invariants, +/// but this is not a safety concern. 
+/// Transferring a `MutTxId` between threads will never cause Undefined Behavior, +/// though it is likely to lead to deadlocks. +unsafe impl Send for InstanceEnv {} + #[derive(Clone, Default)] pub struct TxSlot { + // Wrapped in Mutex for interior mutability. inner: Arc>>, } diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index f0f12e35f5a..1055292b444 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -1500,9 +1500,9 @@ impl ModuleHost { Ok(self .call( &reducer_def.name, - (None, call_reducer_params), - |(tx, p), inst| inst.call_reducer(tx, p), - |(_, p), inst| inst.call_reducer(p), + call_reducer_params, + |p, inst| inst.call_reducer(p), + |p, inst| inst.call_reducer(p), ) .await?) } diff --git a/crates/core/src/host/scheduler.rs b/crates/core/src/host/scheduler.rs index b3e7c105be5..5a46ae01bef 100644 --- a/crates/core/src/host/scheduler.rs +++ b/crates/core/src/host/scheduler.rs @@ -394,78 +394,99 @@ pub(super) async fn call_scheduled_function( }) }; - let mut tx = db.begin_mut_tx(IsolationLevel::Serializable, Workload::Internal); - - // Determine the call params. - // This also lets us know whether to call a reducer or procedure. - let params = call_params_for_queued_item(module_info, db, &tx, item); - let (timestamp, instant, params) = match params { - // If the function was already deleted, leave the `ScheduledFunction` - // in the database for when the module restarts. - Ok(None) => return (CallScheduledFunctionResult { reschedule: None }, false), - Ok(Some(params)) => params, - Err(err) => { - // All we can do here is log an error. 
- log::error!("could not determine scheduled function or its parameters: {err:#}"); - let reschedule = delete_scheduled_function_row(Some(tx), None); - return (CallScheduledFunctionResult { reschedule }, false); - } - }; + enum Function { + Reducer(CallScheduledFunctionResult, bool), + Procedure { + params: CallProcedureParams, + reschedule: Option, + }, + } - // We've determined whether we have a reducer or procedure. - // The logic between them will now split, - // as for scheduled procedures, it's incorrect to retry them if execution aborts midway, - // so we must remove the schedule row before executing. - match params { - CallParams::Reducer(params) => { - // Patch the transaction context with ReducerContext so the commitlog - // records the reducer's name, caller, timestamp, and arguments. - // - // Background: Scheduled reducers start with Workload::Internal, but - // call_reducer_with_tx only sets ReducerContext when tx is None. - // Since we pass Some(tx), we must set it here. - let reducer_name = &*module_info.module_def.reducer_by_id(params.reducer_id).name; - tx.ctx = ExecutionContext::with_workload( - tx.ctx.database_identity(), - Workload::Reducer(ReducerContext { - name: reducer_name.into(), - caller_identity: params.caller_identity, - caller_connection_id: params.caller_connection_id, - timestamp: params.timestamp, - arg_bsatn: params.args.get_bsatn().clone(), - }), - ); + let next_step = { + let mut tx = db.begin_mut_tx(IsolationLevel::Serializable, Workload::Internal); + + // Determine the call params. + // This also lets us know whether to call a reducer or procedure. + let params = call_params_for_queued_item(module_info, db, &tx, item); + let (timestamp, instant, params) = match params { + // If the function was already deleted, leave the `ScheduledFunction` + // in the database for when the module restarts. 
+ Ok(None) => return (CallScheduledFunctionResult { reschedule: None }, false), + Ok(Some(params)) => params, + Err(err) => { + // All we can do here is log an error. + log::error!("could not determine scheduled function or its parameters: {err:#}"); + let reschedule = delete_scheduled_function_row(Some(tx), None); + return (CallScheduledFunctionResult { reschedule }, false); + } + }; - // We don't want a panic in the module host to affect the scheduler, as unlikely - // as it might be, so catch it so we can handle it "gracefully". Panics will - // print their message and backtrace when they occur, so we don't need to do - // anything with the error payload. - let result = panic::catch_unwind(panic::AssertUnwindSafe(|| { - inst_common.call_reducer_with_tx(Some(tx), params, inst) - })); - let reschedule = delete_scheduled_function_row(None, None); - // Currently, we drop the return value from the function call. In the future, - // we might want to handle it somehow. - let trapped = match result { - Ok((_res, trapped)) => trapped, - Err(_err) => true, - }; - (CallScheduledFunctionResult { reschedule }, trapped) + // We've determined whether we have a reducer or procedure. + // The logic between them will now split, + // as for scheduled procedures, it's incorrect to retry them if execution aborts midway, + // so we must remove the schedule row before executing. + match params { + CallParams::Reducer(params) => { + // Patch the transaction context with ReducerContext so the commitlog + // records the reducer's name, caller, timestamp, and arguments. + // + // Background: Scheduled reducers start with Workload::Internal, but + // call_reducer_with_tx only sets ReducerContext when tx is None. + // Since we pass Some(tx), we must set it here. 
+ let reducer_name = &*module_info.module_def.reducer_by_id(params.reducer_id).name; + tx.ctx = ExecutionContext::with_workload( + tx.ctx.database_identity(), + Workload::Reducer(ReducerContext { + name: reducer_name.into(), + caller_identity: params.caller_identity, + caller_connection_id: params.caller_connection_id, + timestamp: params.timestamp, + arg_bsatn: params.args.get_bsatn().clone(), + }), + ); + + // We don't want a panic in the module host to affect the scheduler, as unlikely + // as it might be, so catch it so we can handle it "gracefully". Panics will + // print their message and backtrace when they occur, so we don't need to do + // anything with the error payload. + let result = panic::catch_unwind(panic::AssertUnwindSafe(|| { + inst_common.call_reducer_with_tx(Some(tx), params, inst) + })); + let reschedule = delete_scheduled_function_row(None, None); + // Currently, we drop the return value from the function call. In the future, + // we might want to handle it somehow. + let trapped = match result { + Ok((_res, trapped)) => trapped, + Err(_err) => true, + }; + Function::Reducer(CallScheduledFunctionResult { reschedule }, trapped) + } + CallParams::Procedure(params) => { + // Delete scheduled row. + let reschedule = delete_scheduled_function_row(Some(tx), Some((timestamp, instant))); + Function::Procedure { params, reschedule } + } } - CallParams::Procedure(params) => { - // Delete scheduled row. - let reschedule = delete_scheduled_function_row(Some(tx), Some((timestamp, instant))); + }; + // Below code is outside of the DB transaction scope because the + // compiler complains about holding mutable borrow across await point while calling a procedure, + // even though it has been already moved during `delete_scheduled_function_row` call. + match next_step { + Function::Reducer(result, trapped) => (result, trapped), + Function::Procedure { params, reschedule } => { // Execute the procedure. See above for commentary on `catch_unwind()`. 
let result = panic::AssertUnwindSafe(inst_common.call_procedure(params, inst)) .catch_unwind() .await; + // Currently, we drop the return value from the function call. In the future, // we might want to handle it somehow. let trapped = match result { Ok((_res, trapped)) => trapped, Err(_err) => true, }; + (CallScheduledFunctionResult { reschedule }, trapped) } } diff --git a/crates/core/src/host/wasm_common/module_host_actor.rs b/crates/core/src/host/wasm_common/module_host_actor.rs index de90c248ef7..e1534c539c4 100644 --- a/crates/core/src/host/wasm_common/module_host_actor.rs +++ b/crates/core/src/host/wasm_common/module_host_actor.rs @@ -394,8 +394,8 @@ impl WasmModuleInstance { .update_database(program, old_module_info, policy, &mut self.instance) } - pub fn call_reducer(&mut self, tx: Option, params: CallReducerParams) -> ReducerCallResult { - let (res, trapped) = self.call_reducer_with_tx(tx, params); + pub fn call_reducer(&mut self, params: CallReducerParams) -> ReducerCallResult { + let (res, trapped) = self.call_reducer_with_tx(None, params); self.trapped = trapped; res } diff --git a/crates/datastore/src/locking_tx_datastore/datastore.rs b/crates/datastore/src/locking_tx_datastore/datastore.rs index 4326e2c6249..87de3ee4f6f 100644 --- a/crates/datastore/src/locking_tx_datastore/datastore.rs +++ b/crates/datastore/src/locking_tx_datastore/datastore.rs @@ -943,6 +943,7 @@ impl MutTx for Locking { timer, ctx, metrics, + _not_send: std::marker::PhantomData, } } diff --git a/crates/datastore/src/locking_tx_datastore/mut_tx.rs b/crates/datastore/src/locking_tx_datastore/mut_tx.rs index 953bf44d05b..e4b417ba497 100644 --- a/crates/datastore/src/locking_tx_datastore/mut_tx.rs +++ b/crates/datastore/src/locking_tx_datastore/mut_tx.rs @@ -70,6 +70,7 @@ use spacetimedb_table::{ table_index::{IndexCannotSeekRange, IndexSeekRangeResult, TableIndex}, }; use std::{ + marker::PhantomData, sync::Arc, time::{Duration, Instant}, }; @@ -246,6 +247,8 @@ pub struct MutTxId { 
pub timer: Instant, pub ctx: ExecutionContext, pub metrics: ExecutionMetrics, + // Marks `MutTxId` as `!Send` by embedding a non-`Send` type. + pub(crate) _not_send: PhantomData>, } static_assert_size!(MutTxId, 432);