Dock persistence working!
Co-Authored-By: Mikayla Maki <mikayla@zed.dev>
This commit is contained in:
parent
c1f7902309
commit
d20d21c6a2
29 changed files with 783 additions and 443 deletions
|
@ -2,6 +2,7 @@ use std::{
|
|||
ffi::OsStr,
|
||||
os::unix::prelude::OsStrExt,
|
||||
path::{Path, PathBuf},
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
use anyhow::Result;
|
||||
|
@ -118,6 +119,13 @@ impl Bind for &str {
|
|||
}
|
||||
}
|
||||
|
||||
impl Bind for Arc<str> {
|
||||
fn bind(&self, statement: &Statement, start_index: i32) -> Result<i32> {
|
||||
statement.bind_text(start_index, self.as_ref())?;
|
||||
Ok(start_index + 1)
|
||||
}
|
||||
}
|
||||
|
||||
impl Bind for String {
|
||||
fn bind(&self, statement: &Statement, start_index: i32) -> Result<i32> {
|
||||
statement.bind_text(start_index, self)?;
|
||||
|
@ -125,6 +133,13 @@ impl Bind for String {
|
|||
}
|
||||
}
|
||||
|
||||
impl Column for Arc<str> {
|
||||
fn column(statement: &mut Statement, start_index: i32) -> Result<(Self, i32)> {
|
||||
let result = statement.column_text(start_index)?;
|
||||
Ok((Arc::from(result), start_index + 1))
|
||||
}
|
||||
}
|
||||
|
||||
impl Column for String {
|
||||
fn column<'a>(statement: &mut Statement, start_index: i32) -> Result<(Self, i32)> {
|
||||
let result = statement.column_text(start_index)?;
|
||||
|
|
|
@ -54,10 +54,6 @@ impl Connection {
|
|||
self.persistent
|
||||
}
|
||||
|
||||
pub(crate) fn last_insert_id(&self) -> i64 {
|
||||
unsafe { sqlite3_last_insert_rowid(self.sqlite3) }
|
||||
}
|
||||
|
||||
pub fn backup_main(&self, destination: &Connection) -> Result<()> {
|
||||
unsafe {
|
||||
let backup = sqlite3_backup_init(
|
||||
|
@ -126,7 +122,7 @@ mod test {
|
|||
let text = "Some test text";
|
||||
|
||||
connection
|
||||
.insert_bound("INSERT INTO text (text) VALUES (?);")
|
||||
.exec_bound("INSERT INTO text (text) VALUES (?);")
|
||||
.unwrap()(text)
|
||||
.unwrap();
|
||||
|
||||
|
@ -155,7 +151,7 @@ mod test {
|
|||
let tuple2 = ("test2".to_string(), 32, vec![64, 32, 16, 8, 4, 2, 1, 0]);
|
||||
|
||||
let mut insert = connection
|
||||
.insert_bound::<(String, usize, Vec<u8>)>(
|
||||
.exec_bound::<(String, usize, Vec<u8>)>(
|
||||
"INSERT INTO test (text, integer, blob) VALUES (?, ?, ?)",
|
||||
)
|
||||
.unwrap();
|
||||
|
@ -185,7 +181,7 @@ mod test {
|
|||
.unwrap();
|
||||
|
||||
connection
|
||||
.insert_bound("INSERT INTO bools(t, f) VALUES (?, ?);")
|
||||
.exec_bound("INSERT INTO bools(t, f) VALUES (?, ?)")
|
||||
.unwrap()((true, false))
|
||||
.unwrap();
|
||||
|
||||
|
@ -210,7 +206,7 @@ mod test {
|
|||
.unwrap();
|
||||
let blob = vec![0, 1, 2, 4, 8, 16, 32, 64];
|
||||
connection1
|
||||
.insert_bound::<Vec<u8>>("INSERT INTO blobs (data) VALUES (?);")
|
||||
.exec_bound::<Vec<u8>>("INSERT INTO blobs (data) VALUES (?);")
|
||||
.unwrap()(blob.clone())
|
||||
.unwrap();
|
||||
|
||||
|
|
|
@ -1,39 +1,50 @@
|
|||
use crate::connection::Connection;
|
||||
|
||||
pub trait Domain {
|
||||
fn migrate(conn: &Connection) -> anyhow::Result<()>;
|
||||
fn name() -> &'static str;
|
||||
fn migrations() -> &'static [&'static str];
|
||||
}
|
||||
|
||||
impl<D1: Domain, D2: Domain> Domain for (D1, D2) {
|
||||
fn migrate(conn: &Connection) -> anyhow::Result<()> {
|
||||
D1::migrate(conn)?;
|
||||
D2::migrate(conn)
|
||||
pub trait Migrator {
|
||||
fn migrate(connection: &Connection) -> anyhow::Result<()>;
|
||||
}
|
||||
|
||||
impl<D: Domain> Migrator for D {
|
||||
fn migrate(connection: &Connection) -> anyhow::Result<()> {
|
||||
connection.migrate(Self::name(), Self::migrations())
|
||||
}
|
||||
}
|
||||
|
||||
impl<D1: Domain, D2: Domain, D3: Domain> Domain for (D1, D2, D3) {
|
||||
fn migrate(conn: &Connection) -> anyhow::Result<()> {
|
||||
D1::migrate(conn)?;
|
||||
D2::migrate(conn)?;
|
||||
D3::migrate(conn)
|
||||
impl<D1: Domain, D2: Domain> Migrator for (D1, D2) {
|
||||
fn migrate(connection: &Connection) -> anyhow::Result<()> {
|
||||
D1::migrate(connection)?;
|
||||
D2::migrate(connection)
|
||||
}
|
||||
}
|
||||
|
||||
impl<D1: Domain, D2: Domain, D3: Domain, D4: Domain> Domain for (D1, D2, D3, D4) {
|
||||
fn migrate(conn: &Connection) -> anyhow::Result<()> {
|
||||
D1::migrate(conn)?;
|
||||
D2::migrate(conn)?;
|
||||
D3::migrate(conn)?;
|
||||
D4::migrate(conn)
|
||||
impl<D1: Domain, D2: Domain, D3: Domain> Migrator for (D1, D2, D3) {
|
||||
fn migrate(connection: &Connection) -> anyhow::Result<()> {
|
||||
D1::migrate(connection)?;
|
||||
D2::migrate(connection)?;
|
||||
D3::migrate(connection)
|
||||
}
|
||||
}
|
||||
|
||||
impl<D1: Domain, D2: Domain, D3: Domain, D4: Domain, D5: Domain> Domain for (D1, D2, D3, D4, D5) {
|
||||
fn migrate(conn: &Connection) -> anyhow::Result<()> {
|
||||
D1::migrate(conn)?;
|
||||
D2::migrate(conn)?;
|
||||
D3::migrate(conn)?;
|
||||
D4::migrate(conn)?;
|
||||
D5::migrate(conn)
|
||||
impl<D1: Domain, D2: Domain, D3: Domain, D4: Domain> Migrator for (D1, D2, D3, D4) {
|
||||
fn migrate(connection: &Connection) -> anyhow::Result<()> {
|
||||
D1::migrate(connection)?;
|
||||
D2::migrate(connection)?;
|
||||
D3::migrate(connection)?;
|
||||
D4::migrate(connection)
|
||||
}
|
||||
}
|
||||
|
||||
impl<D1: Domain, D2: Domain, D3: Domain, D4: Domain, D5: Domain> Migrator for (D1, D2, D3, D4, D5) {
|
||||
fn migrate(connection: &Connection) -> anyhow::Result<()> {
|
||||
D1::migrate(connection)?;
|
||||
D2::migrate(connection)?;
|
||||
D3::migrate(connection)?;
|
||||
D4::migrate(connection)?;
|
||||
D5::migrate(connection)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -9,53 +9,27 @@ use indoc::{formatdoc, indoc};
|
|||
|
||||
use crate::connection::Connection;
|
||||
|
||||
const MIGRATIONS_MIGRATION: Migration = Migration::new(
|
||||
"migrations",
|
||||
// The migrations migration must be infallable because it runs to completion
|
||||
// with every call to migration run and is run unchecked.
|
||||
&[indoc! {"
|
||||
CREATE TABLE IF NOT EXISTS migrations (
|
||||
domain TEXT,
|
||||
step INTEGER,
|
||||
migration TEXT
|
||||
)
|
||||
"}],
|
||||
);
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Migration {
|
||||
domain: &'static str,
|
||||
migrations: &'static [&'static str],
|
||||
}
|
||||
|
||||
impl Migration {
|
||||
pub const fn new(domain: &'static str, migrations: &'static [&'static str]) -> Self {
|
||||
Self { domain, migrations }
|
||||
}
|
||||
|
||||
fn run_unchecked(&self, connection: &Connection) -> Result<()> {
|
||||
for migration in self.migrations {
|
||||
connection.exec(migration)?()?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn run(&self, connection: &Connection) -> Result<()> {
|
||||
impl Connection {
|
||||
pub fn migrate(&self, domain: &'static str, migrations: &[&'static str]) -> Result<()> {
|
||||
// Setup the migrations table unconditionally
|
||||
MIGRATIONS_MIGRATION.run_unchecked(connection)?;
|
||||
self.exec(indoc! {"
|
||||
CREATE TABLE IF NOT EXISTS migrations (
|
||||
domain TEXT,
|
||||
step INTEGER,
|
||||
migration TEXT
|
||||
)"})?()?;
|
||||
|
||||
let completed_migrations =
|
||||
connection.select_bound::<&str, (String, usize, String)>(indoc! {"
|
||||
self.select_bound::<&str, (String, usize, String)>(indoc! {"
|
||||
SELECT domain, step, migration FROM migrations
|
||||
WHERE domain = ?
|
||||
ORDER BY step
|
||||
"})?(self.domain)?;
|
||||
"})?(domain)?;
|
||||
|
||||
let mut store_completed_migration = connection
|
||||
.insert_bound("INSERT INTO migrations (domain, step, migration) VALUES (?, ?, ?)")?;
|
||||
let mut store_completed_migration =
|
||||
self.exec_bound("INSERT INTO migrations (domain, step, migration) VALUES (?, ?, ?)")?;
|
||||
|
||||
for (index, migration) in self.migrations.iter().enumerate() {
|
||||
for (index, migration) in migrations.iter().enumerate() {
|
||||
if let Some((_, _, completed_migration)) = completed_migrations.get(index) {
|
||||
if completed_migration != migration {
|
||||
return Err(anyhow!(formatdoc! {"
|
||||
|
@ -65,15 +39,15 @@ impl Migration {
|
|||
{}
|
||||
|
||||
Proposed migration:
|
||||
{}", self.domain, index, completed_migration, migration}));
|
||||
{}", domain, index, completed_migration, migration}));
|
||||
} else {
|
||||
// Migration already run. Continue
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
connection.exec(migration)?()?;
|
||||
store_completed_migration((self.domain, index, *migration))?;
|
||||
self.exec(migration)?()?;
|
||||
store_completed_migration((domain, index, *migration))?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
@ -84,22 +58,23 @@ impl Migration {
|
|||
mod test {
|
||||
use indoc::indoc;
|
||||
|
||||
use crate::{connection::Connection, migrations::Migration};
|
||||
use crate::connection::Connection;
|
||||
|
||||
#[test]
|
||||
fn test_migrations_are_added_to_table() {
|
||||
let connection = Connection::open_memory("migrations_are_added_to_table");
|
||||
|
||||
// Create first migration with a single step and run it
|
||||
let mut migration = Migration::new(
|
||||
"test",
|
||||
&[indoc! {"
|
||||
CREATE TABLE test1 (
|
||||
a TEXT,
|
||||
b TEXT
|
||||
)"}],
|
||||
);
|
||||
migration.run(&connection).unwrap();
|
||||
connection
|
||||
.migrate(
|
||||
"test",
|
||||
&[indoc! {"
|
||||
CREATE TABLE test1 (
|
||||
a TEXT,
|
||||
b TEXT
|
||||
)"}],
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
// Verify it got added to the migrations table
|
||||
assert_eq!(
|
||||
|
@ -107,23 +82,31 @@ mod test {
|
|||
.select::<String>("SELECT (migration) FROM migrations")
|
||||
.unwrap()()
|
||||
.unwrap()[..],
|
||||
migration.migrations
|
||||
);
|
||||
|
||||
// Add another step to the migration and run it again
|
||||
migration.migrations = &[
|
||||
indoc! {"
|
||||
&[indoc! {"
|
||||
CREATE TABLE test1 (
|
||||
a TEXT,
|
||||
b TEXT
|
||||
)"},
|
||||
indoc! {"
|
||||
CREATE TABLE test2 (
|
||||
c TEXT,
|
||||
d TEXT
|
||||
)"},
|
||||
];
|
||||
migration.run(&connection).unwrap();
|
||||
)"}],
|
||||
);
|
||||
|
||||
// Add another step to the migration and run it again
|
||||
connection
|
||||
.migrate(
|
||||
"test",
|
||||
&[
|
||||
indoc! {"
|
||||
CREATE TABLE test1 (
|
||||
a TEXT,
|
||||
b TEXT
|
||||
)"},
|
||||
indoc! {"
|
||||
CREATE TABLE test2 (
|
||||
c TEXT,
|
||||
d TEXT
|
||||
)"},
|
||||
],
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
// Verify it is also added to the migrations table
|
||||
assert_eq!(
|
||||
|
@ -131,7 +114,18 @@ mod test {
|
|||
.select::<String>("SELECT (migration) FROM migrations")
|
||||
.unwrap()()
|
||||
.unwrap()[..],
|
||||
migration.migrations
|
||||
&[
|
||||
indoc! {"
|
||||
CREATE TABLE test1 (
|
||||
a TEXT,
|
||||
b TEXT
|
||||
)"},
|
||||
indoc! {"
|
||||
CREATE TABLE test2 (
|
||||
c TEXT,
|
||||
d TEXT
|
||||
)"},
|
||||
],
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -150,7 +144,7 @@ mod test {
|
|||
.unwrap();
|
||||
|
||||
let mut store_completed_migration = connection
|
||||
.insert_bound::<(&str, usize, String)>(indoc! {"
|
||||
.exec_bound::<(&str, usize, String)>(indoc! {"
|
||||
INSERT INTO migrations (domain, step, migration)
|
||||
VALUES (?, ?, ?)"})
|
||||
.unwrap();
|
||||
|
@ -171,8 +165,7 @@ mod test {
|
|||
fn migrations_dont_rerun() {
|
||||
let connection = Connection::open_memory("migrations_dont_rerun");
|
||||
|
||||
// Create migration which clears a table
|
||||
let migration = Migration::new("test", &["DELETE FROM test_table"]);
|
||||
// Create migration which clears a tabl
|
||||
|
||||
// Manually create the table for that migration with a row
|
||||
connection
|
||||
|
@ -197,7 +190,9 @@ mod test {
|
|||
);
|
||||
|
||||
// Run the migration verifying that the row got dropped
|
||||
migration.run(&connection).unwrap();
|
||||
connection
|
||||
.migrate("test", &["DELETE FROM test_table"])
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
connection
|
||||
.select_row::<usize>("SELECT * FROM test_table")
|
||||
|
@ -213,7 +208,9 @@ mod test {
|
|||
.unwrap();
|
||||
|
||||
// Run the same migration again and verify that the table was left unchanged
|
||||
migration.run(&connection).unwrap();
|
||||
connection
|
||||
.migrate("test", &["DELETE FROM test_table"])
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
connection
|
||||
.select_row::<usize>("SELECT * FROM test_table")
|
||||
|
@ -228,22 +225,22 @@ mod test {
|
|||
let connection = Connection::open_memory("changed_migration_fails");
|
||||
|
||||
// Create a migration with two steps and run it
|
||||
Migration::new(
|
||||
"test migration",
|
||||
&[
|
||||
indoc! {"
|
||||
connection
|
||||
.migrate(
|
||||
"test migration",
|
||||
&[
|
||||
indoc! {"
|
||||
CREATE TABLE test (
|
||||
col INTEGER
|
||||
)"},
|
||||
indoc! {"
|
||||
INSERT INTO test (col) VALUES (1)"},
|
||||
],
|
||||
)
|
||||
.run(&connection)
|
||||
.unwrap();
|
||||
indoc! {"
|
||||
INSERT INTO test (col) VALUES (1)"},
|
||||
],
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
// Create another migration with the same domain but different steps
|
||||
let second_migration_result = Migration::new(
|
||||
let second_migration_result = connection.migrate(
|
||||
"test migration",
|
||||
&[
|
||||
indoc! {"
|
||||
|
@ -253,8 +250,7 @@ mod test {
|
|||
indoc! {"
|
||||
INSERT INTO test (color) VALUES (1)"},
|
||||
],
|
||||
)
|
||||
.run(&connection);
|
||||
);
|
||||
|
||||
// Verify new migration returns error when run
|
||||
assert!(second_migration_result.is_err())
|
||||
|
|
|
@ -256,11 +256,6 @@ impl<'a> Statement<'a> {
|
|||
}
|
||||
}
|
||||
|
||||
pub fn insert(&mut self) -> Result<i64> {
|
||||
self.exec()?;
|
||||
Ok(self.connection.last_insert_id())
|
||||
}
|
||||
|
||||
pub fn exec(&mut self) -> Result<()> {
|
||||
fn logic(this: &mut Statement) -> Result<()> {
|
||||
while this.step()? == StepResult::Row {}
|
||||
|
|
|
@ -3,20 +3,23 @@ use std::{marker::PhantomData, ops::Deref, sync::Arc};
|
|||
use connection::Connection;
|
||||
use thread_local::ThreadLocal;
|
||||
|
||||
use crate::{connection, domain::Domain};
|
||||
use crate::{
|
||||
connection,
|
||||
domain::{Domain, Migrator},
|
||||
};
|
||||
|
||||
pub struct ThreadSafeConnection<D: Domain> {
|
||||
pub struct ThreadSafeConnection<M: Migrator> {
|
||||
uri: Arc<str>,
|
||||
persistent: bool,
|
||||
initialize_query: Option<&'static str>,
|
||||
connection: Arc<ThreadLocal<Connection>>,
|
||||
_pd: PhantomData<D>,
|
||||
_pd: PhantomData<M>,
|
||||
}
|
||||
|
||||
unsafe impl<T: Domain> Send for ThreadSafeConnection<T> {}
|
||||
unsafe impl<T: Domain> Sync for ThreadSafeConnection<T> {}
|
||||
unsafe impl<T: Migrator> Send for ThreadSafeConnection<T> {}
|
||||
unsafe impl<T: Migrator> Sync for ThreadSafeConnection<T> {}
|
||||
|
||||
impl<D: Domain> ThreadSafeConnection<D> {
|
||||
impl<M: Migrator> ThreadSafeConnection<M> {
|
||||
pub fn new(uri: &str, persistent: bool) -> Self {
|
||||
Self {
|
||||
uri: Arc::from(uri),
|
||||
|
@ -72,7 +75,11 @@ impl<D: Domain> Clone for ThreadSafeConnection<D> {
|
|||
}
|
||||
}
|
||||
|
||||
impl<D: Domain> Deref for ThreadSafeConnection<D> {
|
||||
// TODO:
|
||||
// 1. When migration or initialization fails, move the corrupted db to a holding place and create a new one
|
||||
// 2. If the new db also fails, downgrade to a shared in memory db
|
||||
// 3. In either case notify the user about what went wrong
|
||||
impl<M: Migrator> Deref for ThreadSafeConnection<M> {
|
||||
type Target = Connection;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
|
@ -91,7 +98,7 @@ impl<D: Domain> Deref for ThreadSafeConnection<D> {
|
|||
.unwrap();
|
||||
}
|
||||
|
||||
D::migrate(&connection).expect("Migrations failed");
|
||||
M::migrate(&connection).expect("Migrations failed");
|
||||
|
||||
connection
|
||||
})
|
||||
|
|
|
@ -20,19 +20,6 @@ impl Connection {
|
|||
Ok(move |bindings| statement.with_bindings(bindings)?.exec())
|
||||
}
|
||||
|
||||
pub fn insert<'a>(&'a self, query: &str) -> Result<impl 'a + FnMut() -> Result<i64>> {
|
||||
let mut statement = Statement::prepare(&self, query)?;
|
||||
Ok(move || statement.insert())
|
||||
}
|
||||
|
||||
pub fn insert_bound<'a, B: Bind>(
|
||||
&'a self,
|
||||
query: &str,
|
||||
) -> Result<impl 'a + FnMut(B) -> Result<i64>> {
|
||||
let mut statement = Statement::prepare(&self, query)?;
|
||||
Ok(move |bindings| statement.with_bindings(bindings)?.insert())
|
||||
}
|
||||
|
||||
pub fn select<'a, C: Column>(
|
||||
&'a self,
|
||||
query: &str,
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue