Skip to content
34 changes: 34 additions & 0 deletions crates/core/src/host/instance_env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,42 @@ pub struct InstanceEnv {
procedure_last_tx_offset: Option<TransactionOffset>,
}

/// `InstanceEnv` needs to be `Send` because it is created on the host thread
/// and moved to module threads for execution (see [`ModuleHost::with_instance`]).
///
/// The `TxSlot` must be empty (i.e. hold no `MutTxId`) whenever `InstanceEnv` is moved
/// across threads, which is not enforced at compile time but seems to be upheld in practice.
///
/// In the future, we may push to use `InstanceEnv` only within a module thread,
/// but this still helps prevent a set of bugs that occurred due to `MutTxId` being `Send`,
/// such as:
/// <https://github.com/clockworklabs/SpacetimeDB/pull/3938> and
/// <https://github.com/clockworklabs/SpacetimeDB/pull/3968>.
///
/// # Safety
///
/// `InstanceEnv` doesn't auto-derive `Send` because it may hold a `MutTxId`,
/// which we've manually made `!Send` to preserve logical invariants.
/// As described above, sending an `InstanceEnv` while it holds a `MutTxId` will violate logical invariants,
/// but this is not a safety concern.
/// Transferring a `MutTxId` between threads will never cause Undefined Behavior,
/// though it is likely to lead to deadlocks.
unsafe impl Send for InstanceEnv {}

/// A shareable slot that optionally holds a `MutTxId`.
///
/// The slot is an `Arc<Mutex<Option<MutTxId>>>`, so cloning a `TxSlot` only clones
/// the `Arc`: every clone refers to the same underlying slot.
/// The `Default` value is an empty slot (`None`).
#[derive(Clone, Default)]
pub struct TxSlot {
// Wrapped in Mutex for interior mutability.
inner: Arc<Mutex<Option<MutTxId>>>,
}

Expand Down
6 changes: 3 additions & 3 deletions crates/core/src/host/module_host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1504,9 +1504,9 @@ impl ModuleHost {
Ok(self
.call(
&reducer_def.name,
(None, call_reducer_params),
|(tx, p), inst| inst.call_reducer(tx, p),
|(_, p), inst| inst.call_reducer(p),
call_reducer_params,
|p, inst| inst.call_reducer(p),
|p, inst| inst.call_reducer(p),
)
.await?)
}
Expand Down
137 changes: 79 additions & 58 deletions crates/core/src/host/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -394,78 +394,99 @@ pub(super) async fn call_scheduled_function(
})
};

let mut tx = db.begin_mut_tx(IsolationLevel::Serializable, Workload::Internal);

// Determine the call params.
// This also lets us know whether to call a reducer or procedure.
let params = call_params_for_queued_item(module_info, db, &tx, item);
let (timestamp, instant, params) = match params {
// If the function was already deleted, leave the `ScheduledFunction`
// in the database for when the module restarts.
Ok(None) => return (CallScheduledFunctionResult { reschedule: None }, false),
Ok(Some(params)) => params,
Err(err) => {
// All we can do here is log an error.
log::error!("could not determine scheduled function or its parameters: {err:#}");
let reschedule = delete_scheduled_function_row(Some(tx), None);
return (CallScheduledFunctionResult { reschedule }, false);
}
};
/// What remains to be done once the transaction-scoped part of
/// scheduling has finished.
enum Function {
/// A reducer was called inside the transaction scope.
/// Carries the result to return and whether the call trapped.
Reducer(CallScheduledFunctionResult, bool),
/// A procedure still has to be executed, outside of the transaction scope.
Procedure {
/// Parameters to pass to `call_procedure`.
params: CallProcedureParams,
/// Value returned by `delete_scheduled_function_row` when the schedule
/// row was removed ahead of execution.
reschedule: Option<Reschedule>,
},
}

