From 364ed1f840fc62e3dbb2da464d75cbfec2f100c0 Mon Sep 17 00:00:00 2001 From: Mikayla Date: Mon, 21 Aug 2023 17:53:37 -0700 Subject: [PATCH] WIP: pass synchronize channel buffers integration test --- crates/channel/src/channel_buffer.rs | 113 +++++++++++------- .../20221109000000_test_schema.sql | 21 +++- .../20230819154600_add_channel_buffers.sql | 18 ++- crates/collab/src/db/ids.rs | 1 + crates/collab/src/db/queries/buffers.rs | 105 +++++++++++++--- crates/collab/src/db/queries/channels.rs | 28 ----- crates/collab/src/db/tables.rs | 1 + crates/collab/src/db/tables/buffer.rs | 23 +++- crates/collab/src/db/tables/channel.rs | 9 +- .../db/tables/channel_buffer_collaborator.rs | 42 +++++++ crates/collab/src/db/tests/buffer_tests.rs | 57 ++++++++- crates/collab/src/rpc.rs | 52 ++++++-- .../collab/src/tests/channel_buffer_tests.rs | 40 +++---- crates/rpc/proto/zed.proto | 25 ++-- crates/rpc/src/proto.rs | 11 +- 15 files changed, 411 insertions(+), 135 deletions(-) create mode 100644 crates/collab/src/db/tables/channel_buffer_collaborator.rs diff --git a/crates/channel/src/channel_buffer.rs b/crates/channel/src/channel_buffer.rs index 10f59bce46..372bd319a1 100644 --- a/crates/channel/src/channel_buffer.rs +++ b/crates/channel/src/channel_buffer.rs @@ -1,9 +1,10 @@ use crate::ChannelId; use anyhow::Result; use client::Client; -use gpui::{Entity, ModelContext, ModelHandle, Task}; -use rpc::proto::GetChannelBuffer; +use gpui::{AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Task}; +use rpc::{proto, TypedEnvelope}; use std::sync::Arc; +use util::ResultExt; // Open the channel document // ChannelDocumentView { ChannelDocument, Editor } -> On clone, clones internal ChannelDocument handle, instantiates new editor @@ -14,9 +15,12 @@ use std::sync::Arc; // ChannleBuffers: HashMap> // } +type BufferId = u64; + pub struct ChannelBuffer { channel_id: ChannelId, - buffer: Option>, + buffer_id: BufferId, + buffer: ModelHandle, client: Arc, } @@ -28,53 +32,76 @@ impl ChannelBuffer { pub fn for_channel( channel_id: ChannelId, client: Arc, - cx: &mut ModelContext, - ) -> Self { - Self { - channel_id, - client, - buffer: None, - } + cx: &mut AppContext, + ) -> Task>> { + cx.spawn(|mut cx| async move { + let response = client + .request(proto::OpenChannelBuffer { channel_id }) + .await?; + + let base_text = response.base_text; + let operations = response + .operations + .into_iter() + .map(language::proto::deserialize_operation) + .collect::, _>>()?; + let buffer_id = response.buffer_id; + + let buffer = cx.add_model(|cx| language::Buffer::new(0, base_text, cx)); + buffer.update(&mut cx, |buffer, cx| buffer.apply_ops(operations, cx))?; + + anyhow::Ok(cx.add_model(|cx| { + cx.subscribe(&buffer, Self::on_buffer_update).detach(); + client.add_model_message_handler(Self::handle_update_channel_buffer); + Self { + buffer_id, + buffer, + client, + channel_id, + } + })) + }) + } + + async fn handle_update_channel_buffer( + this: ModelHandle, + update_channel_buffer: TypedEnvelope, + _: Arc, + mut cx: AsyncAppContext, + ) -> Result<()> { + let ops = update_channel_buffer + .payload + .operations + .into_iter() + .map(language::proto::deserialize_operation) + .collect::, _>>()?; + + this.update(&mut cx, |this, cx| { + this.buffer + .update(cx, |buffer, cx| buffer.apply_ops(ops, cx)) + })?; + + Ok(()) } fn on_buffer_update( &mut self, - buffer: ModelHandle, + _: ModelHandle, event: &language::Event, - cx: &mut ModelContext, + _: &mut ModelContext, ) { - // - } - - pub fn buffer( - &mut self, - cx: &mut ModelContext, - ) -> Task>> { - if let Some(buffer) = &self.buffer { - Task::ready(Ok(buffer.clone())) - } else { - let channel_id = self.channel_id; - let client = self.client.clone(); - cx.spawn(|this, mut cx| async move { - let response = client.request(GetChannelBuffer { channel_id }).await?; - - let base_text = response.base_text; - let operations = response - .operations - .into_iter() - .map(language::proto::deserialize_operation) - .collect::, _>>()?; - - this.update(&mut cx, |this, cx| { - let buffer = cx.add_model(|cx| language::Buffer::new(0, base_text, cx)); - buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx))?; - - cx.subscribe(&buffer, Self::on_buffer_update).detach(); - - this.buffer = Some(buffer.clone()); - anyhow::Ok(buffer) + if let language::Event::Operation(operation) = event { + let operation = language::proto::serialize_operation(operation); + self.client + .send(proto::UpdateChannelBuffer { + buffer_id: self.buffer_id, + operations: vec![operation], }) - }) + .log_err(); } } + + pub fn buffer(&self) -> ModelHandle { + self.buffer.clone() + } } diff --git a/crates/collab/migrations.sqlite/20221109000000_test_schema.sql b/crates/collab/migrations.sqlite/20221109000000_test_schema.sql index 1e4663a6f6..12ff2caec5 100644 --- a/crates/collab/migrations.sqlite/20221109000000_test_schema.sql +++ b/crates/collab/migrations.sqlite/20221109000000_test_schema.sql @@ -189,8 +189,7 @@ CREATE INDEX "index_followers_on_room_id" ON "followers" ("room_id"); CREATE TABLE "channels" ( "id" INTEGER PRIMARY KEY AUTOINCREMENT, "name" VARCHAR NOT NULL, - "created_at" TIMESTAMP NOT NULL DEFAULT now, - "main_buffer_id" INTEGER REFERENCES buffers (id) + "created_at" TIMESTAMP NOT NULL DEFAULT now ); CREATE TABLE "channel_paths" ( @@ -212,9 +211,12 @@ CREATE UNIQUE INDEX "index_channel_members_on_channel_id_and_user_id" ON "channe CREATE TABLE "buffers" ( "id" INTEGER PRIMARY KEY AUTOINCREMENT, + "channel_id" INTEGER NOT NULL REFERENCES channels (id) ON DELETE CASCADE, "epoch" INTEGER NOT NULL DEFAULT 0 ); +CREATE INDEX "index_buffers_on_channel_id" ON "buffers" ("channel_id"); + CREATE TABLE "buffer_operations" ( "buffer_id" INTEGER NOT NULL REFERENCES buffers (id) ON DELETE CASCADE, "epoch" INTEGER NOT NULL, @@ -233,3 +235,18 @@ CREATE TABLE "buffer_snapshots" ( "text" TEXT NOT NULL, PRIMARY KEY(buffer_id, epoch) ); + +CREATE TABLE "channel_buffer_collaborators" ( + "id" INTEGER PRIMARY KEY AUTOINCREMENT, + "buffer_id" INTEGER NOT NULL REFERENCES buffers (id) ON DELETE CASCADE, + "connection_id" INTEGER NOT NULL, + "connection_server_id" INTEGER NOT NULL REFERENCES servers (id) ON DELETE CASCADE, + "user_id" INTEGER NOT NULL REFERENCES users (id) ON DELETE CASCADE, + "replica_id" INTEGER NOT NULL +); + +CREATE INDEX "index_channel_buffer_collaborators_on_buffer_id" ON "channel_buffer_collaborators" ("buffer_id"); +CREATE UNIQUE INDEX "index_channel_buffer_collaborators_on_buffer_id_and_replica_id" ON "channel_buffer_collaborators" ("buffer_id", "replica_id"); +CREATE INDEX "index_channel_buffer_collaborators_on_connection_server_id" ON "channel_buffer_collaborators" ("connection_server_id"); +CREATE INDEX "index_channel_buffer_collaborators_on_connection_id" ON "channel_buffer_collaborators" ("connection_id"); +CREATE UNIQUE INDEX "index_channel_buffer_collaborators_on_buffer_id_connection_id_and_server_id" ON "channel_buffer_collaborators" ("buffer_id", "connection_id", "connection_server_id"); diff --git a/crates/collab/migrations/20230819154600_add_channel_buffers.sql b/crates/collab/migrations/20230819154600_add_channel_buffers.sql index a4d936fd74..8ccd7acadf 100644 --- a/crates/collab/migrations/20230819154600_add_channel_buffers.sql +++ b/crates/collab/migrations/20230819154600_add_channel_buffers.sql @@ -1,8 +1,11 @@ CREATE TABLE "buffers" ( "id" SERIAL PRIMARY KEY, + "channel_id" INTEGER NOT NULL REFERENCES channels (id) ON DELETE CASCADE, "epoch" INTEGER NOT NULL DEFAULT 0 ); +CREATE INDEX "index_buffers_on_channel_id" ON "buffers" ("channel_id"); + CREATE TABLE "buffer_operations" ( "buffer_id" INTEGER NOT NULL REFERENCES buffers (id) ON DELETE CASCADE, "epoch" INTEGER NOT NULL, @@ -22,4 +25,17 @@ CREATE TABLE "buffer_snapshots" ( PRIMARY KEY(buffer_id, epoch) ); -ALTER TABLE "channels" ADD COLUMN "main_buffer_id" INTEGER REFERENCES buffers (id); +CREATE TABLE "channel_buffer_collaborators" ( + "id" SERIAL PRIMARY KEY, + "buffer_id" INTEGER NOT NULL REFERENCES buffers (id) ON DELETE CASCADE, + "connection_id" INTEGER NOT NULL, + "connection_server_id" INTEGER NOT NULL REFERENCES servers (id) ON DELETE CASCADE, + "user_id" INTEGER NOT NULL REFERENCES users (id) ON DELETE CASCADE, + "replica_id" INTEGER NOT NULL +); + +CREATE INDEX "index_channel_buffer_collaborators_on_buffer_id" ON "channel_buffer_collaborators" ("buffer_id"); +CREATE UNIQUE INDEX "index_channel_buffer_collaborators_on_buffer_id_and_replica_id" ON "channel_buffer_collaborators" ("buffer_id", "replica_id"); +CREATE INDEX "index_channel_buffer_collaborators_on_connection_server_id" ON "channel_buffer_collaborators" ("connection_server_id"); +CREATE INDEX "index_channel_buffer_collaborators_on_connection_id" ON "channel_buffer_collaborators" ("connection_id"); +CREATE UNIQUE INDEX "index_channel_buffer_collaborators_on_buffer_id_connection_id_and_server_id" ON "channel_buffer_collaborators" ("buffer_id", "connection_id", "connection_server_id"); diff --git a/crates/collab/src/db/ids.rs b/crates/collab/src/db/ids.rs index 54f9463cca..8501083f83 100644 --- a/crates/collab/src/db/ids.rs +++ b/crates/collab/src/db/ids.rs @@ -124,3 +124,4 @@ id_type!(ReplicaId); id_type!(ServerId); id_type!(SignupId); id_type!(UserId); +id_type!(ChannelBufferCollaboratorId); diff --git a/crates/collab/src/db/queries/buffers.rs b/crates/collab/src/db/queries/buffers.rs index f5ff2e3367..ba88e95fb8 100644 --- a/crates/collab/src/db/queries/buffers.rs +++ b/crates/collab/src/db/queries/buffers.rs @@ -1,20 +1,12 @@ use super::*; use prost::Message; -pub struct Buffer { +pub struct ChannelBuffer { pub base_text: String, pub operations: Vec, } impl Database { - pub async fn create_buffer(&self) -> Result { - self.transaction(|tx| async move { - let buffer = buffer::ActiveModel::new().insert(&*tx).await?; - Ok(buffer.id) - }) - .await - } - pub async fn update_buffer( &self, buffer_id: BufferId, @@ -69,13 +61,65 @@ impl Database { .await } - pub async fn get_buffer(&self, id: BufferId) -> Result { + pub async fn join_buffer_for_channel( + &self, + channel_id: ChannelId, + user_id: UserId, + connection: ConnectionId, + ) -> Result { self.transaction(|tx| async move { - let buffer = buffer::Entity::find_by_id(id) - .one(&*tx) - .await? - .ok_or_else(|| anyhow!("no such buffer"))?; + let tx = tx; + // Get or create buffer from channel + self.check_user_is_channel_member(channel_id, user_id, &tx) + .await?; + + let buffer = channel::Model { + id: channel_id, + ..Default::default() + } + .find_related(buffer::Entity) + .one(&*tx) + .await?; + + let buffer = if let Some(buffer) = buffer { + buffer + } else { + let buffer = buffer::ActiveModel { + channel_id: ActiveValue::Set(channel_id), + ..Default::default() + } + .insert(&*tx) + .await?; + buffer + }; + + // Join the collaborators + let collaborators = buffer + .find_related(channel_buffer_collaborator::Entity) + .all(&*tx) + .await?; + let replica_ids = collaborators + .iter() + .map(|c| c.replica_id) + .collect::>(); + let mut replica_id = ReplicaId(0); + while replica_ids.contains(&replica_id) { + replica_id.0 += 1; + } + channel_buffer_collaborator::ActiveModel { + buffer_id: ActiveValue::Set(buffer.id), + connection_id: ActiveValue::Set(connection.id as i32), + connection_server_id: ActiveValue::Set(ServerId(connection.owner_id as i32)), + user_id: ActiveValue::Set(user_id), + replica_id: ActiveValue::Set(replica_id), + ..Default::default() + } + .insert(&*tx) + .await?; + + // Assemble the buffer state + let id = buffer.id; let base_text = if buffer.epoch > 0 { buffer_snapshot::Entity::find() .filter( @@ -128,13 +172,44 @@ impl Database { }) } - Ok(Buffer { + Ok(ChannelBuffer { base_text, operations, }) }) .await } + + pub async fn get_buffer_collaborators(&self, buffer: BufferId) -> Result<()> { + todo!() + } + + pub async fn leave_buffer(&self, buffer: BufferId, user: UserId) -> Result<()> { + self.transaction(|tx| async move { + //TODO + // let tx = tx; + // let channel = channel::Entity::find_by_id(channel_id) + // .one(&*tx) + // .await? + // .ok_or_else(|| anyhow!("invalid channel"))?; + + // if let Some(id) = channel.main_buffer_id { + // return Ok(id); + // } else { + // let buffer = buffer::ActiveModel::new().insert(&*tx).await?; + // channel::ActiveModel { + // id: ActiveValue::Unchanged(channel_id), + // main_buffer_id: ActiveValue::Set(Some(buffer.id)), + // ..Default::default() + // } + // .update(&*tx) + // .await?; + // Ok(buffer.id) + // } + Ok(()) + }) + .await + } } mod storage { diff --git a/crates/collab/src/db/queries/channels.rs b/crates/collab/src/db/queries/channels.rs index 85a9304a2e..e3d3643a61 100644 --- a/crates/collab/src/db/queries/channels.rs +++ b/crates/collab/src/db/queries/channels.rs @@ -689,34 +689,6 @@ impl Database { }) .await } - - pub async fn get_or_create_buffer_for_channel( - &self, - channel_id: ChannelId, - ) -> Result { - self.transaction(|tx| async move { - let tx = tx; - let channel = channel::Entity::find_by_id(channel_id) - .one(&*tx) - .await? - .ok_or_else(|| anyhow!("invalid channel"))?; - - if let Some(id) = channel.main_buffer_id { - return Ok(id); - } else { - let buffer = buffer::ActiveModel::new().insert(&*tx).await?; - channel::ActiveModel { - id: ActiveValue::Unchanged(channel_id), - main_buffer_id: ActiveValue::Set(Some(buffer.id)), - ..Default::default() - } - .update(&*tx) - .await?; - Ok(buffer.id) - } - }) - .await - } } #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)] diff --git a/crates/collab/src/db/tables.rs b/crates/collab/src/db/tables.rs index fbf4bff2a6..fe747e0d27 100644 --- a/crates/collab/src/db/tables.rs +++ b/crates/collab/src/db/tables.rs @@ -3,6 +3,7 @@ pub mod buffer; pub mod buffer_operation; pub mod buffer_snapshot; pub mod channel; +pub mod channel_buffer_collaborator; pub mod channel_member; pub mod channel_path; pub mod contact; diff --git a/crates/collab/src/db/tables/buffer.rs b/crates/collab/src/db/tables/buffer.rs index 84e62cc071..f0187ad278 100644 --- a/crates/collab/src/db/tables/buffer.rs +++ b/crates/collab/src/db/tables/buffer.rs @@ -1,4 +1,4 @@ -use crate::db::BufferId; +use crate::db::{BufferId, ChannelId}; use sea_orm::entity::prelude::*; #[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel)] @@ -7,6 +7,7 @@ pub struct Model { #[sea_orm(primary_key)] pub id: BufferId, pub epoch: i32, + pub channel_id: ChannelId, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] @@ -15,6 +16,14 @@ pub enum Relation { Operations, #[sea_orm(has_many = "super::buffer_snapshot::Entity")] Snapshots, + #[sea_orm( + belongs_to = "super::channel::Entity", + from = "Column::ChannelId", + to = "super::channel::Column::Id" + )] + Channel, + #[sea_orm(has_many = "super::channel_buffer_collaborator::Entity")] + Collaborators, } impl Related for Entity { @@ -29,4 +38,16 @@ impl Related for Entity { } } +impl Related for Entity { + fn to() -> RelationDef { + Relation::Channel.def() + } +} + +impl Related for Entity { + fn to() -> RelationDef { + Relation::Collaborators.def() + } +} + impl ActiveModelBehavior for ActiveModel {} diff --git a/crates/collab/src/db/tables/channel.rs b/crates/collab/src/db/tables/channel.rs index 444d5fa6d9..7b33e3a1dd 100644 --- a/crates/collab/src/db/tables/channel.rs +++ b/crates/collab/src/db/tables/channel.rs @@ -7,7 +7,6 @@ pub struct Model { #[sea_orm(primary_key)] pub id: ChannelId, pub name: String, - pub main_buffer_id: Option, } impl ActiveModelBehavior for ActiveModel {} @@ -16,6 +15,8 @@ impl ActiveModelBehavior for ActiveModel {} pub enum Relation { #[sea_orm(has_one = "super::room::Entity")] Room, + #[sea_orm(has_one = "super::room::Entity")] + Buffer, #[sea_orm(has_many = "super::channel_member::Entity")] Member, } @@ -31,3 +32,9 @@ impl Related for Entity { Relation::Room.def() } } + +impl Related for Entity { + fn to() -> RelationDef { + Relation::Buffer.def() + } +} diff --git a/crates/collab/src/db/tables/channel_buffer_collaborator.rs b/crates/collab/src/db/tables/channel_buffer_collaborator.rs new file mode 100644 index 0000000000..2e43e93e8e --- /dev/null +++ b/crates/collab/src/db/tables/channel_buffer_collaborator.rs @@ -0,0 +1,42 @@ +use crate::db::{BufferId, ChannelBufferCollaboratorId, ReplicaId, ServerId, UserId}; +use rpc::ConnectionId; +use sea_orm::entity::prelude::*; + +#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel)] +#[sea_orm(table_name = "channel_buffer_collaborators")] +pub struct Model { + #[sea_orm(primary_key)] + pub id: ChannelBufferCollaboratorId, + pub buffer_id: BufferId, + pub connection_id: i32, + pub connection_server_id: ServerId, + pub user_id: UserId, + pub replica_id: ReplicaId, +} + +impl Model { + pub fn connection(&self) -> ConnectionId { + ConnectionId { + owner_id: self.connection_server_id.0 as u32, + id: self.connection_id as u32, + } + } +} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] +pub enum Relation { + #[sea_orm( + belongs_to = "super::buffer::Entity", + from = "Column::BufferId", + to = "super::buffer::Column::Id" + )] + Buffer, +} + +impl Related for Entity { + fn to() -> RelationDef { + Relation::Buffer.def() + } +} + +impl ActiveModelBehavior for ActiveModel {} diff --git a/crates/collab/src/db/tests/buffer_tests.rs b/crates/collab/src/db/tests/buffer_tests.rs index f0e78e1fe4..bf7d7763e2 100644 --- a/crates/collab/src/db/tests/buffer_tests.rs +++ b/crates/collab/src/db/tests/buffer_tests.rs @@ -6,7 +6,60 @@ use text::Buffer; test_both_dbs!(test_buffers, test_buffers_postgres, test_buffers_sqlite); async fn test_buffers(db: &Arc) { - let buffer_id = db.create_buffer().await.unwrap(); + // Prep database test info + let a_id = db + .create_user( + "user_a@example.com", + false, + NewUserParams { + github_login: "user_a".into(), + github_user_id: 101, + invite_count: 0, + }, + ) + .await + .unwrap() + .user_id; + let b_id = db + .create_user( + "user_b@example.com", + false, + NewUserParams { + github_login: "user_b".into(), + github_user_id: 102, + invite_count: 0, + }, + ) + .await + .unwrap() + .user_id; + // This user will not be a part of the channel + let c_id = db + .create_user( + "user_b@example.com", + false, + NewUserParams { + github_login: "user_b".into(), + github_user_id: 102, + invite_count: 0, + }, + ) + .await + .unwrap() + .user_id; + + let zed_id = db.create_root_channel("zed", "1", a_id).await.unwrap(); + + db.invite_channel_member(zed_id, b_id, a_id, false) + .await + .unwrap(); + + db.respond_to_channel_invite(zed_id, b_id, true) + .await + .unwrap(); + + // TODO: Join buffer + let buffer_id = db.get_or_create_buffer_for_channel(zed_id); let mut buffer = Buffer::new(0, 0, "".to_string()); let mut operations = Vec::new(); @@ -23,7 +76,7 @@ async fn test_buffers(db: &Arc) { db.update_buffer(buffer_id, &operations).await.unwrap(); - let buffer_data = db.get_buffer(buffer_id).await.unwrap(); + let buffer_data = db.open_buffer(buffer_id).await.unwrap(); let mut buffer_2 = Buffer::new(0, 0, buffer_data.base_text); buffer_2 diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index 22eb23ce8e..6e62b90473 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -2,7 +2,10 @@ mod connection_pool; use crate::{ auth, - db::{self, ChannelId, ChannelsForUser, Database, ProjectId, RoomId, ServerId, User, UserId}, + db::{ + self, BufferId, ChannelId, ChannelsForUser, Database, ProjectId, RoomId, ServerId, User, + UserId, + }, executor::Executor, AppState, Result, }; @@ -35,8 +38,8 @@ use lazy_static::lazy_static; use prometheus::{register_int_gauge, IntGauge}; use rpc::{ proto::{ - self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, GetChannelBufferResponse, - LiveKitConnectionInfo, RequestMessage, + self, Ack, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, LiveKitConnectionInfo, + OpenChannelBufferResponse, RequestMessage, }, Connection, ConnectionId, Peer, Receipt, TypedEnvelope, }; @@ -248,7 +251,9 @@ impl Server { .add_request_handler(remove_channel_member) .add_request_handler(set_channel_member_admin) .add_request_handler(rename_channel) - .add_request_handler(get_channel_buffer) + .add_request_handler(open_channel_buffer) + .add_request_handler(close_channel_buffer) + .add_message_handler(update_channel_buffer) .add_request_handler(get_channel_members) .add_request_handler(respond_to_channel_invite) .add_request_handler(join_channel) @@ -2479,9 +2484,9 @@ async fn join_channel( Ok(()) } -async fn get_channel_buffer( - request: proto::GetChannelBuffer, - response: Response, +async fn open_channel_buffer( + request: proto::OpenChannelBuffer, + response: Response, session: Session, ) -> Result<()> { let db = session.db().await; @@ -2489,9 +2494,12 @@ async fn get_channel_buffer( let buffer_id = db.get_or_create_buffer_for_channel(channel_id).await?; - let buffer = db.get_buffer(buffer_id).await?; + // TODO: join channel_buffer - response.send(GetChannelBufferResponse { + let buffer = db.open_buffer(buffer_id).await?; + + response.send(OpenChannelBufferResponse { + buffer_id: buffer_id.to_proto(), base_text: buffer.base_text, operations: buffer.operations, })?; @@ -2499,6 +2507,32 @@ async fn get_channel_buffer( Ok(()) } +async fn close_channel_buffer( + request: proto::CloseChannelBuffer, + response: Response, + session: Session, +) -> Result<()> { + let db = session.db().await; + let buffer_id = BufferId::from_proto(request.buffer_id); + + // TODO: close channel buffer here + // + response.send(Ack {})?; + + Ok(()) +} + +async fn update_channel_buffer( + request: proto::UpdateChannelBuffer, + session: Session, +) -> Result<()> { + let db = session.db().await; + + // TODO: Broadcast to buffer members + + Ok(()) +} + async fn update_diff_base(request: proto::UpdateDiffBase, session: Session) -> Result<()> { let project_id = ProjectId::from_proto(request.project_id); let project_connection_ids = session diff --git a/crates/collab/src/tests/channel_buffer_tests.rs b/crates/collab/src/tests/channel_buffer_tests.rs index e7f662523e..c41f5de803 100644 --- a/crates/collab/src/tests/channel_buffer_tests.rs +++ b/crates/collab/src/tests/channel_buffer_tests.rs @@ -19,45 +19,39 @@ async fn test_channel_buffers( .make_channel("zed", (&client_a, cx_a), &mut [(&client_b, cx_b)]) .await; - let a_document = - cx_a.add_model(|cx| ChannelBuffer::for_channel(zed_id, client_a.client().to_owned(), cx)); - let channel_buffer_a = a_document - .update(cx_a, |doc, cx| doc.buffer(cx)) + let channel_buffer_a = cx_a + .update(|cx| ChannelBuffer::for_channel(zed_id, client_a.client().to_owned(), cx)) .await .unwrap(); - edit_channel_buffer(&channel_buffer_a, cx_a, [(0..0, "hello world")]); - edit_channel_buffer(&channel_buffer_a, cx_a, [(5..5, ", cruel")]); - edit_channel_buffer(&channel_buffer_a, cx_a, [(0..5, "goodbye")]); - undo_channel_buffer(&channel_buffer_a, cx_a); + let buffer_a = channel_buffer_a.read_with(cx_a, |buffer, _| buffer.buffer()); - assert_eq!( - channel_buffer_text(&channel_buffer_a, cx_a), - "hello, cruel world" - ); + edit_channel_buffer(&buffer_a, cx_a, [(0..0, "hello world")]); + edit_channel_buffer(&buffer_a, cx_a, [(5..5, ", cruel")]); + edit_channel_buffer(&buffer_a, cx_a, [(0..5, "goodbye")]); + undo_channel_buffer(&buffer_a, cx_a); - let b_document = - cx_b.add_model(|cx| ChannelBuffer::for_channel(zed_id, client_b.client().to_owned(), cx)); - let channel_buffer_b = b_document - .update(cx_b, |doc, cx| doc.buffer(cx)) + assert_eq!(channel_buffer_text(&buffer_a, cx_a), "hello, cruel world"); + + let channel_buffer_b = cx_b + .update(|cx| ChannelBuffer::for_channel(zed_id, client_b.client().to_owned(), cx)) .await .unwrap(); - assert_eq!( - channel_buffer_text(&channel_buffer_b, cx_b), - "hello, cruel world" - ); + let buffer_b = channel_buffer_b.read_with(cx_b, |buffer, _| buffer.buffer()); - edit_channel_buffer(&channel_buffer_b, cx_b, [(7..12, "beautiful")]); + assert_eq!(channel_buffer_text(&buffer_b, cx_b), "hello, cruel world"); + + edit_channel_buffer(&buffer_b, cx_b, [(7..12, "beautiful")]); deterministic.run_until_parked(); assert_eq!( - channel_buffer_text(&channel_buffer_a, cx_a), + channel_buffer_text(&buffer_a, cx_a), "hello, beautiful world" ); assert_eq!( - channel_buffer_text(&channel_buffer_b, cx_b), + channel_buffer_text(&buffer_b, cx_b), "hello, beautiful world" ); } diff --git a/crates/rpc/proto/zed.proto b/crates/rpc/proto/zed.proto index baeaae1876..7fb22577f3 100644 --- a/crates/rpc/proto/zed.proto +++ b/crates/rpc/proto/zed.proto @@ -143,8 +143,10 @@ message Envelope { SetChannelMemberAdmin set_channel_member_admin = 129; RenameChannel rename_channel = 130; - GetChannelBuffer get_channel_buffer = 131; - GetChannelBufferResponse get_channel_buffer_response = 132; + OpenChannelBuffer open_channel_buffer = 131; + OpenChannelBufferResponse open_channel_buffer_response = 132; + UpdateChannelBuffer update_channel_buffer = 133; + CloseChannelBuffer close_channel_buffer = 134; } } @@ -543,6 +545,11 @@ message UpdateBuffer { repeated Operation operations = 3; } +message UpdateChannelBuffer { + uint64 buffer_id = 2; + repeated Operation operations = 3; +} + message UpdateBufferFile { uint64 project_id = 1; uint64 buffer_id = 2; @@ -951,13 +958,18 @@ message RenameChannel { string name = 2; } -message GetChannelBuffer { +message OpenChannelBuffer { uint64 channel_id = 1; } -message GetChannelBufferResponse { - string base_text = 1; - repeated Operation operations = 2; +message OpenChannelBufferResponse { + uint64 buffer_id = 1; + string base_text = 2; + repeated Operation operations = 3; +} + +message CloseChannelBuffer { + uint64 buffer_id = 1; } message RespondToChannelInvite { @@ -1156,7 +1168,6 @@ enum GitStatus { Conflict = 2; } - message BufferState { uint64 id = 1; optional File file = 2; diff --git a/crates/rpc/src/proto.rs b/crates/rpc/src/proto.rs index 21a491b934..9d71140aa0 100644 --- a/crates/rpc/src/proto.rs +++ b/crates/rpc/src/proto.rs @@ -249,8 +249,10 @@ messages!( (GetPrivateUserInfoResponse, Foreground), (GetChannelMembers, Foreground), (GetChannelMembersResponse, Foreground), - (GetChannelBuffer, Foreground), - (GetChannelBufferResponse, Foreground) + (OpenChannelBuffer, Foreground), + (OpenChannelBufferResponse, Foreground), + (CloseChannelBuffer, Background), + (UpdateChannelBuffer, Foreground) ); request_messages!( @@ -317,7 +319,8 @@ request_messages!( (UpdateParticipantLocation, Ack), (UpdateProject, Ack), (UpdateWorktree, Ack), - (GetChannelBuffer, GetChannelBufferResponse) + (OpenChannelBuffer, OpenChannelBufferResponse), + (CloseChannelBuffer, Ack) ); entity_messages!( @@ -373,6 +376,8 @@ entity_messages!( UpdateDiffBase ); +entity_messages!(buffer_id, UpdateChannelBuffer); + const KIB: usize = 1024; const MIB: usize = KIB * 1024; const MAX_BUFFER_LEN: usize = MIB;