From db1d93576f8aea0364e52ddf1abdf92f74ea0dc1 Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Thu, 1 Dec 2022 15:13:34 +0100 Subject: [PATCH] Go back to a compiling state, panicking on unimplemented db methods --- crates/collab/src/db.rs | 1267 +++++++++++++++++- crates/collab/src/db/project.rs | 12 + crates/collab/src/db/project_collaborator.rs | 4 +- crates/collab/src/db/user.rs | 8 + 4 files changed, 1240 insertions(+), 51 deletions(-) diff --git a/crates/collab/src/db.rs b/crates/collab/src/db.rs index d89d041f2a..c5f2f98d0b 100644 --- a/crates/collab/src/db.rs +++ b/crates/collab/src/db.rs @@ -12,7 +12,7 @@ mod worktree; use crate::{Error, Result}; use anyhow::anyhow; -use collections::HashMap; +use collections::{BTreeMap, HashMap, HashSet}; pub use contact::Contact; use dashmap::DashMap; use futures::StreamExt; @@ -255,6 +255,19 @@ impl Database { .await } + pub async fn set_user_connected_once(&self, id: UserId, connected_once: bool) -> Result<()> { + self.transact(|tx| async move { + user::Entity::update_many() + .filter(user::Column::Id.eq(id)) + .col_expr(user::Column::ConnectedOnce, connected_once.into()) + .exec(&tx) + .await?; + tx.commit().await?; + Ok(()) + }) + .await + } + pub async fn destroy_user(&self, id: UserId) -> Result<()> { self.transact(|tx| async move { access_token::Entity::delete_many() @@ -360,6 +373,17 @@ impl Database { .await } + pub async fn is_user_busy(&self, user_id: UserId) -> Result { + self.transact(|tx| async move { + let participant = room_participant::Entity::find() + .filter(room_participant::Column::UserId.eq(user_id)) + .one(&tx) + .await?; + Ok(participant.is_some()) + }) + .await + } + pub async fn has_contact(&self, user_id_1: UserId, user_id_2: UserId) -> Result { self.transact(|tx| async move { let (id_a, id_b) = if user_id_1 < user_id_2 { @@ -896,63 +920,447 @@ impl Database { .await } - // projects + // rooms - pub async fn share_project( + pub async fn incoming_call_for_user( + &self, + user_id: UserId, + ) -> Result> { + self.transact(|tx| async move { + let pending_participant = room_participant::Entity::find() + .filter( + room_participant::Column::UserId + .eq(user_id) + .and(room_participant::Column::AnsweringConnectionId.is_null()), + ) + .one(&tx) + .await?; + + if let Some(pending_participant) = pending_participant { + let room = self.get_room(pending_participant.room_id, &tx).await?; + Ok(Self::build_incoming_call(&room, user_id)) + } else { + Ok(None) + } + }) + .await + } + + pub async fn create_room( + &self, + user_id: UserId, + connection_id: ConnectionId, + live_kit_room: &str, + ) -> Result> { + self.transact(|tx| async move { + todo!() + // let room_id = sqlx::query_scalar( + // " + // INSERT INTO rooms (live_kit_room) + // VALUES ($1) + // RETURNING id + // ", + // ) + // .bind(&live_kit_room) + // .fetch_one(&mut tx) + // .await + // .map(RoomId)?; + + // sqlx::query( + // " + // INSERT INTO room_participants (room_id, user_id, answering_connection_id, calling_user_id, calling_connection_id) + // VALUES ($1, $2, $3, $4, $5) + // ", + // ) + // .bind(room_id) + // .bind(user_id) + // .bind(connection_id.0 as i32) + // .bind(user_id) + // .bind(connection_id.0 as i32) + // .execute(&mut tx) + // .await?; + + // let room = self.get_room(room_id, &mut tx).await?; + // self.commit_room_transaction(room_id, tx, room).await + }) + .await + } + + pub async fn call( + &self, + room_id: RoomId, + calling_user_id: UserId, + calling_connection_id: ConnectionId, + called_user_id: UserId, + initial_project_id: Option, + ) -> Result> { + self.transact(|tx| async move { + todo!() + // sqlx::query( + // " + // INSERT INTO room_participants ( + // room_id, + // user_id, + // calling_user_id, + // calling_connection_id, + // initial_project_id + // ) + // VALUES ($1, $2, $3, $4, $5) + // ", + // ) + // .bind(room_id) + // .bind(called_user_id) + // .bind(calling_user_id) + // .bind(calling_connection_id.0 as i32) + // .bind(initial_project_id) + // .execute(&mut tx) + // .await?; + + // let room = self.get_room(room_id, &mut tx).await?; + // let incoming_call = Self::build_incoming_call(&room, called_user_id) + // .ok_or_else(|| anyhow!("failed to build incoming call"))?; + // self.commit_room_transaction(room_id, tx, (room, incoming_call)) + // .await + }) + .await + } + + pub async fn call_failed( + &self, + room_id: RoomId, + called_user_id: UserId, + ) -> Result> { + self.transact(|tx| async move { + todo!() + // sqlx::query( + // " + // DELETE FROM room_participants + // WHERE room_id = $1 AND user_id = $2 + // ", + // ) + // .bind(room_id) + // .bind(called_user_id) + // .execute(&mut tx) + // .await?; + + // let room = self.get_room(room_id, &mut tx).await?; + // self.commit_room_transaction(room_id, tx, room).await + }) + .await + } + + pub async fn decline_call( + &self, + expected_room_id: Option, + user_id: UserId, + ) -> Result> { + self.transact(|tx| async move { + todo!() + // let room_id = sqlx::query_scalar( + // " + // DELETE FROM room_participants + // WHERE user_id = $1 AND answering_connection_id IS NULL + // RETURNING room_id + // ", + // ) + // .bind(user_id) + // .fetch_one(&mut tx) + // .await?; + // if expected_room_id.map_or(false, |expected_room_id| expected_room_id != room_id) { + // return Err(anyhow!("declining call on unexpected room"))?; + // } + + // let room = self.get_room(room_id, &mut tx).await?; + // self.commit_room_transaction(room_id, tx, room).await + }) + .await + } + + pub async fn cancel_call( + &self, + expected_room_id: Option, + calling_connection_id: ConnectionId, + called_user_id: UserId, + ) -> Result> { + self.transact(|tx| async move { + todo!() + // let room_id = sqlx::query_scalar( + // " + // DELETE FROM room_participants + // WHERE user_id = $1 AND calling_connection_id = $2 AND answering_connection_id IS NULL + // RETURNING room_id + // ", + // ) + // .bind(called_user_id) + // .bind(calling_connection_id.0 as i32) + // .fetch_one(&mut tx) + // .await?; + // if expected_room_id.map_or(false, |expected_room_id| expected_room_id != room_id) { + // return Err(anyhow!("canceling call on unexpected room"))?; + // } + + // let room = self.get_room(room_id, &mut tx).await?; + // self.commit_room_transaction(room_id, tx, room).await + }) + .await + } + + pub async fn join_room( + &self, + room_id: RoomId, + user_id: UserId, + connection_id: ConnectionId, + ) -> Result> { + self.transact(|tx| async move { + todo!() + // sqlx::query( + // " + // UPDATE room_participants + // SET answering_connection_id = $1 + // WHERE room_id = $2 AND user_id = $3 + // RETURNING 1 + // ", + // ) + // .bind(connection_id.0 as i32) + // .bind(room_id) + // .bind(user_id) + // .fetch_one(&mut tx) + // .await?; + + // let room = self.get_room(room_id, &mut tx).await?; + // self.commit_room_transaction(room_id, tx, room).await + }) + .await + } + + pub async fn leave_room( + &self, + connection_id: ConnectionId, + ) -> Result>> { + self.transact(|tx| async move { + todo!() + // // Leave room. + // let room_id = sqlx::query_scalar::<_, RoomId>( + // " + // DELETE FROM room_participants + // WHERE answering_connection_id = $1 + // RETURNING room_id + // ", + // ) + // .bind(connection_id.0 as i32) + // .fetch_optional(&mut tx) + // .await?; + + // if let Some(room_id) = room_id { + // // Cancel pending calls initiated by the leaving user. + // let canceled_calls_to_user_ids: Vec = sqlx::query_scalar( + // " + // DELETE FROM room_participants + // WHERE calling_connection_id = $1 AND answering_connection_id IS NULL + // RETURNING user_id + // ", + // ) + // .bind(connection_id.0 as i32) + // .fetch_all(&mut tx) + // .await?; + + // let project_ids = sqlx::query_scalar::<_, ProjectId>( + // " + // SELECT project_id + // FROM project_collaborators + // WHERE connection_id = $1 + // ", + // ) + // .bind(connection_id.0 as i32) + // .fetch_all(&mut tx) + // .await?; + + // // Leave projects. + // let mut left_projects = HashMap::default(); + // if !project_ids.is_empty() { + // let mut params = "?,".repeat(project_ids.len()); + // params.pop(); + // let query = format!( + // " + // SELECT * + // FROM project_collaborators + // WHERE project_id IN ({params}) + // " + // ); + // let mut query = sqlx::query_as::<_, ProjectCollaborator>(&query); + // for project_id in project_ids { + // query = query.bind(project_id); + // } + + // let mut project_collaborators = query.fetch(&mut tx); + // while let Some(collaborator) = project_collaborators.next().await { + // let collaborator = collaborator?; + // let left_project = + // left_projects + // .entry(collaborator.project_id) + // .or_insert(LeftProject { + // id: collaborator.project_id, + // host_user_id: Default::default(), + // connection_ids: Default::default(), + // host_connection_id: Default::default(), + // }); + + // let collaborator_connection_id = + // ConnectionId(collaborator.connection_id as u32); + // if collaborator_connection_id != connection_id { + // left_project.connection_ids.push(collaborator_connection_id); + // } + + // if collaborator.is_host { + // left_project.host_user_id = collaborator.user_id; + // left_project.host_connection_id = + // ConnectionId(collaborator.connection_id as u32); + // } + // } + // } + // sqlx::query( + // " + // DELETE FROM project_collaborators + // WHERE connection_id = $1 + // ", + // ) + // .bind(connection_id.0 as i32) + // .execute(&mut tx) + // .await?; + + // // Unshare projects. + // sqlx::query( + // " + // DELETE FROM projects + // WHERE room_id = $1 AND host_connection_id = $2 + // ", + // ) + // .bind(room_id) + // .bind(connection_id.0 as i32) + // .execute(&mut tx) + // .await?; + + // let room = self.get_room(room_id, &mut tx).await?; + // Ok(Some( + // self.commit_room_transaction( + // room_id, + // tx, + // LeftRoom { + // room, + // left_projects, + // canceled_calls_to_user_ids, + // }, + // ) + // .await?, + // )) + // } else { + // Ok(None) + // } + }) + .await + } + + pub async fn update_room_participant_location( &self, room_id: RoomId, connection_id: ConnectionId, - worktrees: &[proto::WorktreeMetadata], - ) -> Result> { - self.transact(|tx| async move { - let participant = room_participant::Entity::find() - .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.0)) - .one(&tx) - .await? - .ok_or_else(|| anyhow!("could not find participant"))?; - if participant.room_id != room_id { - return Err(anyhow!("shared project on unexpected room"))?; - } + location: proto::ParticipantLocation, + ) -> Result> { + self.transact(|tx| async { + todo!() + // let mut tx = tx; + // let location_kind; + // let location_project_id; + // match location + // .variant + // .as_ref() + // .ok_or_else(|| anyhow!("invalid location"))? + // { + // proto::participant_location::Variant::SharedProject(project) => { + // location_kind = 0; + // location_project_id = Some(ProjectId::from_proto(project.id)); + // } + // proto::participant_location::Variant::UnsharedProject(_) => { + // location_kind = 1; + // location_project_id = None; + // } + // proto::participant_location::Variant::External(_) => { + // location_kind = 2; + // location_project_id = None; + // } + // } - let project = project::ActiveModel { - room_id: ActiveValue::set(participant.room_id), - host_user_id: ActiveValue::set(participant.user_id), - host_connection_id: ActiveValue::set(connection_id.0 as i32), - ..Default::default() - } - .insert(&tx) - .await?; + // sqlx::query( + // " + // UPDATE room_participants + // SET location_kind = $1, location_project_id = $2 + // WHERE room_id = $3 AND answering_connection_id = $4 + // RETURNING 1 + // ", + // ) + // .bind(location_kind) + // .bind(location_project_id) + // .bind(room_id) + // .bind(connection_id.0 as i32) + // .fetch_one(&mut tx) + // .await?; - worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel { - id: ActiveValue::set(worktree.id as i32), - project_id: ActiveValue::set(project.id), - abs_path: ActiveValue::set(worktree.abs_path.clone()), - root_name: ActiveValue::set(worktree.root_name.clone()), - visible: ActiveValue::set(worktree.visible), - scan_id: ActiveValue::set(0), - is_complete: ActiveValue::set(false), - })) - .exec(&tx) - .await?; - - project_collaborator::ActiveModel { - project_id: ActiveValue::set(project.id), - connection_id: ActiveValue::set(connection_id.0 as i32), - user_id: ActiveValue::set(participant.user_id), - replica_id: ActiveValue::set(0), - is_host: ActiveValue::set(true), - ..Default::default() - } - .insert(&tx) - .await?; - - let room = self.get_room(room_id, &tx).await?; - self.commit_room_transaction(room_id, tx, (project.id, room)) - .await + // let room = self.get_room(room_id, &mut tx).await?; + // self.commit_room_transaction(room_id, tx, room).await }) .await } + async fn get_guest_connection_ids( + &self, + project_id: ProjectId, + tx: &DatabaseTransaction, + ) -> Result> { + todo!() + // let mut guest_connection_ids = Vec::new(); + // let mut db_guest_connection_ids = sqlx::query_scalar::<_, i32>( + // " + // SELECT connection_id + // FROM project_collaborators + // WHERE project_id = $1 AND is_host = FALSE + // ", + // ) + // .bind(project_id) + // .fetch(tx); + // while let Some(connection_id) = db_guest_connection_ids.next().await { + // guest_connection_ids.push(ConnectionId(connection_id? as u32)); + // } + // Ok(guest_connection_ids) + } + + fn build_incoming_call( + room: &proto::Room, + called_user_id: UserId, + ) -> Option { + let pending_participant = room + .pending_participants + .iter() + .find(|participant| participant.user_id == called_user_id.to_proto())?; + + Some(proto::IncomingCall { + room_id: room.id, + calling_user_id: pending_participant.calling_user_id, + participant_user_ids: room + .participants + .iter() + .map(|participant| participant.user_id) + .collect(), + initial_project: room.participants.iter().find_map(|participant| { + let initial_project_id = pending_participant.initial_project_id?; + participant + .projects + .iter() + .find(|project| project.id == initial_project_id) + .cloned() + }), + }) + } + async fn get_room(&self, room_id: RoomId, tx: &DatabaseTransaction) -> Result { let db_room = room::Entity::find_by_id(room_id) .one(tx) @@ -1057,6 +1465,736 @@ impl Database { }) } + // projects + + pub async fn project_count_excluding_admins(&self) -> Result { + #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)] + enum QueryAs { + Count, + } + + self.transact(|tx| async move { + Ok(project::Entity::find() + .select_only() + .column_as(project::Column::Id.count(), QueryAs::Count) + .inner_join(user::Entity) + .filter(user::Column::Admin.eq(false)) + .into_values::<_, QueryAs>() + .one(&tx) + .await? + .unwrap_or(0) as usize) + }) + .await + } + + pub async fn share_project( + &self, + room_id: RoomId, + connection_id: ConnectionId, + worktrees: &[proto::WorktreeMetadata], + ) -> Result> { + self.transact(|tx| async move { + let participant = room_participant::Entity::find() + .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.0)) + .one(&tx) + .await? + .ok_or_else(|| anyhow!("could not find participant"))?; + if participant.room_id != room_id { + return Err(anyhow!("shared project on unexpected room"))?; + } + + let project = project::ActiveModel { + room_id: ActiveValue::set(participant.room_id), + host_user_id: ActiveValue::set(participant.user_id), + host_connection_id: ActiveValue::set(connection_id.0 as i32), + ..Default::default() + } + .insert(&tx) + .await?; + + worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel { + id: ActiveValue::set(worktree.id as i32), + project_id: ActiveValue::set(project.id), + abs_path: ActiveValue::set(worktree.abs_path.clone()), + root_name: ActiveValue::set(worktree.root_name.clone()), + visible: ActiveValue::set(worktree.visible), + scan_id: ActiveValue::set(0), + is_complete: ActiveValue::set(false), + })) + .exec(&tx) + .await?; + + project_collaborator::ActiveModel { + project_id: ActiveValue::set(project.id), + connection_id: ActiveValue::set(connection_id.0 as i32), + user_id: ActiveValue::set(participant.user_id), + replica_id: ActiveValue::set(ReplicaId(0)), + is_host: ActiveValue::set(true), + ..Default::default() + } + .insert(&tx) + .await?; + + let room = self.get_room(room_id, &tx).await?; + self.commit_room_transaction(room_id, tx, (project.id, room)) + .await + }) + .await + } + + pub async fn unshare_project( + &self, + project_id: ProjectId, + connection_id: ConnectionId, + ) -> Result)>> { + self.transact(|tx| async move { + todo!() + // let guest_connection_ids = self.get_guest_connection_ids(project_id, &mut tx).await?; + // let room_id: RoomId = sqlx::query_scalar( + // " + // DELETE FROM projects + // WHERE id = $1 AND host_connection_id = $2 + // RETURNING room_id + // ", + // ) + // .bind(project_id) + // .bind(connection_id.0 as i32) + // .fetch_one(&mut tx) + // .await?; + // let room = self.get_room(room_id, &mut tx).await?; + // self.commit_room_transaction(room_id, tx, (room, guest_connection_ids)) + // .await + }) + .await + } + + pub async fn update_project( + &self, + project_id: ProjectId, + connection_id: ConnectionId, + worktrees: &[proto::WorktreeMetadata], + ) -> Result)>> { + self.transact(|tx| async move { + todo!() + // let room_id: RoomId = sqlx::query_scalar( + // " + // SELECT room_id + // FROM projects + // WHERE id = $1 AND host_connection_id = $2 + // ", + // ) + // .bind(project_id) + // .bind(connection_id.0 as i32) + // .fetch_one(&mut tx) + // .await?; + + // if !worktrees.is_empty() { + // let mut params = "(?, ?, ?, ?, ?, ?, ?),".repeat(worktrees.len()); + // params.pop(); + // let query = format!( + // " + // INSERT INTO worktrees ( + // project_id, + // id, + // root_name, + // abs_path, + // visible, + // scan_id, + // is_complete + // ) + // VALUES {params} + // ON CONFLICT (project_id, id) DO UPDATE SET root_name = excluded.root_name + // " + // ); + + // let mut query = sqlx::query(&query); + // for worktree in worktrees { + // query = query + // .bind(project_id) + // .bind(worktree.id as i32) + // .bind(&worktree.root_name) + // .bind(&worktree.abs_path) + // .bind(worktree.visible) + // .bind(0) + // .bind(false) + // } + // query.execute(&mut tx).await?; + // } + + // let mut params = "?,".repeat(worktrees.len()); + // if !worktrees.is_empty() { + // params.pop(); + // } + // let query = format!( + // " + // DELETE FROM worktrees + // WHERE project_id = ? AND id NOT IN ({params}) + // ", + // ); + + // let mut query = sqlx::query(&query).bind(project_id); + // for worktree in worktrees { + // query = query.bind(WorktreeId(worktree.id as i32)); + // } + // query.execute(&mut tx).await?; + + // let guest_connection_ids = self.get_guest_connection_ids(project_id, &mut tx).await?; + // let room = self.get_room(room_id, &mut tx).await?; + // self.commit_room_transaction(room_id, tx, (room, guest_connection_ids)) + // .await + }) + .await + } + + pub async fn update_worktree( + &self, + update: &proto::UpdateWorktree, + connection_id: ConnectionId, + ) -> Result>> { + self.transact(|tx| async move { + todo!() + // let project_id = ProjectId::from_proto(update.project_id); + // let worktree_id = WorktreeId::from_proto(update.worktree_id); + + // // Ensure the update comes from the host. + // let room_id: RoomId = sqlx::query_scalar( + // " + // SELECT room_id + // FROM projects + // WHERE id = $1 AND host_connection_id = $2 + // ", + // ) + // .bind(project_id) + // .bind(connection_id.0 as i32) + // .fetch_one(&mut tx) + // .await?; + + // // Update metadata. + // sqlx::query( + // " + // UPDATE worktrees + // SET + // root_name = $1, + // scan_id = $2, + // is_complete = $3, + // abs_path = $4 + // WHERE project_id = $5 AND id = $6 + // RETURNING 1 + // ", + // ) + // .bind(&update.root_name) + // .bind(update.scan_id as i64) + // .bind(update.is_last_update) + // .bind(&update.abs_path) + // .bind(project_id) + // .bind(worktree_id) + // .fetch_one(&mut tx) + // .await?; + + // if !update.updated_entries.is_empty() { + // let mut params = + // "(?, ?, ?, ?, ?, ?, ?, ?, ?, ?),".repeat(update.updated_entries.len()); + // params.pop(); + + // let query = format!( + // " + // INSERT INTO worktree_entries ( + // project_id, + // worktree_id, + // id, + // is_dir, + // path, + // inode, + // mtime_seconds, + // mtime_nanos, + // is_symlink, + // is_ignored + // ) + // VALUES {params} + // ON CONFLICT (project_id, worktree_id, id) DO UPDATE SET + // is_dir = excluded.is_dir, + // path = excluded.path, + // inode = excluded.inode, + // mtime_seconds = excluded.mtime_seconds, + // mtime_nanos = excluded.mtime_nanos, + // is_symlink = excluded.is_symlink, + // is_ignored = excluded.is_ignored + // " + // ); + // let mut query = sqlx::query(&query); + // for entry in &update.updated_entries { + // let mtime = entry.mtime.clone().unwrap_or_default(); + // query = query + // .bind(project_id) + // .bind(worktree_id) + // .bind(entry.id as i64) + // .bind(entry.is_dir) + // .bind(&entry.path) + // .bind(entry.inode as i64) + // .bind(mtime.seconds as i64) + // .bind(mtime.nanos as i32) + // .bind(entry.is_symlink) + // .bind(entry.is_ignored); + // } + // query.execute(&mut tx).await?; + // } + + // if !update.removed_entries.is_empty() { + // let mut params = "?,".repeat(update.removed_entries.len()); + // params.pop(); + // let query = format!( + // " + // DELETE FROM worktree_entries + // WHERE project_id = ? AND worktree_id = ? AND id IN ({params}) + // " + // ); + + // let mut query = sqlx::query(&query).bind(project_id).bind(worktree_id); + // for entry_id in &update.removed_entries { + // query = query.bind(*entry_id as i64); + // } + // query.execute(&mut tx).await?; + // } + + // let connection_ids = self.get_guest_connection_ids(project_id, &mut tx).await?; + // self.commit_room_transaction(room_id, tx, connection_ids) + // .await + }) + .await + } + + pub async fn update_diagnostic_summary( + &self, + update: &proto::UpdateDiagnosticSummary, + connection_id: ConnectionId, + ) -> Result>> { + self.transact(|tx| async { + todo!() + // let project_id = ProjectId::from_proto(update.project_id); + // let worktree_id = WorktreeId::from_proto(update.worktree_id); + // let summary = update + // .summary + // .as_ref() + // .ok_or_else(|| anyhow!("invalid summary"))?; + + // // Ensure the update comes from the host. + // let room_id: RoomId = sqlx::query_scalar( + // " + // SELECT room_id + // FROM projects + // WHERE id = $1 AND host_connection_id = $2 + // ", + // ) + // .bind(project_id) + // .bind(connection_id.0 as i32) + // .fetch_one(&mut tx) + // .await?; + + // // Update summary. + // sqlx::query( + // " + // INSERT INTO worktree_diagnostic_summaries ( + // project_id, + // worktree_id, + // path, + // language_server_id, + // error_count, + // warning_count + // ) + // VALUES ($1, $2, $3, $4, $5, $6) + // ON CONFLICT (project_id, worktree_id, path) DO UPDATE SET + // language_server_id = excluded.language_server_id, + // error_count = excluded.error_count, + // warning_count = excluded.warning_count + // ", + // ) + // .bind(project_id) + // .bind(worktree_id) + // .bind(&summary.path) + // .bind(summary.language_server_id as i64) + // .bind(summary.error_count as i32) + // .bind(summary.warning_count as i32) + // .execute(&mut tx) + // .await?; + + // let connection_ids = self.get_guest_connection_ids(project_id, &mut tx).await?; + // self.commit_room_transaction(room_id, tx, connection_ids) + // .await + }) + .await + } + + pub async fn start_language_server( + &self, + update: &proto::StartLanguageServer, + connection_id: ConnectionId, + ) -> Result>> { + self.transact(|tx| async { + todo!() + // let project_id = ProjectId::from_proto(update.project_id); + // let server = update + // .server + // .as_ref() + // .ok_or_else(|| anyhow!("invalid language server"))?; + + // // Ensure the update comes from the host. + // let room_id: RoomId = sqlx::query_scalar( + // " + // SELECT room_id + // FROM projects + // WHERE id = $1 AND host_connection_id = $2 + // ", + // ) + // .bind(project_id) + // .bind(connection_id.0 as i32) + // .fetch_one(&mut tx) + // .await?; + + // // Add the newly-started language server. + // sqlx::query( + // " + // INSERT INTO language_servers (project_id, id, name) + // VALUES ($1, $2, $3) + // ON CONFLICT (project_id, id) DO UPDATE SET + // name = excluded.name + // ", + // ) + // .bind(project_id) + // .bind(server.id as i64) + // .bind(&server.name) + // .execute(&mut tx) + // .await?; + + // let connection_ids = self.get_guest_connection_ids(project_id, &mut tx).await?; + // self.commit_room_transaction(room_id, tx, connection_ids) + // .await + }) + .await + } + + pub async fn join_project( + &self, + project_id: ProjectId, + connection_id: ConnectionId, + ) -> Result> { + self.transact(|tx| async move { + todo!() + // let (room_id, user_id) = sqlx::query_as::<_, (RoomId, UserId)>( + // " + // SELECT room_id, user_id + // FROM room_participants + // WHERE answering_connection_id = $1 + // ", + // ) + // .bind(connection_id.0 as i32) + // .fetch_one(&mut tx) + // .await?; + + // // Ensure project id was shared on this room. + // sqlx::query( + // " + // SELECT 1 + // FROM projects + // WHERE id = $1 AND room_id = $2 + // ", + // ) + // .bind(project_id) + // .bind(room_id) + // .fetch_one(&mut tx) + // .await?; + + // let mut collaborators = sqlx::query_as::<_, ProjectCollaborator>( + // " + // SELECT * + // FROM project_collaborators + // WHERE project_id = $1 + // ", + // ) + // .bind(project_id) + // .fetch_all(&mut tx) + // .await?; + // let replica_ids = collaborators + // .iter() + // .map(|c| c.replica_id) + // .collect::>(); + // let mut replica_id = ReplicaId(1); + // while replica_ids.contains(&replica_id) { + // replica_id.0 += 1; + // } + // let new_collaborator = ProjectCollaborator { + // project_id, + // connection_id: connection_id.0 as i32, + // user_id, + // replica_id, + // is_host: false, + // }; + + // sqlx::query( + // " + // INSERT INTO project_collaborators ( + // project_id, + // connection_id, + // user_id, + // replica_id, + // is_host + // ) + // VALUES ($1, $2, $3, $4, $5) + // ", + // ) + // .bind(new_collaborator.project_id) + // .bind(new_collaborator.connection_id) + // .bind(new_collaborator.user_id) + // .bind(new_collaborator.replica_id) + // .bind(new_collaborator.is_host) + // .execute(&mut tx) + // .await?; + // collaborators.push(new_collaborator); + + // let worktree_rows = sqlx::query_as::<_, WorktreeRow>( + // " + // SELECT * + // FROM worktrees + // WHERE project_id = $1 + // ", + // ) + // .bind(project_id) + // .fetch_all(&mut tx) + // .await?; + // let mut worktrees = worktree_rows + // .into_iter() + // .map(|worktree_row| { + // ( + // worktree_row.id, + // Worktree { + // id: worktree_row.id, + // abs_path: worktree_row.abs_path, + // root_name: worktree_row.root_name, + // visible: worktree_row.visible, + // entries: Default::default(), + // diagnostic_summaries: Default::default(), + // scan_id: worktree_row.scan_id as u64, + // is_complete: worktree_row.is_complete, + // }, + // ) + // }) + // .collect::>(); + + // // Populate worktree entries. + // { + // let mut entries = sqlx::query_as::<_, WorktreeEntry>( + // " + // SELECT * + // FROM worktree_entries + // WHERE project_id = $1 + // ", + // ) + // .bind(project_id) + // .fetch(&mut tx); + // while let Some(entry) = entries.next().await { + // let entry = entry?; + // if let Some(worktree) = worktrees.get_mut(&entry.worktree_id) { + // worktree.entries.push(proto::Entry { + // id: entry.id as u64, + // is_dir: entry.is_dir, + // path: entry.path, + // inode: entry.inode as u64, + // mtime: Some(proto::Timestamp { + // seconds: entry.mtime_seconds as u64, + // nanos: entry.mtime_nanos as u32, + // }), + // is_symlink: entry.is_symlink, + // is_ignored: entry.is_ignored, + // }); + // } + // } + // } + + // // Populate worktree diagnostic summaries. + // { + // let mut summaries = sqlx::query_as::<_, WorktreeDiagnosticSummary>( + // " + // SELECT * + // FROM worktree_diagnostic_summaries + // WHERE project_id = $1 + // ", + // ) + // .bind(project_id) + // .fetch(&mut tx); + // while let Some(summary) = summaries.next().await { + // let summary = summary?; + // if let Some(worktree) = worktrees.get_mut(&summary.worktree_id) { + // worktree + // .diagnostic_summaries + // .push(proto::DiagnosticSummary { + // path: summary.path, + // language_server_id: summary.language_server_id as u64, + // error_count: summary.error_count as u32, + // warning_count: summary.warning_count as u32, + // }); + // } + // } + // } + + // // Populate language servers. + // let language_servers = sqlx::query_as::<_, LanguageServer>( + // " + // SELECT * + // FROM language_servers + // WHERE project_id = $1 + // ", + // ) + // .bind(project_id) + // .fetch_all(&mut tx) + // .await?; + + // self.commit_room_transaction( + // room_id, + // tx, + // ( + // Project { + // collaborators, + // worktrees, + // language_servers: language_servers + // .into_iter() + // .map(|language_server| proto::LanguageServer { + // id: language_server.id.to_proto(), + // name: language_server.name, + // }) + // .collect(), + // }, + // replica_id as ReplicaId, + // ), + // ) + // .await + }) + .await + } + + pub async fn leave_project( + &self, + project_id: ProjectId, + connection_id: ConnectionId, + ) -> Result> { + self.transact(|tx| async move { + todo!() + // let result = sqlx::query( + // " + // DELETE FROM project_collaborators + // WHERE project_id = $1 AND connection_id = $2 + // ", + // ) + // .bind(project_id) + // .bind(connection_id.0 as i32) + // .execute(&mut tx) + // .await?; + + // if result.rows_affected() == 0 { + // Err(anyhow!("not a collaborator on this project"))?; + // } + + // let connection_ids = sqlx::query_scalar::<_, i32>( + // " + // SELECT connection_id + // FROM project_collaborators + // WHERE project_id = $1 + // ", + // ) + // .bind(project_id) + // .fetch_all(&mut tx) + // .await? + // .into_iter() + // .map(|id| ConnectionId(id as u32)) + // .collect(); + + // let (room_id, host_user_id, host_connection_id) = + // sqlx::query_as::<_, (RoomId, i32, i32)>( + // " + // SELECT room_id, host_user_id, host_connection_id + // FROM projects + // WHERE id = $1 + // ", + // ) + // .bind(project_id) + // .fetch_one(&mut tx) + // .await?; + + // self.commit_room_transaction( + // room_id, + // tx, + // LeftProject { + // id: project_id, + // host_user_id: UserId(host_user_id), + // host_connection_id: ConnectionId(host_connection_id as u32), + // connection_ids, + // }, + // ) + // .await + }) + .await + } + + pub async fn project_collaborators( + &self, + project_id: ProjectId, + connection_id: ConnectionId, + ) -> Result> { + self.transact(|tx| async move { + todo!() + // let collaborators = sqlx::query_as::<_, ProjectCollaborator>( + // " + // SELECT * + // FROM project_collaborators + // WHERE project_id = $1 + // ", + // ) + // .bind(project_id) + // .fetch_all(&mut tx) + // .await?; + + // if collaborators + // .iter() + // .any(|collaborator| collaborator.connection_id == connection_id.0 as i32) + // { + // Ok(collaborators) + // } else { + // Err(anyhow!("no such project"))? + // } + }) + .await + } + + pub async fn project_connection_ids( + &self, + project_id: ProjectId, + connection_id: ConnectionId, + ) -> Result> { + self.transact(|tx| async move { + todo!() + // let connection_ids = sqlx::query_scalar::<_, i32>( + // " + // SELECT connection_id + // FROM project_collaborators + // WHERE project_id = $1 + // ", + // ) + // .bind(project_id) + // .fetch_all(&mut tx) + // .await?; + + // if connection_ids.contains(&(connection_id.0 as i32)) { + // Ok(connection_ids + // .into_iter() + // .map(|connection_id| ConnectionId(connection_id as u32)) + // .collect()) + // } else { + // Err(anyhow!("no such project"))? + // } + }) + .await + } + + // access tokens + pub async fn create_access_token_hash( &self, user_id: UserId, @@ -1334,14 +2472,45 @@ macro_rules! id_type { id_type!(AccessTokenId); id_type!(ContactId); -id_type!(UserId); id_type!(RoomId); id_type!(RoomParticipantId); id_type!(ProjectId); id_type!(ProjectCollaboratorId); +id_type!(ReplicaId); id_type!(SignupId); +id_type!(UserId); id_type!(WorktreeId); +pub struct LeftRoom { + pub room: proto::Room, + pub left_projects: HashMap, + pub canceled_calls_to_user_ids: Vec, +} + +pub struct Project { + pub collaborators: Vec, + pub worktrees: BTreeMap, + pub language_servers: Vec, +} + +pub struct LeftProject { + pub id: ProjectId, + pub host_user_id: UserId, + pub host_connection_id: ConnectionId, + pub connection_ids: Vec, +} + +pub struct Worktree { + pub id: WorktreeId, + pub abs_path: String, + pub root_name: String, + pub visible: bool, + pub entries: Vec, + pub diagnostic_summaries: Vec, + pub scan_id: u64, + pub is_complete: bool, +} + #[cfg(test)] pub use test::*; diff --git a/crates/collab/src/db/project.rs b/crates/collab/src/db/project.rs index 21ee0b27d1..a9f0d1cb47 100644 --- a/crates/collab/src/db/project.rs +++ b/crates/collab/src/db/project.rs @@ -13,6 +13,12 @@ pub struct Model { #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] pub enum Relation { + #[sea_orm( + belongs_to = "super::user::Entity", + from = "Column::HostUserId", + to = "super::user::Column::Id" + )] + HostUser, #[sea_orm( belongs_to = "super::room::Entity", from = "Column::RoomId", @@ -23,6 +29,12 @@ pub enum Relation { Worktree, } +impl Related for Entity { + fn to() -> RelationDef { + Relation::HostUser.def() + } +} + impl Related for Entity { fn to() -> RelationDef { Relation::Room.def() diff --git a/crates/collab/src/db/project_collaborator.rs b/crates/collab/src/db/project_collaborator.rs index 3e572fe5d4..fb1d565e3a 100644 --- a/crates/collab/src/db/project_collaborator.rs +++ b/crates/collab/src/db/project_collaborator.rs @@ -1,4 +1,4 @@ -use super::{ProjectCollaboratorId, ProjectId, UserId}; +use super::{ProjectCollaboratorId, ProjectId, ReplicaId, UserId}; use sea_orm::entity::prelude::*; #[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel)] @@ -9,7 +9,7 @@ pub struct Model { pub project_id: ProjectId, pub connection_id: i32, pub user_id: UserId, - pub replica_id: i32, + pub replica_id: ReplicaId, pub is_host: bool, } diff --git a/crates/collab/src/db/user.rs b/crates/collab/src/db/user.rs index b6e096f667..c2b157bd0a 100644 --- a/crates/collab/src/db/user.rs +++ b/crates/collab/src/db/user.rs @@ -24,6 +24,8 @@ pub enum Relation { AccessToken, #[sea_orm(has_one = "super::room_participant::Entity")] RoomParticipant, + #[sea_orm(has_many = "super::project::Entity")] + HostedProjects, } impl Related for Entity { @@ -38,4 +40,10 @@ impl Related for Entity { } } +impl Related for Entity { + fn to() -> RelationDef { + Relation::HostedProjects.def() + } +} + impl ActiveModelBehavior for ActiveModel {}