diff --git a/gpui/src/app.rs b/gpui/src/app.rs index e7c0b3f090..e2b1470bc5 100644 --- a/gpui/src/app.rs +++ b/gpui/src/app.rs @@ -1348,10 +1348,6 @@ impl MutableAppContext { AsyncAppContext(self.weak_self.as_ref().unwrap().upgrade().unwrap()) } - pub fn to_background(&self) -> BackgroundAppContext { - // - } - pub fn write_to_clipboard(&self, item: ClipboardItem) { self.platform.write_to_clipboard(item); } diff --git a/zed/src/file_finder.rs b/zed/src/file_finder.rs index 83227f309f..741753cec7 100644 --- a/zed/src/file_finder.rs +++ b/zed/src/file_finder.rs @@ -479,8 +479,13 @@ mod tests { let app_state = cx.read(build_app_state); let (window_id, workspace) = cx.add_window(|cx| { - let mut workspace = - Workspace::new(0, app_state.settings, app_state.language_registry, cx); + let mut workspace = Workspace::new( + 0, + app_state.settings, + app_state.language_registry, + app_state.rpc_client, + cx, + ); workspace.add_worktree(tmp_dir.path(), cx); workspace }); @@ -551,6 +556,7 @@ mod tests { 0, app_state.settings.clone(), app_state.language_registry.clone(), + app_state.rpc_client.clone(), cx, ); workspace.add_worktree(tmp_dir.path(), cx); @@ -614,6 +620,7 @@ mod tests { 0, app_state.settings.clone(), app_state.language_registry.clone(), + app_state.rpc_client.clone(), cx, ); workspace.add_worktree(&file_path, cx); @@ -665,6 +672,7 @@ mod tests { 0, app_state.settings.clone(), app_state.language_registry.clone(), + app_state.rpc_client.clone(), cx, ) }); diff --git a/zed/src/lib.rs b/zed/src/lib.rs index 44f7b89e5e..720a34cc1c 100644 --- a/zed/src/lib.rs +++ b/zed/src/lib.rs @@ -1,8 +1,5 @@ -use futures::Future; -use gpui::MutableAppContext; use rpc_client::RpcClient; use std::sync::Arc; -use zed_rpc::proto::RequestMessage; pub mod assets; pub mod editor; @@ -27,27 +24,6 @@ pub struct AppState { pub rpc_client: Arc, } -impl AppState { - pub async fn on_rpc_request( - &self, - cx: &mut MutableAppContext, - handler: F, - ) where - Req: RequestMessage, - F: 'static + Send + Sync + Fn(Req, &AppState, &mut MutableAppContext) -> Fut, - Fut: 'static + Send + Sync + Future, - { - let app_state = self.clone(); - let cx = cx.to_background(); - app_state - .rpc_client - .on_request(move |req| cx.update(|cx| async move { - handler(req, &app_state, cx) - }) - .await - } -} - pub fn init(cx: &mut gpui::MutableAppContext) { cx.add_global_action("app:quit", quit); } diff --git a/zed/src/main.rs b/zed/src/main.rs index 4fd35e9189..9ac7ce3886 100644 --- a/zed/src/main.rs +++ b/zed/src/main.rs @@ -25,13 +25,13 @@ fn main() { let app_state = AppState { language_registry, settings, - rpc_client: Arc::new(RpcClient::new()), + rpc_client: RpcClient::new(), }; app.run(move |cx| { cx.set_menus(menus::menus(app_state.clone())); zed::init(cx); - workspace::init(cx, &app_state); + workspace::init(cx, app_state.rpc_client.clone()); editor::init(cx); file_finder::init(cx); diff --git a/zed/src/rpc_client.rs b/zed/src/rpc_client.rs index 20ab8373ca..122a36b600 100644 --- a/zed/src/rpc_client.rs +++ b/zed/src/rpc_client.rs @@ -1,7 +1,7 @@ use anyhow::{anyhow, Result}; -use futures::future::{BoxFuture, Either, FutureExt}; +use futures::future::Either; use postage::{ - barrier, oneshot, + barrier, mpsc, oneshot, prelude::{Sink, Stream}, }; use smol::{ @@ -30,87 +30,128 @@ struct RpcConnection { _close_barrier: barrier::Sender, } -type RequestHandler = Box< - dyn Send - + Sync - + Fn(&mut Option, &AtomicU32) -> Option>, ->; type MessageHandler = - Box) -> Option>>; + Box, ConnectionId) -> Option>; + +struct ErasedMessage { + id: u32, + connection_id: ConnectionId, + body: proto::Envelope, +} + +pub struct Message { + connection_id: ConnectionId, + body: Option, +} + +impl From for Message { + fn from(message: ErasedMessage) -> Self { + Self { + connection_id: message.connection_id, + body: T::from_envelope(message.body), + } + } +} + +impl Message { + pub fn connection_id(&self) -> ConnectionId { + self.connection_id + } + + pub fn body(&mut self) -> T { + self.body.take().expect("body already taken") + } +} + +pub struct Request { + id: u32, + connection_id: ConnectionId, + body: Option, +} + +impl From for Request { + fn from(message: ErasedMessage) -> Self { + Self { + id: message.id, + connection_id: message.connection_id, + body: T::from_envelope(message.body), + } + } +} + +impl Request { + pub fn connection_id(&self) -> ConnectionId { + self.connection_id + } + + pub fn body(&mut self) -> T { + self.body.take().expect("body already taken") + } +} pub struct RpcClient { connections: RwLock>>, - request_handlers: RwLock>, - message_handlers: RwLock>, - handler_types: RwLock>, + message_handlers: RwLock, MessageHandler)>>, + handler_types: Mutex>, next_connection_id: AtomicU32, } impl RpcClient { - pub fn new() -> Self { - Self { - request_handlers: Default::default(), + pub fn new() -> Arc { + Arc::new(Self { + connections: Default::default(), message_handlers: Default::default(), handler_types: Default::default(), - connections: Default::default(), next_connection_id: Default::default(), - } + }) } - pub async fn on_request(&self, handler: F) - where - Req: RequestMessage, - F: 'static + Send + Sync + Fn(Req) -> Fut, - Fut: 'static + Send + Sync + Future, - { - if !self.handler_types.write().await.insert(TypeId::of::()) { - panic!("duplicate request handler type"); + pub async fn add_request_handler(&self) -> impl Stream> { + if !self.handler_types.lock().await.insert(TypeId::of::()) { + panic!("duplicate handler type"); } - self.request_handlers - .write() - .await - .push(Box::new(move |envelope, next_message_id| { - if envelope.as_ref().map_or(false, Req::matches_envelope) { + let (tx, rx) = mpsc::channel(256); + self.message_handlers.write().await.push(( + tx, + Box::new(move |envelope, connection_id| { + if envelope.as_ref().map_or(false, T::matches_envelope) { let envelope = Option::take(envelope).unwrap(); - let message_id = next_message_id.fetch_add(1, atomic::Ordering::SeqCst); - let responding_to = envelope.id; - let request = Req::from_envelope(envelope).unwrap(); - Some( - handler(request) - .map(move |response| { - response.into_envelope(message_id, Some(responding_to)) - }) - .boxed(), - ) + Some(ErasedMessage { + id: envelope.id, + connection_id, + body: envelope, + }) } else { None } - })); + }), + )); + rx.map(Request::from) } - pub async fn on_message(&self, handler: F) - where - M: EnvelopedMessage, - F: 'static + Send + Sync + Fn(M) -> Fut, - Fut: 'static + Send + Sync + Future, - { - if !self.handler_types.write().await.insert(TypeId::of::()) { - panic!("duplicate request handler type"); + pub async fn add_message_handler(&self) -> impl Stream> { + if !self.handler_types.lock().await.insert(TypeId::of::()) { + panic!("duplicate handler type"); } - self.message_handlers - .write() - .await - .push(Box::new(move |envelope| { - if envelope.as_ref().map_or(false, M::matches_envelope) { + let (tx, rx) = mpsc::channel(256); + self.message_handlers.write().await.push(( + tx, + Box::new(move |envelope, connection_id| { + if envelope.as_ref().map_or(false, T::matches_envelope) { let envelope = Option::take(envelope).unwrap(); - let request = M::from_envelope(envelope).unwrap(); - Some(handler(request).boxed()) + Some(ErasedMessage { + id: envelope.id, + connection_id, + body: envelope, + }) } else { None } - })); + }), + )); + rx.map(Message::from) } pub async fn add_connection( @@ -167,36 +208,14 @@ impl RpcClient { } else { let mut handled = false; let mut envelope = Some(incoming); - for handler in this.request_handlers.iter() { - if let Some(future) = - handler(&mut envelope, &connection.next_message_id) - { - let response = future.await; - if let Err(error) = connection - .writer - .lock() - .await - .write_message(&response) - .await - { - log::warn!("failed to write response: {}", error); - return; - } + for (tx, handler) in this.message_handlers.read().await.iter() { + if let Some(message) = handler(&mut envelope, connection_id) { + let _ = tx.clone().send(message).await; handled = true; break; } } - if !handled { - for handler in this.message_handlers.iter() { - if let Some(future) = handler(&mut envelope) { - future.await; - handled = true; - break; - } - } - } - if !handled { log::warn!("unhandled message: {:?}", envelope.unwrap().payload); } @@ -281,6 +300,33 @@ impl RpcClient { Ok(()) } } + + pub fn respond( + self: &Arc, + request: Request, + response: T::Response, + ) -> impl Future> { + let this = self.clone(); + async move { + let connection = this + .connections + .read() + .await + .get(&request.connection_id) + .ok_or_else(|| anyhow!("unknown connection: {}", request.connection_id.0))? + .clone(); + let message_id = connection + .next_message_id + .fetch_add(1, atomic::Ordering::SeqCst); + connection + .writer + .lock() + .await + .write_message(&response.into_envelope(message_id, Some(request.id))) + .await?; + Ok(()) + } + } } #[cfg(test)] @@ -304,7 +350,7 @@ mod tests { let (server_conn, _) = listener.accept().await.unwrap(); let mut server_stream = MessageStream::new(server_conn); - let client = Arc::new(RpcClient::new()); + let client = RpcClient::new(); let (connection_id, handler) = client.add_connection(client_conn).await; executor.spawn(handler).detach(); @@ -363,7 +409,7 @@ mod tests { let client_conn = UnixStream::connect(&socket_path).await.unwrap(); let (mut server_conn, _) = listener.accept().await.unwrap(); - let client = Arc::new(RpcClient::new()); + let client = RpcClient::new(); let (connection_id, handler) = client.add_connection(client_conn).await; executor.spawn(handler).detach(); client.disconnect(connection_id).await; @@ -390,7 +436,7 @@ mod tests { let mut client_conn = UnixStream::connect(&socket_path).await.unwrap(); client_conn.close().await.unwrap(); - let client = Arc::new(RpcClient::new()); + let client = RpcClient::new(); let (connection_id, handler) = client.add_connection(client_conn).await; executor.spawn(handler).detach(); let err = client diff --git a/zed/src/test.rs b/zed/src/test.rs index 90ef48e6c0..fca16cf97b 100644 --- a/zed/src/test.rs +++ b/zed/src/test.rs @@ -1,4 +1,6 @@ -use crate::{language::LanguageRegistry, settings, time::ReplicaId, AppState}; +use crate::{ + language::LanguageRegistry, rpc_client::RpcClient, settings, time::ReplicaId, AppState, +}; use ctor::ctor; use gpui::AppContext; use rand::Rng; @@ -150,5 +152,6 @@ pub fn build_app_state(cx: &AppContext) -> AppState { AppState { settings, language_registry, + rpc_client: RpcClient::new(), } } diff --git a/zed/src/util.rs b/zed/src/util.rs index 0fa1081d66..643c07fb70 100644 --- a/zed/src/util.rs +++ b/zed/src/util.rs @@ -1,5 +1,8 @@ +use crate::rpc_client::{Message, Request, RpcClient}; +use postage::prelude::Stream; use rand::prelude::*; -use std::cmp::Ordering; +use std::{cmp::Ordering, future::Future, sync::Arc}; +use zed_rpc::proto; #[derive(Copy, Clone, Eq, PartialEq, Debug, Hash)] pub enum Bias { @@ -53,6 +56,104 @@ where } } +pub trait RequestHandler<'a, R: proto::RequestMessage> { + type Output: 'a + Future>; + + fn handle( + &self, + request: Request, + client: Arc, + cx: &'a mut gpui::AsyncAppContext, + ) -> Self::Output; +} + +impl<'a, R, F, Fut> RequestHandler<'a, R> for F +where + R: proto::RequestMessage, + F: Fn(Request, Arc, &'a mut gpui::AsyncAppContext) -> Fut, + Fut: 'a + Future>, +{ + type Output = Fut; + + fn handle( + &self, + request: Request, + client: Arc, + cx: &'a mut gpui::AsyncAppContext, + ) -> Self::Output { + (self)(request, client, cx) + } +} + +pub trait MessageHandler<'a, M: proto::EnvelopedMessage> { + type Output: 'a + Future>; + + fn handle( + &self, + message: Message, + client: Arc, + cx: &'a mut gpui::AsyncAppContext, + ) -> Self::Output; +} + +impl<'a, M, F, Fut> MessageHandler<'a, M> for F +where + M: proto::EnvelopedMessage, + F: Fn(Message, Arc, &'a mut gpui::AsyncAppContext) -> Fut, + Fut: 'a + Future>, +{ + type Output = Fut; + + fn handle( + &self, + message: Message, + client: Arc, + cx: &'a mut gpui::AsyncAppContext, + ) -> Self::Output { + (self)(message, client, cx) + } +} + +pub fn spawn_request_handler( + handler: H, + client: &Arc, + cx: &mut gpui::MutableAppContext, +) where + H: 'static + for<'a> RequestHandler<'a, R>, + R: proto::RequestMessage, +{ + let client = client.clone(); + let mut requests = smol::block_on(client.add_request_handler::()); + cx.spawn(|mut cx| async move { + while let Some(request) = requests.recv().await { + if let Err(err) = handler.handle(request, client.clone(), &mut cx).await { + log::error!("error handling request: {:?}", err); + } + } + }) + .detach(); +} + +pub fn spawn_message_handler( + handler: H, + client: &Arc, + cx: &mut gpui::MutableAppContext, +) where + H: 'static + for<'a> MessageHandler<'a, M>, + M: proto::EnvelopedMessage, +{ + let client = client.clone(); + let mut messages = smol::block_on(client.add_message_handler::()); + cx.spawn(|mut cx| async move { + while let Some(message) = messages.recv().await { + if let Err(err) = handler.handle(message, client.clone(), &mut cx).await { + log::error!("error handling message: {:?}", err); + } + } + }) + .detach(); +} + pub struct RandomCharIter(T); impl RandomCharIter { diff --git a/zed/src/workspace.rs b/zed/src/workspace.rs index fe49475b42..d053488ac9 100644 --- a/zed/src/workspace.rs +++ b/zed/src/workspace.rs @@ -4,10 +4,10 @@ pub mod pane_group; use crate::{ editor::{Buffer, Editor}, language::LanguageRegistry, - rpc_client::RpcClient, + rpc_client::{Request, RpcClient}, settings::Settings, time::ReplicaId, - util::SurfResultExt as _, + util::{self, SurfResultExt as _}, worktree::{FileHandle, Worktree, WorktreeHandle}, AppState, }; @@ -33,7 +33,7 @@ use std::{ use surf::Url; use zed_rpc::{proto, rest::CreateWorktreeResponse}; -pub fn init(cx: &mut MutableAppContext, rpc_client: &mut RpcClient) { +pub fn init(cx: &mut MutableAppContext, rpc_client: Arc) { cx.add_global_action("workspace:open", open); cx.add_global_action("workspace:open_paths", open_paths); cx.add_action("workspace:save", Workspace::save_active_item); @@ -46,8 +46,7 @@ pub fn init(cx: &mut MutableAppContext, rpc_client: &mut RpcClient) { ]); pane::init(cx); - let cx = cx.to_async(); - rpc_client.on_request(move |req| handle_open_buffer(req, cx)); + util::spawn_request_handler(handle_open_buffer, &rpc_client, cx); } pub struct OpenParams { @@ -100,6 +99,7 @@ fn open_paths(params: &OpenParams, cx: &mut MutableAppContext) { 0, params.app_state.settings.clone(), params.app_state.language_registry.clone(), + params.app_state.rpc_client.clone(), cx, ); let open_paths = view.open_paths(¶ms.paths, cx); @@ -108,8 +108,19 @@ fn open_paths(params: &OpenParams, cx: &mut MutableAppContext) { }); } -fn handle_open_buffer(request: zed_rpc::proto::OpenBuffer, cx: AsyncAppContext) { - // +async fn handle_open_buffer( + mut request: Request, + rpc_client: Arc, + cx: &mut AsyncAppContext, +) -> anyhow::Result<()> { + let body = request.body(); + dbg!(body.path); + rpc_client + .respond(request, proto::OpenBufferResponse { buffer: None }) + .await?; + + dbg!(cx.read(|app| app.root_view_id(1))); + Ok(()) } pub trait Item: Entity + Sized { @@ -302,6 +313,7 @@ pub struct State { pub struct Workspace { pub settings: watch::Receiver, language_registry: Arc, + rpc_client: Arc, modal: Option, center: PaneGroup, panes: Vec>, @@ -320,6 +332,7 @@ impl Workspace { replica_id: ReplicaId, settings: watch::Receiver, language_registry: Arc, + rpc_client: Arc, cx: &mut ViewContext, ) -> Self { let pane = cx.add_view(|_| Pane::new(settings.clone())); @@ -336,6 +349,7 @@ impl Workspace { active_pane: pane.clone(), settings, language_registry, + rpc_client, replica_id, worktrees: Default::default(), items: Default::default(), @@ -651,6 +665,7 @@ impl Workspace { } fn share_worktree(&mut self, _: &(), cx: &mut ViewContext) { + let rpc_client = self.rpc_client.clone(); let zed_url = std::env::var("ZED_SERVER_URL").unwrap_or("https://zed.dev".to_string()); let executor = cx.background_executor().clone(); @@ -677,7 +692,6 @@ impl Workspace { // a TLS stream using `native-tls`. let stream = smol::net::TcpStream::connect(rpc_address).await?; - let rpc_client = Arc::new(RpcClient::new()); let (connection_id, handler) = rpc_client.add_connection(stream).await; executor.spawn(handler).detach(); @@ -942,7 +956,7 @@ mod tests { fn test_open_paths_action(cx: &mut gpui::MutableAppContext) { let app_state = build_app_state(cx.as_ref()); - init(cx); + init(cx, app_state.rpc_client.clone()); let dir = temp_tree(json!({ "a": { @@ -1010,8 +1024,13 @@ mod tests { let app_state = cx.read(build_app_state); let (_, workspace) = cx.add_window(|cx| { - let mut workspace = - Workspace::new(0, app_state.settings, app_state.language_registry, cx); + let mut workspace = Workspace::new( + 0, + app_state.settings, + app_state.language_registry, + app_state.rpc_client, + cx, + ); workspace.add_worktree(dir.path(), cx); workspace }); @@ -1114,8 +1133,13 @@ mod tests { let app_state = cx.read(build_app_state); let (_, workspace) = cx.add_window(|cx| { - let mut workspace = - Workspace::new(0, app_state.settings, app_state.language_registry, cx); + let mut workspace = Workspace::new( + 0, + app_state.settings, + app_state.language_registry, + app_state.rpc_client, + cx, + ); workspace.add_worktree(dir1.path(), cx); workspace }); @@ -1183,8 +1207,13 @@ mod tests { let app_state = cx.read(build_app_state); let (window_id, workspace) = cx.add_window(|cx| { - let mut workspace = - Workspace::new(0, app_state.settings, app_state.language_registry, cx); + let mut workspace = Workspace::new( + 0, + app_state.settings, + app_state.language_registry, + app_state.rpc_client, + cx, + ); workspace.add_worktree(dir.path(), cx); workspace }); @@ -1227,8 +1256,13 @@ mod tests { let dir = TempDir::new("test-new-file").unwrap(); let app_state = cx.read(build_app_state); let (_, workspace) = cx.add_window(|cx| { - let mut workspace = - Workspace::new(0, app_state.settings, app_state.language_registry, cx); + let mut workspace = Workspace::new( + 0, + app_state.settings, + app_state.language_registry, + app_state.rpc_client, + cx, + ); workspace.add_worktree(dir.path(), cx); workspace }); @@ -1328,8 +1362,13 @@ mod tests { let app_state = cx.read(build_app_state); let (window_id, workspace) = cx.add_window(|cx| { - let mut workspace = - Workspace::new(0, app_state.settings, app_state.language_registry, cx); + let mut workspace = Workspace::new( + 0, + app_state.settings, + app_state.language_registry, + app_state.rpc_client, + cx, + ); workspace.add_worktree(dir.path(), cx); workspace });