Start on a new db2 module that uses SeaORM

This commit is contained in:
Antonio Scandurra 2022-11-29 16:49:04 +01:00
parent ac24600a40
commit 11a39226e8
11 changed files with 765 additions and 1 deletions

316
crates/collab/src/db2.rs Normal file
View file

@ -0,0 +1,316 @@
mod project;
mod project_collaborator;
mod room;
mod room_participant;
mod worktree;
use crate::{Error, Result};
use anyhow::anyhow;
use collections::HashMap;
use dashmap::DashMap;
use futures::StreamExt;
use rpc::{proto, ConnectionId};
use sea_orm::ActiveValue;
use sea_orm::{
entity::prelude::*, ConnectOptions, DatabaseConnection, DatabaseTransaction, DbErr,
TransactionTrait,
};
use serde::{Deserialize, Serialize};
use std::ops::{Deref, DerefMut};
use std::{future::Future, marker::PhantomData, rc::Rc, sync::Arc};
use tokio::sync::{Mutex, OwnedMutexGuard};
pub struct Database {
pool: DatabaseConnection,
rooms: DashMap<RoomId, Arc<Mutex<()>>>,
#[cfg(test)]
background: Option<std::sync::Arc<gpui::executor::Background>>,
#[cfg(test)]
runtime: Option<tokio::runtime::Runtime>,
}
impl Database {
pub async fn new(url: &str, max_connections: u32) -> Result<Self> {
let mut options = ConnectOptions::new(url.into());
options.max_connections(max_connections);
Ok(Self {
pool: sea_orm::Database::connect(options).await?,
rooms: DashMap::with_capacity(16384),
#[cfg(test)]
background: None,
#[cfg(test)]
runtime: None,
})
}
pub async fn share_project(
&self,
room_id: RoomId,
connection_id: ConnectionId,
worktrees: &[proto::WorktreeMetadata],
) -> Result<RoomGuard<(ProjectId, proto::Room)>> {
self.transact(|tx| async move {
let participant = room_participant::Entity::find()
.filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.0))
.one(&tx)
.await?
.ok_or_else(|| anyhow!("could not find participant"))?;
if participant.room_id != room_id.0 {
return Err(anyhow!("shared project on unexpected room"))?;
}
let project = project::ActiveModel {
room_id: ActiveValue::set(participant.room_id),
host_user_id: ActiveValue::set(participant.user_id),
host_connection_id: ActiveValue::set(connection_id.0 as i32),
..Default::default()
}
.insert(&tx)
.await?;
worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
id: ActiveValue::set(worktree.id as i32),
project_id: ActiveValue::set(project.id),
abs_path: ActiveValue::set(worktree.abs_path.clone()),
root_name: ActiveValue::set(worktree.root_name.clone()),
visible: ActiveValue::set(worktree.visible),
scan_id: ActiveValue::set(0),
is_complete: ActiveValue::set(false),
}))
.exec(&tx)
.await?;
project_collaborator::ActiveModel {
project_id: ActiveValue::set(project.id),
connection_id: ActiveValue::set(connection_id.0 as i32),
user_id: ActiveValue::set(participant.user_id),
replica_id: ActiveValue::set(0),
is_host: ActiveValue::set(true),
..Default::default()
}
.insert(&tx)
.await?;
let room = self.get_room(room_id, &tx).await?;
self.commit_room_transaction(room_id, tx, (ProjectId(project.id), room))
.await
})
.await
}
async fn get_room(&self, room_id: RoomId, tx: &DatabaseTransaction) -> Result<proto::Room> {
let db_room = room::Entity::find_by_id(room_id.0)
.one(tx)
.await?
.ok_or_else(|| anyhow!("could not find room"))?;
let mut db_participants = db_room
.find_related(room_participant::Entity)
.stream(tx)
.await?;
let mut participants = HashMap::default();
let mut pending_participants = Vec::new();
while let Some(db_participant) = db_participants.next().await {
let db_participant = db_participant?;
if let Some(answering_connection_id) = db_participant.answering_connection_id {
let location = match (
db_participant.location_kind,
db_participant.location_project_id,
) {
(Some(0), Some(project_id)) => {
Some(proto::participant_location::Variant::SharedProject(
proto::participant_location::SharedProject {
id: project_id as u64,
},
))
}
(Some(1), _) => Some(proto::participant_location::Variant::UnsharedProject(
Default::default(),
)),
_ => Some(proto::participant_location::Variant::External(
Default::default(),
)),
};
participants.insert(
answering_connection_id,
proto::Participant {
user_id: db_participant.user_id as u64,
peer_id: answering_connection_id as u32,
projects: Default::default(),
location: Some(proto::ParticipantLocation { variant: location }),
},
);
} else {
pending_participants.push(proto::PendingParticipant {
user_id: db_participant.user_id as u64,
calling_user_id: db_participant.calling_user_id as u64,
initial_project_id: db_participant.initial_project_id.map(|id| id as u64),
});
}
}
let mut db_projects = db_room
.find_related(project::Entity)
.find_with_related(worktree::Entity)
.stream(tx)
.await?;
while let Some(row) = db_projects.next().await {
let (db_project, db_worktree) = row?;
if let Some(participant) = participants.get_mut(&db_project.host_connection_id) {
let project = if let Some(project) = participant
.projects
.iter_mut()
.find(|project| project.id as i32 == db_project.id)
{
project
} else {
participant.projects.push(proto::ParticipantProject {
id: db_project.id as u64,
worktree_root_names: Default::default(),
});
participant.projects.last_mut().unwrap()
};
if let Some(db_worktree) = db_worktree {
project.worktree_root_names.push(db_worktree.root_name);
}
}
}
Ok(proto::Room {
id: db_room.id as u64,
live_kit_room: db_room.live_kit_room,
participants: participants.into_values().collect(),
pending_participants,
})
}
async fn commit_room_transaction<T>(
&self,
room_id: RoomId,
tx: DatabaseTransaction,
data: T,
) -> Result<RoomGuard<T>> {
let lock = self.rooms.entry(room_id).or_default().clone();
let _guard = lock.lock_owned().await;
tx.commit().await?;
Ok(RoomGuard {
data,
_guard,
_not_send: PhantomData,
})
}
async fn transact<F, Fut, T>(&self, f: F) -> Result<T>
where
F: Send + Fn(DatabaseTransaction) -> Fut,
Fut: Send + Future<Output = Result<T>>,
{
let body = async {
loop {
let tx = self.pool.begin().await?;
match f(tx).await {
Ok(result) => return Ok(result),
Err(error) => match error {
Error::Database2(
DbErr::Exec(sea_orm::RuntimeErr::SqlxError(error))
| DbErr::Query(sea_orm::RuntimeErr::SqlxError(error)),
) if error
.as_database_error()
.and_then(|error| error.code())
.as_deref()
== Some("40001") =>
{
// Retry (don't break the loop)
}
error @ _ => return Err(error),
},
}
}
};
#[cfg(test)]
{
if let Some(background) = self.background.as_ref() {
background.simulate_random_delay().await;
}
self.runtime.as_ref().unwrap().block_on(body)
}
#[cfg(not(test))]
{
body.await
}
}
}
pub struct RoomGuard<T> {
data: T,
_guard: OwnedMutexGuard<()>,
_not_send: PhantomData<Rc<()>>,
}
impl<T> Deref for RoomGuard<T> {
type Target = T;
fn deref(&self) -> &T {
&self.data
}
}
impl<T> DerefMut for RoomGuard<T> {
fn deref_mut(&mut self) -> &mut T {
&mut self.data
}
}
macro_rules! id_type {
($name:ident) => {
#[derive(
Clone,
Copy,
Debug,
Default,
PartialEq,
Eq,
PartialOrd,
Ord,
Hash,
sqlx::Type,
Serialize,
Deserialize,
)]
#[sqlx(transparent)]
#[serde(transparent)]
pub struct $name(pub i32);
impl $name {
#[allow(unused)]
pub const MAX: Self = Self(i32::MAX);
#[allow(unused)]
pub fn from_proto(value: u64) -> Self {
Self(value as i32)
}
#[allow(unused)]
pub fn to_proto(self) -> u64 {
self.0 as u64
}
}
impl std::fmt::Display for $name {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
self.0.fmt(f)
}
}
};
}
id_type!(UserId);
id_type!(RoomId);
id_type!(RoomParticipantId);
id_type!(ProjectId);
id_type!(WorktreeId);

