Write to and read from the database in a transactional way

Co-Authored-By: Kyle Caverly <kyle@zed.dev>
This commit is contained in:
Antonio Scandurra 2023-08-31 16:59:54 +02:00
parent 35440be98e
commit c763e728d1

View file

@ -58,7 +58,8 @@ impl FromSql for Sha1 {
#[derive(Clone)] #[derive(Clone)]
pub struct VectorDatabase { pub struct VectorDatabase {
path: Arc<Path>, path: Arc<Path>,
transactions: smol::channel::Sender<Box<dyn 'static + Send + FnOnce(&rusqlite::Connection)>>, transactions:
smol::channel::Sender<Box<dyn 'static + Send + FnOnce(&mut rusqlite::Connection)>>,
} }
impl VectorDatabase { impl VectorDatabase {
@ -71,15 +72,16 @@ impl VectorDatabase {
fs.create_dir(db_directory).await?; fs.create_dir(db_directory).await?;
} }
let (transactions_tx, transactions_rx) = let (transactions_tx, transactions_rx) = smol::channel::unbounded::<
smol::channel::unbounded::<Box<dyn 'static + Send + FnOnce(&rusqlite::Connection)>>(); Box<dyn 'static + Send + FnOnce(&mut rusqlite::Connection)>,
>();
executor executor
.spawn({ .spawn({
let path = path.clone(); let path = path.clone();
async move { async move {
let connection = rusqlite::Connection::open(&path)?; let mut connection = rusqlite::Connection::open(&path)?;
while let Ok(transaction) = transactions_rx.recv().await { while let Ok(transaction) = transactions_rx.recv().await {
transaction(&connection); transaction(&mut connection);
} }
anyhow::Ok(()) anyhow::Ok(())
@ -99,9 +101,9 @@ impl VectorDatabase {
&self.path &self.path
} }
fn transact<F, T>(&self, transaction: F) -> impl Future<Output = Result<T>> fn transact<F, T>(&self, f: F) -> impl Future<Output = Result<T>>
where where
F: 'static + Send + FnOnce(&rusqlite::Connection) -> Result<T>, F: 'static + Send + FnOnce(&rusqlite::Transaction) -> Result<T>,
T: 'static + Send, T: 'static + Send,
{ {
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
@ -109,7 +111,14 @@ impl VectorDatabase {
async move { async move {
if transactions if transactions
.send(Box::new(|connection| { .send(Box::new(|connection| {
let result = transaction(connection); let result = connection
.transaction()
.map_err(|err| anyhow!(err))
.and_then(|transaction| {
let result = f(&transaction)?;
transaction.commit()?;
Ok(result)
});
let _ = tx.send(result); let _ = tx.send(result);
})) }))
.await .await