collab: Setup database for LLM service (#15882)

This PR puts the initial infrastructure for the LLM service's database
in place.

The LLM service will be using a separate Postgres database, with its own
set of migrations.

Currently we only connect to the database in development, as we don't
yet have the database setup for the staging/production environments.

Release Notes:

- N/A
This commit is contained in:
Marshall Bowers 2024-08-06 17:18:08 -04:00 committed by GitHub
parent a64906779b
commit 7f6d0919c9
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
25 changed files with 627 additions and 74 deletions

118
crates/collab/src/llm/db.rs Normal file
View file

@ -0,0 +1,118 @@
mod ids;
mod queries;
mod tables;
#[cfg(test)]
mod tests;
pub use ids::*;
pub use tables::*;
#[cfg(test)]
pub use tests::TestLlmDb;
use std::future::Future;
use std::sync::Arc;
use anyhow::anyhow;
use sea_orm::prelude::*;
pub use sea_orm::ConnectOptions;
use sea_orm::{
ActiveValue, DatabaseConnection, DatabaseTransaction, IsolationLevel, TransactionTrait,
};
use crate::db::TransactionHandle;
use crate::executor::Executor;
use crate::Result;
/// The database for the LLM service.
pub struct LlmDatabase {
options: ConnectOptions,
pool: DatabaseConnection,
#[allow(unused)]
executor: Executor,
#[cfg(test)]
runtime: Option<tokio::runtime::Runtime>,
}
impl LlmDatabase {
/// Connects to the database with the given options
pub async fn new(options: ConnectOptions, executor: Executor) -> Result<Self> {
sqlx::any::install_default_drivers();
Ok(Self {
options: options.clone(),
pool: sea_orm::Database::connect(options).await?,
executor,
#[cfg(test)]
runtime: None,
})
}
pub fn options(&self) -> &ConnectOptions {
&self.options
}
pub async fn transaction<F, Fut, T>(&self, f: F) -> Result<T>
where
F: Send + Fn(TransactionHandle) -> Fut,
Fut: Send + Future<Output = Result<T>>,
{
let body = async {
let (tx, result) = self.with_transaction(&f).await?;
match result {
Ok(result) => match tx.commit().await.map_err(Into::into) {
Ok(()) => return Ok(result),
Err(error) => {
return Err(error);
}
},
Err(error) => {
tx.rollback().await?;
return Err(error);
}
}
};
self.run(body).await
}
async fn with_transaction<F, Fut, T>(&self, f: &F) -> Result<(DatabaseTransaction, Result<T>)>
where
F: Send + Fn(TransactionHandle) -> Fut,
Fut: Send + Future<Output = Result<T>>,
{
let tx = self
.pool
.begin_with_config(Some(IsolationLevel::ReadCommitted), None)
.await?;
let mut tx = Arc::new(Some(tx));
let result = f(TransactionHandle(tx.clone())).await;
let Some(tx) = Arc::get_mut(&mut tx).and_then(|tx| tx.take()) else {
return Err(anyhow!(
"couldn't complete transaction because it's still in use"
))?;
};
Ok((tx, result))
}
async fn run<F, T>(&self, future: F) -> Result<T>
where
F: Future<Output = Result<T>>,
{
#[cfg(test)]
{
if let Executor::Deterministic(executor) = &self.executor {
executor.simulate_random_delay().await;
}
self.runtime.as_ref().unwrap().block_on(future)
}
#[cfg(not(test))]
{
future.await
}
}
}