diff --git a/crates/durability/src/imp/local.rs b/crates/durability/src/imp/local.rs index 0aaf79fb7c9..e3a8afa34ac 100644 --- a/crates/durability/src/imp/local.rs +++ b/crates/durability/src/imp/local.rs @@ -2,6 +2,7 @@ use std::{ io, num::NonZeroU16, panic, + path::PathBuf, sync::{ atomic::{AtomicU64, Ordering::Relaxed}, Arc, @@ -110,8 +111,7 @@ impl Local { // We could just place a lock on the commitlog directory, // yet for backwards-compatibility, we keep using the `db.lock` file. - let lock_path = replica_dir.0.join("db.lock"); - let lockfile = LockedFile::lock(lock_path)?; + let lock = Lock::create(replica_dir.0.join("db.lock"))?; let clog = Arc::new(Commitlog::open( replica_dir.commit_log(), @@ -134,7 +134,7 @@ impl Local { sync_interval: opts.sync_interval, max_records_in_commit: opts.commitlog.max_records_in_commit, - lockfile, + lock, } .run(txdata_rx, shutdown_rx), ) @@ -194,7 +194,7 @@ struct Actor { max_records_in_commit: NonZeroU16, #[allow(unused)] - lockfile: LockedFile, + lock: Lock, } impl Actor { @@ -206,9 +206,6 @@ impl Actor { ) { info!("starting durability actor"); - // Always notify waiters (i.e. [Close] futures) when the actor exits. - let done = scopeguard::guard(Arc::new(Notify::new()), |done| done.notify_waiters()); - let mut sync_interval = interval(self.sync_interval); sync_interval.set_missed_tick_behavior(MissedTickBehavior::Delay); // `flush_and_sync` when the loop exits without panicking, @@ -228,7 +225,7 @@ impl Actor { Some(reply) = shutdown_rx.recv() => { txdata_rx.close(); - let _ = reply.send(done.clone().notified_owned()); + let _ = reply.send(self.lock.notified()); }, _ = sync_interval.tick() => { @@ -317,6 +314,34 @@ impl Actor { } } +struct Lock { + file: Option, + notify_on_drop: Arc, +} + +impl Lock { + pub fn create(path: PathBuf) -> Result { + let file = LockedFile::lock(path).map(Some)?; + let notify_on_drop = Arc::new(Notify::new()); + + Ok(Self { file, notify_on_drop }) + } + + pub fn notified(&self) -> OwnedNotified { + self.notify_on_drop.clone().notified_owned() + } +} + +impl Drop for Lock { + fn drop(&mut self) { + // Ensure the file lock is dropped before notifying. + if let Some(file) = self.file.take() { + drop(file); + } + self.notify_on_drop.notify_waiters(); + } +} + /// Handle an error flushing the commitlog. /// /// Panics if the error indicates that the log may be permanently unwritable.