View file

@ -0,0 +1,37 @@
use sea_orm::entity::prelude::*;
#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel)]
#[sea_orm(table_name = "projects")]
pub struct Model {
#[sea_orm(primary_key)]
pub id: i32,
pub room_id: i32,
pub host_user_id: i32,
pub host_connection_id: i32,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {
#[sea_orm(
belongs_to = "super::room::Entity",
from = "Column::RoomId",
to = "super::room::Column::Id"
)]
Room,
#[sea_orm(has_many = "super::worktree::Entity")]
Worktree,
}
impl Related<super::room::Entity> for Entity {
fn to() -> RelationDef {
Relation::Room.def()
}
}
impl Related<super::worktree::Entity> for Entity {
fn to() -> RelationDef {
Relation::Worktree.def()
}
}
impl ActiveModelBehavior for ActiveModel {}

View file

@ -0,0 +1,18 @@
use sea_orm::entity::prelude::*;
#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel)]
#[sea_orm(table_name = "project_collaborators")]
pub struct Model {
#[sea_orm(primary_key)]
pub id: i32,
pub project_id: i32,
pub connection_id: i32,
pub user_id: i32,
pub replica_id: i32,
pub is_host: bool,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {}
impl ActiveModelBehavior for ActiveModel {}

View file

@ -0,0 +1,31 @@
use sea_orm::entity::prelude::*;
#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel)]
#[sea_orm(table_name = "room_participants")]
pub struct Model {
#[sea_orm(primary_key)]
pub id: i32,
pub live_kit_room: String,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {
#[sea_orm(has_many = "super::room_participant::Entity")]
RoomParticipant,
#[sea_orm(has_many = "super::project::Entity")]
Project,
}
impl Related<super::room_participant::Entity> for Entity {
fn to() -> RelationDef {
Relation::RoomParticipant.def()
}
}
impl Related<super::project::Entity> for Entity {
fn to() -> RelationDef {
Relation::Project.def()
}
}
impl ActiveModelBehavior for ActiveModel {}

View file

@ -0,0 +1,34 @@
use sea_orm::entity::prelude::*;
#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel)]
#[sea_orm(table_name = "room_participants")]
pub struct Model {
#[sea_orm(primary_key)]
pub id: i32,
pub room_id: i32,
pub user_id: i32,
pub answering_connection_id: Option<i32>,
pub location_kind: Option<i32>,
pub location_project_id: Option<i32>,
pub initial_project_id: Option<i32>,
pub calling_user_id: i32,
pub calling_connection_id: i32,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {
#[sea_orm(
belongs_to = "super::room::Entity",
from = "Column::RoomId",
to = "super::room::Column::Id"
)]
Room,
}
impl Related<super::room::Entity> for Entity {
fn to() -> RelationDef {
Relation::Room.def()
}
}
impl ActiveModelBehavior for ActiveModel {}

View file

@ -0,0 +1,33 @@
use sea_orm::entity::prelude::*;
#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel)]
#[sea_orm(table_name = "worktrees")]
pub struct Model {
#[sea_orm(primary_key)]
pub id: i32,
#[sea_orm(primary_key)]
pub project_id: i32,
pub abs_path: String,
pub root_name: String,
pub visible: bool,
pub scan_id: i64,
pub is_complete: bool,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {
#[sea_orm(
belongs_to = "super::project::Entity",
from = "Column::ProjectId",
to = "super::project::Column::Id"
)]
Project,
}
impl Related<super::project::Entity> for Entity {
fn to() -> RelationDef {
Relation::Project.def()
}
}
impl ActiveModelBehavior for ActiveModel {}

