From d6462c611cc8256411b0f3c44402d5283b1fe25d Mon Sep 17 00:00:00 2001 From: Julia Date: Fri, 3 Feb 2023 13:13:38 -0500 Subject: [PATCH] Begin tracking follow states on collab server Co-Authored-By: Antonio Scandurra --- crates/call/src/room.rs | 22 ++++++++ .../migrations/20230202155735_followers.sql | 15 ++++++ crates/collab/src/db.rs | 42 +++++++++++++++ crates/collab/src/db/follower.rs | 51 +++++++++++++++++++ crates/collab/src/db/room.rs | 8 +++ crates/collab/src/rpc.rs | 10 ++++ crates/rpc/proto/zed.proto | 10 +++- crates/rpc/src/rpc.rs | 2 +- 8 files changed, 157 insertions(+), 3 deletions(-) create mode 100644 crates/collab/migrations/20230202155735_followers.sql create mode 100644 crates/collab/src/db/follower.rs diff --git a/crates/call/src/room.rs b/crates/call/src/room.rs index 9c22298946..bf82fb4c73 100644 --- a/crates/call/src/room.rs +++ b/crates/call/src/room.rs @@ -55,6 +55,7 @@ pub struct Room { leave_when_empty: bool, client: Arc, user_store: ModelHandle, + follows_by_leader_id: HashMap>, subscriptions: Vec, pending_room_update: Option>, maintain_connection: Option>>, @@ -148,6 +149,7 @@ impl Room { pending_room_update: None, client, user_store, + follows_by_leader_id: Default::default(), maintain_connection: Some(maintain_connection), } } @@ -487,11 +489,13 @@ impl Room { .iter() .map(|p| p.user_id) .collect::>(); + let remote_participant_user_ids = room .participants .iter() .map(|p| p.user_id) .collect::>(); + let (remote_participants, pending_participants) = self.user_store.update(cx, move |user_store, cx| { ( @@ -499,6 +503,7 @@ impl Room { user_store.get_users(pending_participant_user_ids, cx), ) }); + self.pending_room_update = Some(cx.spawn(|this, mut cx| async move { let (remote_participants, pending_participants) = futures::join!(remote_participants, pending_participants); @@ -620,6 +625,23 @@ impl Room { } } + this.follows_by_leader_id.clear(); + for follower in room.followers { + let (leader, follower) = match (follower.leader_id, follower.follower_id) { + (Some(leader), Some(follower)) => (leader, follower), + + _ => { + log::error!("Follower message {follower:?} missing some state"); + continue; + } + }; + + this.follows_by_leader_id + .entry(leader) + .or_insert(Default::default()) + .insert(follower); + } + this.pending_room_update.take(); if this.should_leave() { log::info!("room is empty, leaving"); diff --git a/crates/collab/migrations/20230202155735_followers.sql b/crates/collab/migrations/20230202155735_followers.sql new file mode 100644 index 0000000000..c82d6ba3bd --- /dev/null +++ b/crates/collab/migrations/20230202155735_followers.sql @@ -0,0 +1,15 @@ +CREATE TABLE IF NOT EXISTS "followers" ( + "id" SERIAL PRIMARY KEY, + "room_id" INTEGER NOT NULL REFERENCES rooms (id) ON DELETE CASCADE, + "project_id" INTEGER NOT NULL REFERENCES projects (id) ON DELETE CASCADE, + "leader_connection_server_id" INTEGER NOT NULL REFERENCES servers (id) ON DELETE CASCADE, + "leader_connection_id" INTEGER NOT NULL, + "follower_connection_server_id" INTEGER NOT NULL REFERENCES servers (id) ON DELETE CASCADE, + "follower_connection_id" INTEGER NOT NULL +); + +CREATE UNIQUE INDEX + "index_followers_on_project_id_and_leader_connection_server_id_and_leader_connection_id_and_follower_connection_server_id_and_follower_connection_id" +ON "followers" ("project_id", "leader_connection_server_id", "leader_connection_id", "follower_connection_server_id", "follower_connection_id"); + +CREATE INDEX "index_followers_on_room_id" ON "followers" ("room_id"); diff --git a/crates/collab/src/db.rs b/crates/collab/src/db.rs index af30073ab4..e60cc99145 100644 --- a/crates/collab/src/db.rs +++ b/crates/collab/src/db.rs @@ -1,5 +1,6 @@ mod access_token; mod contact; +mod follower; mod language_server; mod project; mod project_collaborator; @@ -1717,6 +1718,35 @@ impl Database { .await } + pub async fn follow( + &self, + room_id: RoomId, + project_id: ProjectId, + leader_connection: ConnectionId, + follower_connection: ConnectionId, + ) -> Result> { + self.room_transaction(|tx| async move { + follower::ActiveModel { + room_id: ActiveValue::set(room_id), + project_id: ActiveValue::set(project_id), + leader_connection_server_id: ActiveValue::set(ServerId( + leader_connection.owner_id as i32, + )), + leader_connection_id: ActiveValue::set(leader_connection.id as i32), + follower_connection_server_id: ActiveValue::set(ServerId( + follower_connection.owner_id as i32, + )), + follower_connection_id: ActiveValue::set(follower_connection.id as i32), + ..Default::default() + } + .insert(&*tx) + .await?; + + Ok((room_id, self.get_room(room_id, &*tx).await?)) + }) + .await + } + pub async fn update_room_participant_location( &self, room_id: RoomId, @@ -1927,11 +1957,22 @@ impl Database { } } + let mut db_followers = db_room.find_related(follower::Entity).stream(tx).await?; + let mut followers = Vec::new(); + while let Some(db_follower) = db_followers.next().await { + let db_follower = db_follower?; + followers.push(proto::Follower { + leader_id: Some(db_follower.leader_connection().into()), + follower_id: Some(db_follower.follower_connection().into()), + }); + } + Ok(proto::Room { id: db_room.id.to_proto(), live_kit_room: db_room.live_kit_room, participants: participants.into_values().collect(), pending_participants, + followers, }) } @@ -3011,6 +3052,7 @@ macro_rules! id_type { id_type!(AccessTokenId); id_type!(ContactId); +id_type!(FollowerId); id_type!(RoomId); id_type!(RoomParticipantId); id_type!(ProjectId); diff --git a/crates/collab/src/db/follower.rs b/crates/collab/src/db/follower.rs new file mode 100644 index 0000000000..f1243dc99e --- /dev/null +++ b/crates/collab/src/db/follower.rs @@ -0,0 +1,51 @@ +use super::{FollowerId, ProjectId, RoomId, ServerId}; +use rpc::ConnectionId; +use sea_orm::entity::prelude::*; +use serde::Serialize; + +#[derive(Clone, Debug, Default, PartialEq, Eq, DeriveEntityModel, Serialize)] +#[sea_orm(table_name = "followers")] +pub struct Model { + #[sea_orm(primary_key)] + pub id: FollowerId, + pub room_id: RoomId, + pub project_id: ProjectId, + pub leader_connection_server_id: ServerId, + pub leader_connection_id: i32, + pub follower_connection_server_id: ServerId, + pub follower_connection_id: i32, +} + +impl Model { + pub fn leader_connection(&self) -> ConnectionId { + ConnectionId { + owner_id: self.leader_connection_server_id.0 as u32, + id: self.leader_connection_id as u32, + } + } + + pub fn follower_connection(&self) -> ConnectionId { + ConnectionId { + owner_id: self.follower_connection_server_id.0 as u32, + id: self.follower_connection_id as u32, + } + } +} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] +pub enum Relation { + #[sea_orm( + belongs_to = "super::room::Entity", + from = "Column::RoomId", + to = "super::room::Column::Id" + )] + Room, +} + +impl Related for Entity { + fn to() -> RelationDef { + Relation::Room.def() + } +} + +impl ActiveModelBehavior for ActiveModel {} diff --git a/crates/collab/src/db/room.rs b/crates/collab/src/db/room.rs index 7dbf03a780..c3e88670eb 100644 --- a/crates/collab/src/db/room.rs +++ b/crates/collab/src/db/room.rs @@ -15,6 +15,8 @@ pub enum Relation { RoomParticipant, #[sea_orm(has_many = "super::project::Entity")] Project, + #[sea_orm(has_many = "super::follower::Entity")] + Follower, } impl Related for Entity { @@ -29,4 +31,10 @@ impl Related for Entity { } } +impl Related for Entity { + fn to() -> RelationDef { + Relation::Follower.def() + } +} + impl ActiveModelBehavior for ActiveModel {} diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index 32cce1e681..fad34497eb 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -1312,6 +1312,7 @@ async fn join_project( .filter(|collaborator| collaborator.connection_id != session.connection_id) .map(|collaborator| collaborator.to_proto()) .collect::>(); + let worktrees = project .worktrees .iter() @@ -1719,6 +1720,7 @@ async fn follow( session: Session, ) -> Result<()> { let project_id = ProjectId::from_proto(request.project_id); + let room_id = RoomId::from_proto(request.project_id); let leader_id = request .leader_id .ok_or_else(|| anyhow!("invalid leader id"))? @@ -1744,6 +1746,14 @@ async fn follow( .views .retain(|view| view.leader_id != Some(follower_id.into())); response.send(response_payload)?; + + let room = session + .db() + .await + .follow(room_id, project_id, leader_id, follower_id) + .await?; + room_updated(&room, &session.peer); + Ok(()) } diff --git a/crates/rpc/proto/zed.proto b/crates/rpc/proto/zed.proto index ba481ce45b..6b46f09e26 100644 --- a/crates/rpc/proto/zed.proto +++ b/crates/rpc/proto/zed.proto @@ -16,7 +16,7 @@ message Envelope { Error error = 6; Ping ping = 7; Test test = 8; - + CreateRoom create_room = 9; CreateRoomResponse create_room_response = 10; JoinRoom join_room = 11; @@ -206,7 +206,8 @@ message Room { uint64 id = 1; repeated Participant participants = 2; repeated PendingParticipant pending_participants = 3; - string live_kit_room = 4; + repeated Follower followers = 4; + string live_kit_room = 5; } message Participant { @@ -227,6 +228,11 @@ message ParticipantProject { repeated string worktree_root_names = 2; } +message Follower { + PeerId leader_id = 1; + PeerId follower_id = 2; +} + message ParticipantLocation { oneof variant { SharedProject shared_project = 1; diff --git a/crates/rpc/src/rpc.rs b/crates/rpc/src/rpc.rs index b05bc17906..439ed87746 100644 --- a/crates/rpc/src/rpc.rs +++ b/crates/rpc/src/rpc.rs @@ -6,4 +6,4 @@ pub use conn::Connection; pub use peer::*; mod macros; -pub const PROTOCOL_VERSION: u32 = 46; +pub const PROTOCOL_VERSION: u32 = 47;