diff --git a/crates/db/src/db.rs b/crates/db/src/db.rs index ea355a91a6..6de51cb0e6 100644 --- a/crates/db/src/db.rs +++ b/crates/db/src/db.rs @@ -2,6 +2,7 @@ pub mod kvp; // Re-export pub use anyhow; +use anyhow::Context; pub use indoc::indoc; pub use lazy_static; pub use smol; @@ -14,9 +15,13 @@ use sqlez_macros::sql; use std::fs::{create_dir_all, remove_dir_all}; use std::path::Path; use std::sync::atomic::{AtomicBool, Ordering}; +use std::time::{SystemTime, UNIX_EPOCH}; +use util::{async_iife, ResultExt}; use util::channel::{ReleaseChannel, RELEASE_CHANNEL, RELEASE_CHANNEL_NAME}; use util::paths::DB_DIR; +// TODO: Add a savepoint to the thread safe connection initialization and migrations + const CONNECTION_INITIALIZE_QUERY: &'static str = sql!( PRAGMA synchronous=NORMAL; PRAGMA busy_timeout=1; @@ -28,31 +33,90 @@ const DB_INITIALIZE_QUERY: &'static str = sql!( PRAGMA journal_mode=WAL; ); +const FALLBACK_DB_NAME: &'static str = "FALLBACK_MEMORY_DB"; + lazy_static::lazy_static! { static ref DB_WIPED: AtomicBool = AtomicBool::new(false); } /// Open or create a database at the given directory path. pub async fn open_db() -> ThreadSafeConnection { - // Use 0 for now. Will implement incrementing and clearing of old db files soon TM - let current_db_dir = (*DB_DIR).join(Path::new(&format!("0-{}", *RELEASE_CHANNEL_NAME))); + let db_dir = (*DB_DIR).join(Path::new(&format!("0-{}", *RELEASE_CHANNEL_NAME))); + // If WIPE_DB, delete 0-{channel} if *RELEASE_CHANNEL == ReleaseChannel::Dev && std::env::var("WIPE_DB").is_ok() && !DB_WIPED.load(Ordering::Acquire) { - remove_dir_all(¤t_db_dir).ok(); - DB_WIPED.store(true, Ordering::Relaxed); + remove_dir_all(&db_dir).ok(); + DB_WIPED.store(true, Ordering::Release); } - create_dir_all(¤t_db_dir).expect("Should be able to create the database directory"); - let db_path = current_db_dir.join(Path::new("db.sqlite")); + let connection = async_iife!({ + // If no db folder, create one at 0-{channel} + create_dir_all(&db_dir).context("Could not create db directory")?; + let db_path = db_dir.join(Path::new("db.sqlite")); - ThreadSafeConnection::::builder(db_path.to_string_lossy().as_ref(), true) + // Try building a connection + if let Some(connection) = ThreadSafeConnection::::builder(db_path.to_string_lossy().as_ref(), true) + .with_db_initialization_query(DB_INITIALIZE_QUERY) + .with_connection_initialize_query(CONNECTION_INITIALIZE_QUERY) + .build() + .await + .log_err() { + return Ok(connection) + } + + let backup_timestamp = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect( + "System clock is set before the unix timestamp, Zed does not support this region of spacetime" + ) + .as_millis(); + + // If failed, move 0-{channel} to {current unix timestamp}-{channel} + let backup_db_dir = (*DB_DIR).join(Path::new(&format!( + "{}{}", + backup_timestamp, + *RELEASE_CHANNEL_NAME + ))); + + std::fs::rename(&db_dir, backup_db_dir) + .context("Failed clean up corrupted database, panicking.")?; + + // TODO: Set a constant with the failed timestamp and error so we can notify the user + + // Create a new 0-{channel} + create_dir_all(&db_dir).context("Should be able to create the database directory")?; + let db_path = db_dir.join(Path::new("db.sqlite")); + + // Try again + ThreadSafeConnection::::builder(db_path.to_string_lossy().as_ref(), true) + .with_db_initialization_query(DB_INITIALIZE_QUERY) + .with_connection_initialize_query(CONNECTION_INITIALIZE_QUERY) + .build() + .await + }).await.log_err(); + + if let Some(connection) = connection { + return connection; + } + + // TODO: Set another constant so that we can escalate the notification + + // If still failed, create an in memory db with a known name + open_fallback_db().await +} + +async fn open_fallback_db() -> ThreadSafeConnection { + ThreadSafeConnection::::builder(FALLBACK_DB_NAME, false) .with_db_initialization_query(DB_INITIALIZE_QUERY) .with_connection_initialize_query(CONNECTION_INITIALIZE_QUERY) .build() .await + .expect( + "Fallback in memory database failed. Likely initialization queries or migrations have fundamental errors", + ) } #[cfg(any(test, feature = "test-support"))] @@ -66,6 +130,7 @@ pub async fn open_test_db(db_name: &str) -> ThreadSafeConnection .with_write_queue_constructor(locking_queue()) .build() .await + .unwrap() } /// Implements a basic DB wrapper for a given domain diff --git a/crates/sqlez/src/thread_safe_connection.rs b/crates/sqlez/src/thread_safe_connection.rs index 82697d1f90..4849e785b5 100644 --- a/crates/sqlez/src/thread_safe_connection.rs +++ b/crates/sqlez/src/thread_safe_connection.rs @@ -1,3 +1,4 @@ +use anyhow::Context; use futures::{channel::oneshot, Future, FutureExt}; use lazy_static::lazy_static; use parking_lot::{Mutex, RwLock}; @@ -72,7 +73,7 @@ impl ThreadSafeConnectionBuilder { self } - pub async fn build(self) -> ThreadSafeConnection { + pub async fn build(self) -> anyhow::Result> { self.connection .initialize_queues(self.write_queue_constructor); @@ -81,26 +82,33 @@ impl ThreadSafeConnectionBuilder { self.connection .write(move |connection| { if let Some(db_initialize_query) = db_initialize_query { - connection.exec(db_initialize_query).expect(&format!( - "Db initialize query failed to execute: {}", - db_initialize_query - ))() - .unwrap(); + connection.exec(db_initialize_query).with_context(|| { + format!( + "Db initialize query failed to execute: {}", + db_initialize_query + ) + })?()?; } - let mut failure_result = None; + // Retry failed migrations in case they were run in parallel from different + // processes. This gives a best attempt at migrating before bailing + let mut migration_result = + anyhow::Result::<()>::Err(anyhow::anyhow!("Migration never run")); + for _ in 0..MIGRATION_RETRIES { - failure_result = Some(M::migrate(connection)); - if failure_result.as_ref().unwrap().is_ok() { + migration_result = connection + .with_savepoint("thread_safe_multi_migration", || M::migrate(connection)); + + if migration_result.is_ok() { break; } } - failure_result.unwrap().expect("Migration failed"); + migration_result }) - .await; + .await?; - self.connection + Ok(self.connection) } } @@ -240,10 +248,6 @@ impl Clone for ThreadSafeConnection { } } -// TODO: -// 1. When migration or initialization fails, move the corrupted db to a holding place and create a new one -// 2. If the new db also fails, downgrade to a shared in memory db -// 3. In either case notify the user about what went wrong impl Deref for ThreadSafeConnection { type Target = Connection; @@ -265,7 +269,7 @@ pub fn locking_queue() -> WriteQueueConstructor { #[cfg(test)] mod test { use indoc::indoc; - use lazy_static::__Deref; + use std::ops::Deref; use std::thread; use crate::{domain::Domain, thread_safe_connection::ThreadSafeConnection}; @@ -295,7 +299,8 @@ mod test { PRAGMA foreign_keys=TRUE; PRAGMA case_sensitive_like=TRUE; "}); - let _ = smol::block_on(builder.build()).deref(); + + let _ = smol::block_on(builder.build()).unwrap().deref(); })); } @@ -341,6 +346,6 @@ mod test { ThreadSafeConnection::::builder("wild_zed_lost_failure", false) .with_connection_initialize_query("PRAGMA FOREIGN_KEYS=true"); - smol::block_on(builder.build()); + smol::block_on(builder.build()).unwrap(); } } diff --git a/crates/util/src/lib.rs b/crates/util/src/lib.rs index 78536f01d0..0e83bb5f19 100644 --- a/crates/util/src/lib.rs +++ b/crates/util/src/lib.rs @@ -223,6 +223,13 @@ macro_rules! iife { }; } +#[macro_export] +macro_rules! async_iife { + ($block:block) => { + (|| async move { $block })() + }; +} + #[cfg(test)] mod tests { use super::*;