// We've determined whether we have a reducer or procedure.
// The logic between them will now split,
// as for scheduled procedures, it's incorrect to retry them if execution aborts midway,
// so we must remove the schedule row before executing.
match params {
CallParams::Reducer(params) => {
// Patch the transaction context with ReducerContext so the commitlog
// records the reducer's name, caller, timestamp, and arguments.
//
// Background: Scheduled reducers start with Workload::Internal, but
// call_reducer_with_tx only sets ReducerContext when tx is None.
// Since we pass Some(tx), we must set it here.
let reducer_name = &*module_info.module_def.reducer_by_id(params.reducer_id).name;
tx.ctx = ExecutionContext::with_workload(
tx.ctx.database_identity(),
Workload::Reducer(ReducerContext {
name: reducer_name.into(),
caller_identity: params.caller_identity,
caller_connection_id: params.caller_connection_id,
timestamp: params.timestamp,
arg_bsatn: params.args.get_bsatn().clone(),
}),
);
let next_step = {
let mut tx = db.begin_mut_tx(IsolationLevel::Serializable, Workload::Internal);

// Determine the call params.
// This also lets us know whether to call a reducer or procedure.
let params = call_params_for_queued_item(module_info, db, &tx, item);
let (timestamp, instant, params) = match params {
// If the function was already deleted, leave the `ScheduledFunction`
// in the database for when the module restarts.
Ok(None) => return (CallScheduledFunctionResult { reschedule: None }, false),
Ok(Some(params)) => params,
Err(err) => {
// All we can do here is log an error.
log::error!("could not determine scheduled function or its parameters: {err:#}");
let reschedule = delete_scheduled_function_row(Some(tx), None);
return (CallScheduledFunctionResult { reschedule }, false);
}
};

// We don't want a panic in the module host to affect the scheduler, as unlikely
// as it might be, so catch it so we can handle it "gracefully". Panics will
// print their message and backtrace when they occur, so we don't need to do
// anything with the error payload.
let result = panic::catch_unwind(panic::AssertUnwindSafe(|| {
inst_common.call_reducer_with_tx(Some(tx), params, inst)
}));
let reschedule = delete_scheduled_function_row(None, None);
// Currently, we drop the return value from the function call. In the future,
// we might want to handle it somehow.
let trapped = match result {
Ok((_res, trapped)) => trapped,
Err(_err) => true,
};
(CallScheduledFunctionResult { reschedule }, trapped)
// We've determined whether we have a reducer or procedure.
// The logic between them will now split,
// as for scheduled procedures, it's incorrect to retry them if execution aborts midway,
// so we must remove the schedule row before executing.
match params {
CallParams::Reducer(params) => {
// Patch the transaction context with ReducerContext so the commitlog
// records the reducer's name, caller, timestamp, and arguments.
//
// Background: Scheduled reducers start with Workload::Internal, but
// call_reducer_with_tx only sets ReducerContext when tx is None.
// Since we pass Some(tx), we must set it here.
let reducer_name = &*module_info.module_def.reducer_by_id(params.reducer_id).name;
tx.ctx = ExecutionContext::with_workload(
tx.ctx.database_identity(),
Workload::Reducer(ReducerContext {
name: reducer_name.into(),
caller_identity: params.caller_identity,
caller_connection_id: params.caller_connection_id,
timestamp: params.timestamp,
arg_bsatn: params.args.get_bsatn().clone(),
}),
);

// We don't want a panic in the module host to affect the scheduler, as unlikely
// as it might be, so catch it so we can handle it "gracefully". Panics will
// print their message and backtrace when they occur, so we don't need to do
// anything with the error payload.
let result = panic::catch_unwind(panic::AssertUnwindSafe(|| {
inst_common.call_reducer_with_tx(Some(tx), params, inst)
}));
let reschedule = delete_scheduled_function_row(None, None);
// Currently, we drop the return value from the function call. In the future,
// we might want to handle it somehow.
let trapped = match result {
Ok((_res, trapped)) => trapped,
Err(_err) => true,
};
Function::Reducer(CallScheduledFunctionResult { reschedule }, trapped)
}
CallParams::Procedure(params) => {
// Delete scheduled row.
let reschedule = delete_scheduled_function_row(Some(tx), Some((timestamp, instant)));
Function::Procedure { params, reschedule }
}
}
CallParams::Procedure(params) => {
// Delete scheduled row.
let reschedule = delete_scheduled_function_row(Some(tx), Some((timestamp, instant)));
};

// The code below is outside of the DB transaction scope because the
// compiler complains about holding a mutable borrow across an await point while calling a procedure,
// even though `tx` has already been moved by the `delete_scheduled_function_row` call.
match next_step {
Function::Reducer(result, trapped) => (result, trapped),
Function::Procedure { params, reschedule } => {
// Execute the procedure. See above for commentary on `catch_unwind()`.
let result = panic::AssertUnwindSafe(inst_common.call_procedure(params, inst))
.catch_unwind()
.await;

// Currently, we drop the return value from the function call. In the future,
// we might want to handle it somehow.
let trapped = match result {
Ok((_res, trapped)) => trapped,
Err(_err) => true,
};

(CallScheduledFunctionResult { reschedule }, trapped)
}
}
Expand Down
4 changes: 2 additions & 2 deletions crates/core/src/host/wasm_common/module_host_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -394,8 +394,8 @@ impl<T: WasmInstance> WasmModuleInstance<T> {
.update_database(program, old_module_info, policy, &mut self.instance)
}

pub fn call_reducer(&mut self, tx: Option<MutTxId>, params: CallReducerParams) -> ReducerCallResult {
let (res, trapped) = self.call_reducer_with_tx(tx, params);
/// Calls a reducer without a caller-supplied transaction.
///
/// Forwards to `call_reducer_with_tx` with `None` as the transaction,
/// records on `self.trapped` whether the call trapped,
/// and returns the reducer's result.
pub fn call_reducer(&mut self, params: CallReducerParams) -> ReducerCallResult {
    let (outcome, did_trap) = self.call_reducer_with_tx(None, params);
    // Remember the trap state so it can be observed after this call returns.
    self.trapped = did_trap;
    outcome
}
Expand Down
1 change: 1 addition & 0 deletions crates/datastore/src/locking_tx_datastore/datastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -943,6 +943,7 @@ impl MutTx for Locking {
timer,
ctx,
metrics,
_not_send: std::marker::PhantomData,
}
}

Expand Down
3 changes: 3 additions & 0 deletions crates/datastore/src/locking_tx_datastore/mut_tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ use spacetimedb_table::{
table_index::{IndexCannotSeekRange, IndexSeekRangeResult, TableIndex},
};
use std::{
marker::PhantomData,
sync::Arc,
time::{Duration, Instant},
};
Expand Down Expand Up @@ -246,6 +247,8 @@ pub struct MutTxId {
pub timer: Instant,
pub ctx: ExecutionContext,
pub metrics: ExecutionMetrics,
// Marks `MutTxId` as `!Send` by embedding a non-`Send` type.
pub(crate) _not_send: PhantomData<std::rc::Rc<()>>,
}

static_assert_size!(MutTxId, 432);
Expand Down
Loading