Move RpcClient to zed_rpc and rename it to Peer

Co-Authored-By: Nathan Sobo <nathan@zed.dev>
This commit is contained in:
Antonio Scandurra 2021-06-16 18:01:26 +02:00
parent 5e4872fdf8
commit 9de4d73ffb
13 changed files with 214 additions and 222 deletions

10
Cargo.lock generated
View file

@ -197,9 +197,9 @@ dependencies = [
[[package]] [[package]]
name = "async-lock" name = "async-lock"
version = "2.3.0" version = "2.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1996609732bde4a9988bc42125f55f2af5f3c36370e27c778d5191a4a1b63bfb" checksum = "e6a8ea61bf9947a1007c5cada31e647dbc77b103c679858150003ba697ea798b"
dependencies = [ dependencies = [
"event-listener", "event-listener",
] ]
@ -4353,9 +4353,11 @@ name = "zed-rpc"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-lock",
"base64 0.13.0", "base64 0.13.0",
"futures-io", "futures",
"futures-lite", "log",
"postage",
"prost 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)", "prost 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)",
"prost-build", "prost-build",
"rand 0.8.3", "rand 0.8.3",

View file

@ -6,16 +6,18 @@ version = "0.1.0"
[dependencies] [dependencies]
anyhow = "1.0" anyhow = "1.0"
async-lock = "2.4"
base64 = "0.13" base64 = "0.13"
futures-io = "0.3" futures = "0.3"
futures-lite = "1" log = "0.4"
postage = { version="0.4.1", features=["futures-traits"] }
prost = "0.7" prost = "0.7"
rsa = "0.4" rsa = "0.4"
rand = "0.8" rand = "0.8"
serde = { version = "1", features = ["derive"] } serde = { version="1", features=["derive"] }
[build-dependencies] [build-dependencies]
prost-build = { git = "https://github.com/sfackler/prost", rev = "082f3e65874fe91382e72482863896b7b4db3728" } prost-build = { git="https://github.com/sfackler/prost", rev="082f3e65874fe91382e72482863896b7b4db3728" }
[dev-dependencies] [dev-dependencies]
smol = "1.2.5" smol = "1.2.5"

View file

@ -1,3 +1,6 @@
pub mod auth; pub mod auth;
mod peer;
pub mod proto; pub mod proto;
pub mod rest; pub mod rest;
pub use peer::{ConnectionId, Peer, TypedEnvelope};

View file

@ -1,32 +1,31 @@
use crate::proto::{self, EnvelopedMessage, MessageStream, RequestMessage};
use anyhow::{anyhow, Result}; use anyhow::{anyhow, Result};
use async_lock::{Mutex, RwLock};
use futures::{ use futures::{
future::{BoxFuture, Either}, future::{BoxFuture, Either},
FutureExt, AsyncRead, AsyncWrite, FutureExt,
}; };
use postage::{ use postage::{
barrier, mpsc, oneshot, barrier, mpsc, oneshot,
prelude::{Sink, Stream}, prelude::{Sink, Stream},
}; };
use smol::{
io::BoxedWriter,
lock::{Mutex, RwLock},
prelude::{AsyncRead, AsyncWrite},
};
use std::{ use std::{
any::TypeId, any::TypeId,
collections::{HashMap, HashSet}, collections::{HashMap, HashSet},
future::Future, future::Future,
pin::Pin,
sync::{ sync::{
atomic::{self, AtomicU32}, atomic::{self, AtomicU32},
Arc, Arc,
}, },
}; };
use zed_rpc::proto::{self, EnvelopedMessage, MessageStream, RequestMessage};
type BoxedWriter = Pin<Box<dyn AsyncWrite + 'static + Send>>;
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)] #[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
pub struct ConnectionId(u32); pub struct ConnectionId(u32);
struct RpcConnection { struct Connection {
writer: Mutex<MessageStream<BoxedWriter>>, writer: Mutex<MessageStream<BoxedWriter>>,
response_channels: Mutex<HashMap<u32, oneshot::Sender<proto::Envelope>>>, response_channels: Mutex<HashMap<u32, oneshot::Sender<proto::Envelope>>>,
next_message_id: AtomicU32, next_message_id: AtomicU32,
@ -52,14 +51,14 @@ impl<T> TypedEnvelope<T> {
} }
} }
pub struct RpcClient { pub struct Peer {
connections: RwLock<HashMap<ConnectionId, Arc<RpcConnection>>>, connections: RwLock<HashMap<ConnectionId, Arc<Connection>>>,
message_handlers: RwLock<Vec<MessageHandler>>, message_handlers: RwLock<Vec<MessageHandler>>,
handler_types: Mutex<HashSet<TypeId>>, handler_types: Mutex<HashSet<TypeId>>,
next_connection_id: AtomicU32, next_connection_id: AtomicU32,
} }
impl RpcClient { impl Peer {
pub fn new() -> Arc<Self> { pub fn new() -> Arc<Self> {
Arc::new(Self { Arc::new(Self {
connections: Default::default(), connections: Default::default(),
@ -107,16 +106,15 @@ impl RpcClient {
conn: Conn, conn: Conn,
) -> (ConnectionId, impl Future<Output = ()>) ) -> (ConnectionId, impl Future<Output = ()>)
where where
Conn: AsyncRead + AsyncWrite + Unpin + Send + 'static, Conn: Clone + AsyncRead + AsyncWrite + Unpin + Send + 'static,
{ {
let connection_id = ConnectionId( let connection_id = ConnectionId(
self.next_connection_id self.next_connection_id
.fetch_add(1, atomic::Ordering::SeqCst), .fetch_add(1, atomic::Ordering::SeqCst),
); );
let (close_tx, mut close_rx) = barrier::channel(); let (close_tx, mut close_rx) = barrier::channel();
let (conn_rx, conn_tx) = smol::io::split(conn); let connection = Arc::new(Connection {
let connection = Arc::new(RpcConnection { writer: Mutex::new(MessageStream::new(Box::pin(conn.clone()))),
writer: Mutex::new(MessageStream::new(Box::pin(conn_tx))),
response_channels: Default::default(), response_channels: Default::default(),
next_message_id: Default::default(), next_message_id: Default::default(),
_close_barrier: close_tx, _close_barrier: close_tx,
@ -130,12 +128,12 @@ impl RpcClient {
let this = self.clone(); let this = self.clone();
let handler_future = async move { let handler_future = async move {
let closed = close_rx.recv(); let closed = close_rx.recv();
smol::pin!(closed); futures::pin_mut!(closed);
let mut stream = MessageStream::new(conn_rx); let mut stream = MessageStream::new(conn);
loop { loop {
let read_message = stream.read_message(); let read_message = stream.read_message();
smol::pin!(read_message); futures::pin_mut!(read_message);
match futures::future::select(read_message, &mut closed).await { match futures::future::select(read_message, &mut closed).await {
Either::Left((Ok(incoming), _)) => { Either::Left((Ok(incoming), _)) => {
@ -277,144 +275,144 @@ impl RpcClient {
} }
} }
#[cfg(test)] // #[cfg(test)]
mod tests { // mod tests {
use super::*; // use super::*;
use smol::{ // use smol::{
future::poll_once, // future::poll_once,
io::AsyncWriteExt, // io::AsyncWriteExt,
net::unix::{UnixListener, UnixStream}, // net::unix::{UnixListener, UnixStream},
}; // };
use std::{future::Future, io}; // use std::{future::Future, io};
use tempdir::TempDir; // use tempdir::TempDir;
#[gpui::test] // #[gpui::test]
async fn test_request_response(cx: gpui::TestAppContext) { // async fn test_request_response(cx: gpui::TestAppContext) {
let executor = cx.read(|app| app.background_executor().clone()); // let executor = cx.read(|app| app.background_executor().clone());
let socket_dir_path = TempDir::new("request-response").unwrap(); // let socket_dir_path = TempDir::new("request-response").unwrap();
let socket_path = socket_dir_path.path().join(".sock"); // let socket_path = socket_dir_path.path().join(".sock");
let listener = UnixListener::bind(&socket_path).unwrap(); // let listener = UnixListener::bind(&socket_path).unwrap();
let client_conn = UnixStream::connect(&socket_path).await.unwrap(); // let client_conn = UnixStream::connect(&socket_path).await.unwrap();
let (server_conn, _) = listener.accept().await.unwrap(); // let (server_conn, _) = listener.accept().await.unwrap();
let mut server_stream = MessageStream::new(server_conn); // let mut server_stream = MessageStream::new(server_conn);
let client = RpcClient::new(); // let client = Peer::new();
let (connection_id, handler) = client.add_connection(client_conn).await; // let (connection_id, handler) = client.add_connection(client_conn).await;
executor.spawn(handler).detach(); // executor.spawn(handler).detach();
let client_req = client.request( // let client_req = client.request(
connection_id, // connection_id,
proto::Auth { // proto::Auth {
user_id: 42, // user_id: 42,
access_token: "token".to_string(), // access_token: "token".to_string(),
}, // },
); // );
smol::pin!(client_req); // smol::pin!(client_req);
let server_req = send_recv(&mut client_req, server_stream.read_message()) // let server_req = send_recv(&mut client_req, server_stream.read_message())
.await // .await
.unwrap(); // .unwrap();
assert_eq!( // assert_eq!(
server_req.payload, // server_req.payload,
Some(proto::envelope::Payload::Auth(proto::Auth { // Some(proto::envelope::Payload::Auth(proto::Auth {
user_id: 42, // user_id: 42,
access_token: "token".to_string() // access_token: "token".to_string()
})) // }))
); // );
// Respond to another request to ensure requests are properly matched up. // // Respond to another request to ensure requests are properly matched up.
server_stream // server_stream
.write_message( // .write_message(
&proto::AuthResponse { // &proto::AuthResponse {
credentials_valid: false, // credentials_valid: false,
} // }
.into_envelope(1000, Some(999)), // .into_envelope(1000, Some(999)),
) // )
.await // .await
.unwrap(); // .unwrap();
server_stream // server_stream
.write_message( // .write_message(
&proto::AuthResponse { // &proto::AuthResponse {
credentials_valid: true, // credentials_valid: true,
} // }
.into_envelope(1001, Some(server_req.id)), // .into_envelope(1001, Some(server_req.id)),
) // )
.await // .await
.unwrap(); // .unwrap();
assert_eq!( // assert_eq!(
client_req.await.unwrap(), // client_req.await.unwrap(),
proto::AuthResponse { // proto::AuthResponse {
credentials_valid: true // credentials_valid: true
} // }
); // );
} // }
#[gpui::test] // #[gpui::test]
async fn test_disconnect(cx: gpui::TestAppContext) { // async fn test_disconnect(cx: gpui::TestAppContext) {
let executor = cx.read(|app| app.background_executor().clone()); // let executor = cx.read(|app| app.background_executor().clone());
let socket_dir_path = TempDir::new("drop-client").unwrap(); // let socket_dir_path = TempDir::new("drop-client").unwrap();
let socket_path = socket_dir_path.path().join(".sock"); // let socket_path = socket_dir_path.path().join(".sock");
let listener = UnixListener::bind(&socket_path).unwrap(); // let listener = UnixListener::bind(&socket_path).unwrap();
let client_conn = UnixStream::connect(&socket_path).await.unwrap(); // let client_conn = UnixStream::connect(&socket_path).await.unwrap();
let (mut server_conn, _) = listener.accept().await.unwrap(); // let (mut server_conn, _) = listener.accept().await.unwrap();
let client = RpcClient::new(); // let client = Peer::new();
let (connection_id, handler) = client.add_connection(client_conn).await; // let (connection_id, handler) = client.add_connection(client_conn).await;
executor.spawn(handler).detach(); // executor.spawn(handler).detach();
client.disconnect(connection_id).await; // client.disconnect(connection_id).await;
// Try sending an empty payload over and over, until the client is dropped and hangs up. // // Try sending an empty payload over and over, until the client is dropped and hangs up.
loop { // loop {
match server_conn.write(&[]).await { // match server_conn.write(&[]).await {
Ok(_) => {} // Ok(_) => {}
Err(err) => { // Err(err) => {
if err.kind() == io::ErrorKind::BrokenPipe { // if err.kind() == io::ErrorKind::BrokenPipe {
break; // break;
} // }
} // }
} // }
} // }
} // }
#[gpui::test] // #[gpui::test]
async fn test_io_error(cx: gpui::TestAppContext) { // async fn test_io_error(cx: gpui::TestAppContext) {
let executor = cx.read(|app| app.background_executor().clone()); // let executor = cx.read(|app| app.background_executor().clone());
let socket_dir_path = TempDir::new("io-error").unwrap(); // let socket_dir_path = TempDir::new("io-error").unwrap();
let socket_path = socket_dir_path.path().join(".sock"); // let socket_path = socket_dir_path.path().join(".sock");
let _listener = UnixListener::bind(&socket_path).unwrap(); // let _listener = UnixListener::bind(&socket_path).unwrap();
let mut client_conn = UnixStream::connect(&socket_path).await.unwrap(); // let mut client_conn = UnixStream::connect(&socket_path).await.unwrap();
client_conn.close().await.unwrap(); // client_conn.close().await.unwrap();
let client = RpcClient::new(); // let client = Peer::new();
let (connection_id, handler) = client.add_connection(client_conn).await; // let (connection_id, handler) = client.add_connection(client_conn).await;
executor.spawn(handler).detach(); // executor.spawn(handler).detach();
let err = client // let err = client
.request( // .request(
connection_id, // connection_id,
proto::Auth { // proto::Auth {
user_id: 42, // user_id: 42,
access_token: "token".to_string(), // access_token: "token".to_string(),
}, // },
) // )
.await // .await
.unwrap_err(); // .unwrap_err();
assert_eq!( // assert_eq!(
err.downcast_ref::<io::Error>().unwrap().kind(), // err.downcast_ref::<io::Error>().unwrap().kind(),
io::ErrorKind::BrokenPipe // io::ErrorKind::BrokenPipe
); // );
} // }
async fn send_recv<S, R, O>(mut sender: S, receiver: R) -> O // async fn send_recv<S, R, O>(mut sender: S, receiver: R) -> O
where // where
S: Unpin + Future, // S: Unpin + Future,
R: Future<Output = O>, // R: Future<Output = O>,
{ // {
smol::pin!(receiver); // smol::pin!(receiver);
loop { // loop {
poll_once(&mut sender).await; // poll_once(&mut sender).await;
match poll_once(&mut receiver).await { // match poll_once(&mut receiver).await {
Some(message) => break message, // Some(message) => break message,
None => continue, // None => continue,
} // }
} // }
} // }
} // }

View file

@ -1,5 +1,4 @@
use futures_io::{AsyncRead, AsyncWrite}; use futures::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt as _};
use futures_lite::{AsyncReadExt, AsyncWriteExt as _};
use prost::Message; use prost::Message;
use std::{convert::TryInto, io}; use std::{convert::TryInto, io};
@ -97,7 +96,7 @@ where
T: AsyncRead + Unpin, T: AsyncRead + Unpin,
{ {
/// Read a protobuf message of the given type from the stream. /// Read a protobuf message of the given type from the stream.
pub async fn read_message(&mut self) -> futures_io::Result<Envelope> { pub async fn read_message(&mut self) -> io::Result<Envelope> {
let mut delimiter_buf = [0; 4]; let mut delimiter_buf = [0; 4];
self.byte_stream.read_exact(&mut delimiter_buf).await?; self.byte_stream.read_exact(&mut delimiter_buf).await?;
let message_len = u32::from_be_bytes(delimiter_buf) as usize; let message_len = u32::from_be_bytes(delimiter_buf) as usize;

View file

@ -20,9 +20,9 @@ crossbeam-channel = "0.5.0"
ctor = "0.1.20" ctor = "0.1.20"
dirs = "3.0" dirs = "3.0"
easy-parallel = "3.1.0" easy-parallel = "3.1.0"
fsevent = { path = "../fsevent" } fsevent = { path="../fsevent" }
futures = "0.3" futures = "0.3"
gpui = { path = "../gpui" } gpui = { path="../gpui" }
http-auth-basic = "0.1.3" http-auth-basic = "0.1.3"
ignore = "0.4" ignore = "0.4"
lazy_static = "1.4.0" lazy_static = "1.4.0"
@ -30,15 +30,15 @@ libc = "0.2"
log = "0.4" log = "0.4"
num_cpus = "1.13.0" num_cpus = "1.13.0"
parking_lot = "0.11.1" parking_lot = "0.11.1"
postage = { version = "0.4.1", features = ["futures-traits"] } postage = { version="0.4.1", features=["futures-traits"] }
rand = "0.8.3" rand = "0.8.3"
rsa = "0.4" rsa = "0.4"
rust-embed = "5.9.0" rust-embed = "5.9.0"
seahash = "4.1" seahash = "4.1"
serde = { version = "1", features = ["derive"] } serde = { version="1", features=["derive"] }
similar = "1.3" similar = "1.3"
simplelog = "0.9" simplelog = "0.9"
smallvec = { version = "1.6", features = ["union"] } smallvec = { version="1.6", features=["union"] }
smol = "1.2.5" smol = "1.2.5"
surf = "2.2" surf = "2.2"
tiny_http = "0.8" tiny_http = "0.8"
@ -46,12 +46,12 @@ toml = "0.5"
tree-sitter = "0.19.5" tree-sitter = "0.19.5"
tree-sitter-rust = "0.19.0" tree-sitter-rust = "0.19.0"
url = "2.2" url = "2.2"
zed-rpc = { path = "../zed-rpc" } zed-rpc = { path="../zed-rpc" }
[dev-dependencies] [dev-dependencies]
cargo-bundle = "0.5.0" cargo-bundle = "0.5.0"
env_logger = "0.8" env_logger = "0.8"
serde_json = { version = "1.0.64", features = ["preserve_order"] } serde_json = { version="1.0.64", features=["preserve_order"] }
tempdir = "0.3.7" tempdir = "0.3.7"
unindent = "0.1.7" unindent = "0.1.7"

View file

@ -483,7 +483,7 @@ mod tests {
0, 0,
app_state.settings, app_state.settings,
app_state.language_registry, app_state.language_registry,
app_state.rpc_client, app_state.rpc,
cx, cx,
); );
workspace.add_worktree(tmp_dir.path(), cx); workspace.add_worktree(tmp_dir.path(), cx);
@ -556,7 +556,7 @@ mod tests {
0, 0,
app_state.settings.clone(), app_state.settings.clone(),
app_state.language_registry.clone(), app_state.language_registry.clone(),
app_state.rpc_client.clone(), app_state.rpc.clone(),
cx, cx,
); );
workspace.add_worktree(tmp_dir.path(), cx); workspace.add_worktree(tmp_dir.path(), cx);
@ -620,7 +620,7 @@ mod tests {
0, 0,
app_state.settings.clone(), app_state.settings.clone(),
app_state.language_registry.clone(), app_state.language_registry.clone(),
app_state.rpc_client.clone(), app_state.rpc.clone(),
cx, cx,
); );
workspace.add_worktree(&file_path, cx); workspace.add_worktree(&file_path, cx);
@ -672,7 +672,7 @@ mod tests {
0, 0,
app_state.settings.clone(), app_state.settings.clone(),
app_state.language_registry.clone(), app_state.language_registry.clone(),
app_state.rpc_client.clone(), app_state.rpc.clone(),
cx, cx,
) )
}); });

View file

@ -1,4 +1,3 @@
use rpc_client::RpcClient;
use std::sync::Arc; use std::sync::Arc;
pub mod assets; pub mod assets;
@ -7,7 +6,6 @@ pub mod file_finder;
pub mod language; pub mod language;
pub mod menus; pub mod menus;
mod operation_queue; mod operation_queue;
pub mod rpc_client;
pub mod settings; pub mod settings;
mod sum_tree; mod sum_tree;
#[cfg(test)] #[cfg(test)]
@ -21,7 +19,7 @@ mod worktree;
pub struct AppState { pub struct AppState {
pub settings: postage::watch::Receiver<settings::Settings>, pub settings: postage::watch::Receiver<settings::Settings>,
pub language_registry: std::sync::Arc<language::LanguageRegistry>, pub language_registry: std::sync::Arc<language::LanguageRegistry>,
pub rpc_client: Arc<RpcClient>, pub rpc: Arc<zed_rpc::Peer>,
} }
pub fn init(cx: &mut gpui::MutableAppContext) { pub fn init(cx: &mut gpui::MutableAppContext) {

View file

@ -6,9 +6,7 @@ use log::LevelFilter;
use simplelog::SimpleLogger; use simplelog::SimpleLogger;
use std::{fs, path::PathBuf, sync::Arc}; use std::{fs, path::PathBuf, sync::Arc};
use zed::{ use zed::{
self, assets, editor, file_finder, language, menus, self, assets, editor, file_finder, language, menus, settings,
rpc_client::RpcClient,
settings,
workspace::{self, OpenParams}, workspace::{self, OpenParams},
AppState, AppState,
}; };
@ -25,13 +23,13 @@ fn main() {
let app_state = AppState { let app_state = AppState {
language_registry, language_registry,
settings, settings,
rpc_client: RpcClient::new(), rpc: zed_rpc::Peer::new(),
}; };
app.run(move |cx| { app.run(move |cx| {
cx.set_menus(menus::menus(app_state.clone())); cx.set_menus(menus::menus(app_state.clone()));
zed::init(cx); zed::init(cx);
workspace::init(cx, app_state.rpc_client.clone()); workspace::init(cx, app_state.rpc.clone());
editor::init(cx); editor::init(cx);
file_finder::init(cx); file_finder::init(cx);

View file

@ -1,6 +1,4 @@
use crate::{ use crate::{language::LanguageRegistry, settings, time::ReplicaId, AppState};
language::LanguageRegistry, rpc_client::RpcClient, settings, time::ReplicaId, AppState,
};
use ctor::ctor; use ctor::ctor;
use gpui::AppContext; use gpui::AppContext;
use rand::Rng; use rand::Rng;
@ -152,6 +150,6 @@ pub fn build_app_state(cx: &AppContext) -> AppState {
AppState { AppState {
settings, settings,
language_registry, language_registry,
rpc_client: RpcClient::new(), rpc: zed_rpc::Peer::new(),
} }
} }

View file

@ -1,8 +1,7 @@
use crate::rpc_client::{RpcClient, TypedEnvelope};
use postage::prelude::Stream; use postage::prelude::Stream;
use rand::prelude::*; use rand::prelude::*;
use std::{cmp::Ordering, future::Future, sync::Arc}; use std::{cmp::Ordering, future::Future, sync::Arc};
use zed_rpc::proto; use zed_rpc::{proto, Peer, TypedEnvelope};
#[derive(Copy, Clone, Eq, PartialEq, Debug, Hash)] #[derive(Copy, Clone, Eq, PartialEq, Debug, Hash)]
pub enum Bias { pub enum Bias {
@ -62,7 +61,7 @@ pub trait MessageHandler<'a, M: proto::EnvelopedMessage> {
fn handle( fn handle(
&self, &self,
message: TypedEnvelope<M>, message: TypedEnvelope<M>,
client: Arc<RpcClient>, rpc: Arc<Peer>,
cx: &'a mut gpui::AsyncAppContext, cx: &'a mut gpui::AsyncAppContext,
) -> Self::Output; ) -> Self::Output;
} }
@ -70,7 +69,7 @@ pub trait MessageHandler<'a, M: proto::EnvelopedMessage> {
impl<'a, M, F, Fut> MessageHandler<'a, M> for F impl<'a, M, F, Fut> MessageHandler<'a, M> for F
where where
M: proto::EnvelopedMessage, M: proto::EnvelopedMessage,
F: Fn(TypedEnvelope<M>, Arc<RpcClient>, &'a mut gpui::AsyncAppContext) -> Fut, F: Fn(TypedEnvelope<M>, Arc<Peer>, &'a mut gpui::AsyncAppContext) -> Fut,
Fut: 'a + Future<Output = anyhow::Result<()>>, Fut: 'a + Future<Output = anyhow::Result<()>>,
{ {
type Output = Fut; type Output = Fut;
@ -78,23 +77,23 @@ where
fn handle( fn handle(
&self, &self,
message: TypedEnvelope<M>, message: TypedEnvelope<M>,
client: Arc<RpcClient>, rpc: Arc<Peer>,
cx: &'a mut gpui::AsyncAppContext, cx: &'a mut gpui::AsyncAppContext,
) -> Self::Output { ) -> Self::Output {
(self)(message, client, cx) (self)(message, rpc, cx)
} }
} }
pub fn handle_messages<H, M>(handler: H, client: &Arc<RpcClient>, cx: &mut gpui::MutableAppContext) pub fn handle_messages<H, M>(handler: H, rpc: &Arc<Peer>, cx: &mut gpui::MutableAppContext)
where where
H: 'static + for<'a> MessageHandler<'a, M>, H: 'static + for<'a> MessageHandler<'a, M>,
M: proto::EnvelopedMessage, M: proto::EnvelopedMessage,
{ {
let client = client.clone(); let rpc = rpc.clone();
let mut messages = smol::block_on(client.add_message_handler::<M>()); let mut messages = smol::block_on(rpc.add_message_handler::<M>());
cx.spawn(|mut cx| async move { cx.spawn(|mut cx| async move {
while let Some(message) = messages.recv().await { while let Some(message) = messages.recv().await {
if let Err(err) = handler.handle(message, client.clone(), &mut cx).await { if let Err(err) = handler.handle(message, rpc.clone(), &mut cx).await {
log::error!("error handling message: {:?}", err); log::error!("error handling message: {:?}", err);
} }
} }

View file

@ -4,7 +4,6 @@ pub mod pane_group;
use crate::{ use crate::{
editor::{Buffer, Editor}, editor::{Buffer, Editor},
language::LanguageRegistry, language::LanguageRegistry,
rpc_client::{RpcClient, TypedEnvelope},
settings::Settings, settings::Settings,
time::ReplicaId, time::ReplicaId,
util::{self, SurfResultExt as _}, util::{self, SurfResultExt as _},
@ -31,9 +30,9 @@ use std::{
time::Duration, time::Duration,
}; };
use surf::Url; use surf::Url;
use zed_rpc::{proto, rest::CreateWorktreeResponse}; use zed_rpc::{proto, rest::CreateWorktreeResponse, Peer, TypedEnvelope};
pub fn init(cx: &mut MutableAppContext, rpc_client: Arc<RpcClient>) { pub fn init(cx: &mut MutableAppContext, rpc: Arc<Peer>) {
cx.add_global_action("workspace:open", open); cx.add_global_action("workspace:open", open);
cx.add_global_action("workspace:open_paths", open_paths); cx.add_global_action("workspace:open_paths", open_paths);
cx.add_action("workspace:save", Workspace::save_active_item); cx.add_action("workspace:save", Workspace::save_active_item);
@ -46,7 +45,7 @@ pub fn init(cx: &mut MutableAppContext, rpc_client: Arc<RpcClient>) {
]); ]);
pane::init(cx); pane::init(cx);
util::handle_messages(handle_open_buffer, &rpc_client, cx); util::handle_messages(handle_open_buffer, &rpc, cx);
} }
pub struct OpenParams { pub struct OpenParams {
@ -99,7 +98,7 @@ fn open_paths(params: &OpenParams, cx: &mut MutableAppContext) {
0, 0,
params.app_state.settings.clone(), params.app_state.settings.clone(),
params.app_state.language_registry.clone(), params.app_state.language_registry.clone(),
params.app_state.rpc_client.clone(), params.app_state.rpc.clone(),
cx, cx,
); );
let open_paths = view.open_paths(&params.paths, cx); let open_paths = view.open_paths(&params.paths, cx);
@ -110,13 +109,12 @@ fn open_paths(params: &OpenParams, cx: &mut MutableAppContext) {
async fn handle_open_buffer( async fn handle_open_buffer(
request: TypedEnvelope<proto::OpenBuffer>, request: TypedEnvelope<proto::OpenBuffer>,
rpc_client: Arc<RpcClient>, rpc: Arc<Peer>,
cx: &mut AsyncAppContext, cx: &mut AsyncAppContext,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let payload = request.payload(); let payload = request.payload();
dbg!(&payload.path); dbg!(&payload.path);
rpc_client rpc.respond(request, proto::OpenBufferResponse { buffer: None })
.respond(request, proto::OpenBufferResponse { buffer: None })
.await?; .await?;
dbg!(cx.read(|app| app.root_view_id(1))); dbg!(cx.read(|app| app.root_view_id(1)));
@ -313,7 +311,7 @@ pub struct State {
pub struct Workspace { pub struct Workspace {
pub settings: watch::Receiver<Settings>, pub settings: watch::Receiver<Settings>,
language_registry: Arc<LanguageRegistry>, language_registry: Arc<LanguageRegistry>,
rpc_client: Arc<RpcClient>, rpc: Arc<Peer>,
modal: Option<AnyViewHandle>, modal: Option<AnyViewHandle>,
center: PaneGroup, center: PaneGroup,
panes: Vec<ViewHandle<Pane>>, panes: Vec<ViewHandle<Pane>>,
@ -332,7 +330,7 @@ impl Workspace {
replica_id: ReplicaId, replica_id: ReplicaId,
settings: watch::Receiver<Settings>, settings: watch::Receiver<Settings>,
language_registry: Arc<LanguageRegistry>, language_registry: Arc<LanguageRegistry>,
rpc_client: Arc<RpcClient>, rpc: Arc<Peer>,
cx: &mut ViewContext<Self>, cx: &mut ViewContext<Self>,
) -> Self { ) -> Self {
let pane = cx.add_view(|_| Pane::new(settings.clone())); let pane = cx.add_view(|_| Pane::new(settings.clone()));
@ -349,7 +347,7 @@ impl Workspace {
active_pane: pane.clone(), active_pane: pane.clone(),
settings, settings,
language_registry, language_registry,
rpc_client, rpc,
replica_id, replica_id,
worktrees: Default::default(), worktrees: Default::default(),
items: Default::default(), items: Default::default(),
@ -665,7 +663,7 @@ impl Workspace {
} }
fn share_worktree(&mut self, _: &(), cx: &mut ViewContext<Self>) { fn share_worktree(&mut self, _: &(), cx: &mut ViewContext<Self>) {
let rpc_client = self.rpc_client.clone(); let rpc = self.rpc.clone();
let zed_url = std::env::var("ZED_SERVER_URL").unwrap_or("https://zed.dev".to_string()); let zed_url = std::env::var("ZED_SERVER_URL").unwrap_or("https://zed.dev".to_string());
let executor = cx.background_executor().clone(); let executor = cx.background_executor().clone();
@ -692,10 +690,10 @@ impl Workspace {
// a TLS stream using `native-tls`. // a TLS stream using `native-tls`.
let stream = smol::net::TcpStream::connect(rpc_address).await?; let stream = smol::net::TcpStream::connect(rpc_address).await?;
let (connection_id, handler) = rpc_client.add_connection(stream).await; let (connection_id, handler) = rpc.add_connection(stream).await;
executor.spawn(handler).detach(); executor.spawn(handler).detach();
let auth_response = rpc_client let auth_response = rpc
.request( .request(
connection_id, connection_id,
proto::Auth { proto::Auth {
@ -710,9 +708,7 @@ impl Workspace {
let share_task = this.update(&mut cx, |this, cx| { let share_task = this.update(&mut cx, |this, cx| {
let worktree = this.worktrees.iter().next()?; let worktree = this.worktrees.iter().next()?;
Some(worktree.update(cx, |worktree, cx| { Some(worktree.update(cx, |worktree, cx| worktree.share(rpc, connection_id, cx)))
worktree.share(rpc_client, connection_id, cx)
}))
}); });
if let Some(share_task) = share_task { if let Some(share_task) = share_task {
@ -956,7 +952,7 @@ mod tests {
fn test_open_paths_action(cx: &mut gpui::MutableAppContext) { fn test_open_paths_action(cx: &mut gpui::MutableAppContext) {
let app_state = build_app_state(cx.as_ref()); let app_state = build_app_state(cx.as_ref());
init(cx, app_state.rpc_client.clone()); init(cx, app_state.rpc.clone());
let dir = temp_tree(json!({ let dir = temp_tree(json!({
"a": { "a": {
@ -1028,7 +1024,7 @@ mod tests {
0, 0,
app_state.settings, app_state.settings,
app_state.language_registry, app_state.language_registry,
app_state.rpc_client, app_state.rpc,
cx, cx,
); );
workspace.add_worktree(dir.path(), cx); workspace.add_worktree(dir.path(), cx);
@ -1137,7 +1133,7 @@ mod tests {
0, 0,
app_state.settings, app_state.settings,
app_state.language_registry, app_state.language_registry,
app_state.rpc_client, app_state.rpc,
cx, cx,
); );
workspace.add_worktree(dir1.path(), cx); workspace.add_worktree(dir1.path(), cx);
@ -1211,7 +1207,7 @@ mod tests {
0, 0,
app_state.settings, app_state.settings,
app_state.language_registry, app_state.language_registry,
app_state.rpc_client, app_state.rpc,
cx, cx,
); );
workspace.add_worktree(dir.path(), cx); workspace.add_worktree(dir.path(), cx);
@ -1260,7 +1256,7 @@ mod tests {
0, 0,
app_state.settings, app_state.settings,
app_state.language_registry, app_state.language_registry,
app_state.rpc_client, app_state.rpc,
cx, cx,
); );
workspace.add_worktree(dir.path(), cx); workspace.add_worktree(dir.path(), cx);
@ -1366,7 +1362,7 @@ mod tests {
0, 0,
app_state.settings, app_state.settings,
app_state.language_registry, app_state.language_registry,
app_state.rpc_client, app_state.rpc,
cx, cx,
); );
workspace.add_worktree(dir.path(), cx); workspace.add_worktree(dir.path(), cx);

View file

@ -4,7 +4,6 @@ mod ignore;
use crate::{ use crate::{
editor::{History, Rope}, editor::{History, Rope},
rpc_client::{ConnectionId, RpcClient},
sum_tree::{self, Cursor, Edit, SumTree}, sum_tree::{self, Cursor, Edit, SumTree},
util::Bias, util::Bias,
}; };
@ -32,7 +31,7 @@ use std::{
sync::{Arc, Weak}, sync::{Arc, Weak},
time::{Duration, SystemTime, UNIX_EPOCH}, time::{Duration, SystemTime, UNIX_EPOCH},
}; };
use zed_rpc::proto; use zed_rpc::{proto, ConnectionId, Peer};
use self::{char_bag::CharBag, ignore::IgnoreStack}; use self::{char_bag::CharBag, ignore::IgnoreStack};
@ -54,7 +53,7 @@ pub struct Worktree {
scan_state: (watch::Sender<ScanState>, watch::Receiver<ScanState>), scan_state: (watch::Sender<ScanState>, watch::Receiver<ScanState>),
_event_stream_handle: fsevent::Handle, _event_stream_handle: fsevent::Handle,
poll_scheduled: bool, poll_scheduled: bool,
rpc_client: Option<Arc<RpcClient>>, rpc: Option<Arc<Peer>>,
} }
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
@ -96,7 +95,7 @@ impl Worktree {
scan_state: watch::channel_with(ScanState::Scanning), scan_state: watch::channel_with(ScanState::Scanning),
_event_stream_handle: event_stream_handle, _event_stream_handle: event_stream_handle,
poll_scheduled: false, poll_scheduled: false,
rpc_client: None, rpc: None,
}; };
std::thread::spawn(move || { std::thread::spawn(move || {
@ -228,11 +227,11 @@ impl Worktree {
pub fn share( pub fn share(
&mut self, &mut self,
client: Arc<RpcClient>, client: Arc<Peer>,
connection_id: ConnectionId, connection_id: ConnectionId,
cx: &mut ModelContext<Self>, cx: &mut ModelContext<Self>,
) -> Task<anyhow::Result<()>> { ) -> Task<anyhow::Result<()>> {
self.rpc_client = Some(client.clone()); self.rpc = Some(client.clone());
let snapshot = self.snapshot(); let snapshot = self.snapshot();
cx.spawn(|_this, cx| async move { cx.spawn(|_this, cx| async move {
let paths = cx let paths = cx