View file

@ -5,6 +5,7 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
pub enum Error {
Http(StatusCode, String),
Database(sqlx::Error),
Database2(sea_orm::error::DbErr),
Internal(anyhow::Error),
}
@ -20,6 +21,12 @@ impl From<sqlx::Error> for Error {
}
}
impl From<sea_orm::error::DbErr> for Error {
fn from(error: sea_orm::error::DbErr) -> Self {
Self::Database2(error)
}
}
impl From<axum::Error> for Error {
fn from(error: axum::Error) -> Self {
Self::Internal(error.into())
@ -45,6 +52,9 @@ impl IntoResponse for Error {
Error::Database(error) => {
(StatusCode::INTERNAL_SERVER_ERROR, format!("{}", &error)).into_response()
}
Error::Database2(error) => {
(StatusCode::INTERNAL_SERVER_ERROR, format!("{}", &error)).into_response()
}
Error::Internal(error) => {
(StatusCode::INTERNAL_SERVER_ERROR, format!("{}", &error)).into_response()
}
@ -57,6 +67,7 @@ impl std::fmt::Debug for Error {
match self {
Error::Http(code, message) => (code, message).fmt(f),
Error::Database(error) => error.fmt(f),
Error::Database2(error) => error.fmt(f),
Error::Internal(error) => error.fmt(f),
}
}
@ -67,6 +78,7 @@ impl std::fmt::Display for Error {
match self {
Error::Http(code, message) => write!(f, "{code}: {message}"),
Error::Database(error) => error.fmt(f),
Error::Database2(error) => error.fmt(f),
Error::Internal(error) => error.fmt(f),
}
}

View file

@ -1,6 +1,7 @@
mod api;
mod auth;
mod db;
mod db2;
mod env;
mod rpc;