diff --git a/Cargo.lock b/Cargo.lock index f5e274cb81..11c9d229c8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5822,7 +5822,9 @@ dependencies = [ "clap 3.0.0-beta.2", "collections", "comrak", + "ctor", "either", + "env_logger", "envy", "futures", "gpui", diff --git a/crates/client/src/channel.rs b/crates/client/src/channel.rs index d9555da7dd..f89f578247 100644 --- a/crates/client/src/channel.rs +++ b/crates/client/src/channel.rs @@ -17,7 +17,7 @@ use std::{ }; use sum_tree::{Bias, SumTree}; use time::OffsetDateTime; -use util::{post_inc, TryFutureExt}; +use util::{post_inc, ResultExt as _, TryFutureExt}; pub struct ChannelList { available_channels: Option>, @@ -168,16 +168,12 @@ impl ChannelList { impl Entity for Channel { type Event = ChannelEvent; - fn release(&mut self, cx: &mut MutableAppContext) { - let rpc = self.rpc.clone(); - let channel_id = self.details.id; - cx.foreground() - .spawn(async move { - if let Err(error) = rpc.send(proto::LeaveChannel { channel_id }).await { - log::error!("error leaving channel: {}", error); - }; + fn release(&mut self, _: &mut MutableAppContext) { + self.rpc + .send(proto::LeaveChannel { + channel_id: self.details.id, }) - .detach() + .log_err(); } } @@ -718,18 +714,16 @@ mod tests { }); // Receive a new message. - server - .send(proto::ChannelMessageSent { - channel_id: channel.read_with(&cx, |channel, _| channel.details.id), - message: Some(proto::ChannelMessage { - id: 12, - body: "c".into(), - timestamp: 1002, - sender_id: 7, - nonce: Some(3.into()), - }), - }) - .await; + server.send(proto::ChannelMessageSent { + channel_id: channel.read_with(&cx, |channel, _| channel.details.id), + message: Some(proto::ChannelMessage { + id: 12, + body: "c".into(), + timestamp: 1002, + sender_id: 7, + nonce: Some(3.into()), + }), + }); // Client requests user for message since they haven't seen them yet let get_users = server.receive::().await.unwrap(); diff --git a/crates/client/src/client.rs b/crates/client/src/client.rs index e22cd7cba9..103471c6f3 100644 --- a/crates/client/src/client.rs +++ b/crates/client/src/client.rs @@ -24,8 +24,10 @@ use std::{ collections::HashMap, convert::TryFrom, fmt::Write as _, - future::Future, - sync::{Arc, Weak}, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, Weak, + }, time::{Duration, Instant}, }; use surf::{http::Method, Url}; @@ -55,6 +57,7 @@ pub fn init(rpc: Arc, cx: &mut MutableAppContext) { } pub struct Client { + id: usize, peer: Arc, http: Arc, state: RwLock, @@ -167,7 +170,12 @@ impl Drop for Subscription { impl Client { pub fn new(http: Arc) -> Arc { + lazy_static! { + static ref NEXT_CLIENT_ID: AtomicUsize = AtomicUsize::default(); + } + Arc::new(Self { + id: NEXT_CLIENT_ID.fetch_add(1, Ordering::SeqCst), peer: Peer::new(), http, state: Default::default(), @@ -448,21 +456,31 @@ impl Client { None }; + let type_name = message.payload_type_name(); + let handler_key = (payload_type_id, entity_id); if let Some(handler) = state.model_handlers.get_mut(&handler_key) { let mut handler = handler.take().unwrap(); drop(state); // Avoid deadlocks if the handler interacts with rpc::Client - let start_time = Instant::now(); - log::info!("RPC client message {}", message.payload_type_name()); + + log::debug!( + "rpc message received. client_id:{}, name:{}", + this.id, + type_name + ); (handler)(message, &mut cx); - log::info!("RPC message handled. duration:{:?}", start_time.elapsed()); + log::debug!( + "rpc message handled. client_id:{}, name:{}", + this.id, + type_name + ); let mut state = this.state.write(); if state.model_handlers.contains_key(&handler_key) { state.model_handlers.insert(handler_key, Some(handler)); } } else { - log::info!("unhandled message {}", message.payload_type_name()); + log::info!("unhandled message {}", type_name); } } } @@ -677,19 +695,32 @@ impl Client { } } - pub async fn send(&self, message: T) -> Result<()> { - self.peer.send(self.connection_id()?, message).await + pub fn send(&self, message: T) -> Result<()> { + log::debug!("rpc send. client_id:{}, name:{}", self.id, T::NAME); + self.peer.send(self.connection_id()?, message) } pub async fn request(&self, request: T) -> Result { - self.peer.request(self.connection_id()?, request).await + log::debug!( + "rpc request start. client_id: {}. name:{}", + self.id, + T::NAME + ); + let response = self.peer.request(self.connection_id()?, request).await; + log::debug!( + "rpc request finish. client_id: {}. name:{}", + self.id, + T::NAME + ); + response } pub fn respond( &self, receipt: Receipt, response: T::Response, - ) -> impl Future> { + ) -> Result<()> { + log::debug!("rpc respond. client_id: {}. name:{}", self.id, T::NAME); self.peer.respond(receipt, response) } @@ -697,7 +728,8 @@ impl Client { &self, receipt: Receipt, error: proto::Error, - ) -> impl Future> { + ) -> Result<()> { + log::debug!("rpc respond. client_id: {}. name:{}", self.id, T::NAME); self.peer.respond_with_error(receipt, error) } } @@ -860,8 +892,8 @@ mod tests { }); drop(subscription3); - server.send(proto::UnshareProject { project_id: 1 }).await; - server.send(proto::UnshareProject { project_id: 2 }).await; + server.send(proto::UnshareProject { project_id: 1 }); + server.send(proto::UnshareProject { project_id: 2 }); done_rx1.next().await.unwrap(); done_rx2.next().await.unwrap(); } @@ -890,7 +922,7 @@ mod tests { Ok(()) }) }); - server.send(proto::Ping {}).await; + server.send(proto::Ping {}); done_rx2.next().await.unwrap(); } @@ -914,7 +946,7 @@ mod tests { }, )); }); - server.send(proto::Ping {}).await; + server.send(proto::Ping {}); done_rx.next().await.unwrap(); } diff --git a/crates/client/src/test.rs b/crates/client/src/test.rs index 7402417196..c8aca79192 100644 --- a/crates/client/src/test.rs +++ b/crates/client/src/test.rs @@ -118,8 +118,8 @@ impl FakeServer { self.forbid_connections.store(false, SeqCst); } - pub async fn send(&self, message: T) { - self.peer.send(self.connection_id(), message).await.unwrap(); + pub fn send(&self, message: T) { + self.peer.send(self.connection_id(), message).unwrap(); } pub async fn receive(&self) -> Result> { @@ -148,7 +148,7 @@ impl FakeServer { receipt: Receipt, response: T::Response, ) { - self.peer.respond(receipt, response).await.unwrap() + self.peer.respond(receipt, response).unwrap() } fn connection_id(&self) -> ConnectionId { diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index 8b40e52488..91c5708fd5 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -461,7 +461,7 @@ impl Project { } })?; - rpc.send(proto::UnshareProject { project_id }).await?; + rpc.send(proto::UnshareProject { project_id })?; this.update(&mut cx, |this, cx| { this.collaborators.clear(); this.shared_buffers.clear(); @@ -856,15 +856,13 @@ impl Project { let this = cx.read(|cx| this.upgrade(cx))?; match message { LspEvent::DiagnosticsStart => { - let send = this.update(&mut cx, |this, cx| { + this.update(&mut cx, |this, cx| { this.disk_based_diagnostics_started(cx); - this.remote_id().map(|project_id| { + if let Some(project_id) = this.remote_id() { rpc.send(proto::DiskBasedDiagnosticsUpdating { project_id }) - }) + .log_err(); + } }); - if let Some(send) = send { - send.await.log_err(); - } } LspEvent::DiagnosticsUpdate(mut params) => { language.process_diagnostics(&mut params); @@ -874,15 +872,13 @@ impl Project { }); } LspEvent::DiagnosticsFinish => { - let send = this.update(&mut cx, |this, cx| { + this.update(&mut cx, |this, cx| { this.disk_based_diagnostics_finished(cx); - this.remote_id().map(|project_id| { + if let Some(project_id) = this.remote_id() { rpc.send(proto::DiskBasedDiagnosticsUpdated { project_id }) - }) + .log_err(); + } }); - if let Some(send) = send { - send.await.log_err(); - } } } } @@ -1501,15 +1497,13 @@ impl Project { }; if let Some(project_id) = self.remote_id() { - let client = self.client.clone(); - let message = proto::UpdateBufferFile { - project_id, - buffer_id: *buffer_id as u64, - file: Some(new_file.to_proto()), - }; - cx.foreground() - .spawn(async move { client.send(message).await }) - .detach_and_log_err(cx); + self.client + .send(proto::UpdateBufferFile { + project_id, + buffer_id: *buffer_id as u64, + file: Some(new_file.to_proto()), + }) + .log_err(); } buffer.file_updated(Box::new(new_file), cx).detach(); } @@ -1829,8 +1823,7 @@ impl Project { version: (&version).into(), mtime: Some(mtime.into()), }, - ) - .await?; + )?; Ok(()) } @@ -1859,16 +1852,13 @@ impl Project { // associated with formatting. cx.spawn(|_| async move { match format { - Ok(()) => rpc.respond(receipt, proto::Ack {}).await?, - Err(error) => { - rpc.respond_with_error( - receipt, - proto::Error { - message: error.to_string(), - }, - ) - .await? - } + Ok(()) => rpc.respond(receipt, proto::Ack {})?, + Err(error) => rpc.respond_with_error( + receipt, + proto::Error { + message: error.to_string(), + }, + )?, } Ok::<_, anyhow::Error>(()) }) @@ -1902,27 +1892,21 @@ impl Project { .update(&mut cx, |buffer, cx| buffer.completions(position, cx)) .await { - Ok(completions) => { - rpc.respond( - receipt, - proto::GetCompletionsResponse { - completions: completions - .iter() - .map(language::proto::serialize_completion) - .collect(), - }, - ) - .await - } - Err(error) => { - rpc.respond_with_error( - receipt, - proto::Error { - message: error.to_string(), - }, - ) - .await - } + Ok(completions) => rpc.respond( + receipt, + proto::GetCompletionsResponse { + completions: completions + .iter() + .map(language::proto::serialize_completion) + .collect(), + }, + ), + Err(error) => rpc.respond_with_error( + receipt, + proto::Error { + message: error.to_string(), + }, + ), } }) .detach_and_log_err(cx); @@ -1957,30 +1941,24 @@ impl Project { }) .await { - Ok(edit_ids) => { - rpc.respond( - receipt, - proto::ApplyCompletionAdditionalEditsResponse { - additional_edits: edit_ids - .into_iter() - .map(|edit_id| proto::AdditionalEdit { - replica_id: edit_id.replica_id as u32, - local_timestamp: edit_id.value, - }) - .collect(), - }, - ) - .await - } - Err(error) => { - rpc.respond_with_error( - receipt, - proto::Error { - message: error.to_string(), - }, - ) - .await - } + Ok(edit_ids) => rpc.respond( + receipt, + proto::ApplyCompletionAdditionalEditsResponse { + additional_edits: edit_ids + .into_iter() + .map(|edit_id| proto::AdditionalEdit { + replica_id: edit_id.replica_id as u32, + local_timestamp: edit_id.value, + }) + .collect(), + }, + ), + Err(error) => rpc.respond_with_error( + receipt, + proto::Error { + message: error.to_string(), + }, + ), } }) .detach_and_log_err(cx); @@ -2026,7 +2004,7 @@ impl Project { }); } }); - rpc.respond(receipt, response).await?; + rpc.respond(receipt, response)?; Ok::<_, anyhow::Error>(()) }) .detach_and_log_err(cx); @@ -2062,7 +2040,6 @@ impl Project { buffer: Some(buffer), }, ) - .await } .log_err() }) @@ -2296,28 +2273,21 @@ impl<'a> Iterator for CandidateSetIter<'a> { impl Entity for Project { type Event = Event; - fn release(&mut self, cx: &mut gpui::MutableAppContext) { + fn release(&mut self, _: &mut gpui::MutableAppContext) { match &self.client_state { ProjectClientState::Local { remote_id_rx, .. } => { if let Some(project_id) = *remote_id_rx.borrow() { - let rpc = self.client.clone(); - cx.spawn(|_| async move { - if let Err(err) = rpc.send(proto::UnregisterProject { project_id }).await { - log::error!("error unregistering project: {}", err); - } - }) - .detach(); + self.client + .send(proto::UnregisterProject { project_id }) + .log_err(); } } ProjectClientState::Remote { remote_id, .. } => { - let rpc = self.client.clone(); - let project_id = *remote_id; - cx.spawn(|_| async move { - if let Err(err) = rpc.send(proto::LeaveProject { project_id }).await { - log::error!("error leaving project: {}", err); - } - }) - .detach(); + self.client + .send(proto::LeaveProject { + project_id: *remote_id, + }) + .log_err(); } } } diff --git a/crates/project/src/worktree.rs b/crates/project/src/worktree.rs index 32b4009207..643c26aa71 100644 --- a/crates/project/src/worktree.rs +++ b/crates/project/src/worktree.rs @@ -149,7 +149,7 @@ pub enum Event { impl Entity for Worktree { type Event = Event; - fn release(&mut self, cx: &mut MutableAppContext) { + fn release(&mut self, _: &mut MutableAppContext) { if let Some(worktree) = self.as_local_mut() { if let Registration::Done { project_id } = worktree.registration { let client = worktree.client.clone(); @@ -157,12 +157,7 @@ impl Entity for Worktree { project_id, worktree_id: worktree.id().to_proto(), }; - cx.foreground() - .spawn(async move { - client.send(unregister_message).await?; - Ok::<_, anyhow::Error>(()) - }) - .detach_and_log_err(cx); + client.send(unregister_message).log_err(); } } } @@ -596,7 +591,7 @@ impl LocalWorktree { &mut self, worktree_path: Arc, diagnostics: Vec>, - cx: &mut ModelContext, + _: &mut ModelContext, ) -> Result<()> { let summary = DiagnosticSummary::new(&diagnostics); self.diagnostic_summaries @@ -604,30 +599,19 @@ impl LocalWorktree { self.diagnostics.insert(worktree_path.clone(), diagnostics); if let Some(share) = self.share.as_ref() { - cx.foreground() - .spawn({ - let client = self.client.clone(); - let project_id = share.project_id; - let worktree_id = self.id().to_proto(); - let path = worktree_path.to_string_lossy().to_string(); - async move { - client - .send(proto::UpdateDiagnosticSummary { - project_id, - worktree_id, - summary: Some(proto::DiagnosticSummary { - path, - error_count: summary.error_count as u32, - warning_count: summary.warning_count as u32, - info_count: summary.info_count as u32, - hint_count: summary.hint_count as u32, - }), - }) - .await - .log_err() - } + self.client + .send(proto::UpdateDiagnosticSummary { + project_id: share.project_id, + worktree_id: self.id().to_proto(), + summary: Some(proto::DiagnosticSummary { + path: worktree_path.to_string_lossy().to_string(), + error_count: summary.error_count as u32, + warning_count: summary.warning_count as u32, + info_count: summary.info_count as u32, + hint_count: summary.hint_count as u32, + }), }) - .detach(); + .log_err(); } Ok(()) @@ -787,7 +771,7 @@ impl LocalWorktree { while let Ok(snapshot) = snapshots_to_send_rx.recv().await { let message = snapshot.build_update(&prev_snapshot, project_id, worktree_id, false); - match rpc.send(message).await { + match rpc.send(message) { Ok(()) => prev_snapshot = snapshot, Err(err) => log::error!("error sending snapshot diff {}", err), } @@ -1377,8 +1361,7 @@ impl language::File for File { buffer_id, version: (&version).into(), mtime: Some(entry.mtime.into()), - }) - .await?; + })?; } Ok((version, entry.mtime)) }) @@ -1501,23 +1484,15 @@ impl language::File for File { } fn buffer_removed(&self, buffer_id: u64, cx: &mut MutableAppContext) { - self.worktree.update(cx, |worktree, cx| { + self.worktree.update(cx, |worktree, _| { if let Worktree::Remote(worktree) = worktree { - let project_id = worktree.project_id; - let rpc = worktree.client.clone(); - cx.background() - .spawn(async move { - if let Err(error) = rpc - .send(proto::CloseBuffer { - project_id, - buffer_id, - }) - .await - { - log::error!("error closing remote buffer: {}", error); - } + worktree + .client + .send(proto::CloseBuffer { + project_id: worktree.project_id, + buffer_id, }) - .detach(); + .log_err(); } }); } @@ -1563,16 +1538,15 @@ impl language::LocalFile for File { ) { let worktree = self.worktree.read(cx).as_local().unwrap(); if let Some(project_id) = worktree.share.as_ref().map(|share| share.project_id) { - let rpc = worktree.client.clone(); - let message = proto::BufferReloaded { - project_id, - buffer_id, - version: version.into(), - mtime: Some(mtime.into()), - }; - cx.background() - .spawn(async move { rpc.send(message).await }) - .detach_and_log_err(cx); + worktree + .client + .send(proto::BufferReloaded { + project_id, + buffer_id, + version: version.into(), + mtime: Some(mtime.into()), + }) + .log_err(); } } } diff --git a/crates/rpc/src/peer.rs b/crates/rpc/src/peer.rs index dcfcd2530c..77e9bb4db4 100644 --- a/crates/rpc/src/peer.rs +++ b/crates/rpc/src/peer.rs @@ -5,7 +5,7 @@ use futures::stream::BoxStream; use futures::{FutureExt as _, StreamExt}; use parking_lot::{Mutex, RwLock}; use postage::{ - mpsc, + barrier, mpsc, prelude::{Sink as _, Stream as _}, }; use smol_timeout::TimeoutExt as _; @@ -89,9 +89,10 @@ pub struct Peer { #[derive(Clone)] pub struct ConnectionState { - outgoing_tx: mpsc::Sender, + outgoing_tx: futures::channel::mpsc::UnboundedSender, next_message_id: Arc, - response_channels: Arc>>>>, + response_channels: + Arc>>>>, } const WRITE_TIMEOUT: Duration = Duration::from_secs(10); @@ -112,9 +113,14 @@ impl Peer { impl Future> + Send, BoxStream<'static, Box>, ) { - let connection_id = ConnectionId(self.next_connection_id.fetch_add(1, SeqCst)); + // For outgoing messages, use an unbounded channel so that application code + // can always send messages without yielding. For incoming messages, use a + // bounded channel so that other peers will receive backpressure if they send + // messages faster than this peer can process them. let (mut incoming_tx, incoming_rx) = mpsc::channel(64); - let (outgoing_tx, mut outgoing_rx) = mpsc::channel(64); + let (outgoing_tx, mut outgoing_rx) = futures::channel::mpsc::unbounded(); + + let connection_id = ConnectionId(self.next_connection_id.fetch_add(1, SeqCst)); let connection_state = ConnectionState { outgoing_tx, next_message_id: Default::default(), @@ -131,6 +137,16 @@ impl Peer { futures::pin_mut!(read_message); loop { futures::select_biased! { + outgoing = outgoing_rx.next().fuse() => match outgoing { + Some(outgoing) => { + match writer.write_message(&outgoing).timeout(WRITE_TIMEOUT).await { + None => break 'outer Err(anyhow!("timed out writing RPC message")), + Some(Err(result)) => break 'outer Err(result).context("failed to write RPC message"), + _ => {} + } + } + None => break 'outer Ok(()), + }, incoming = read_message => match incoming { Ok(incoming) => { if incoming_tx.send(incoming).await.is_err() { @@ -142,16 +158,6 @@ impl Peer { break 'outer Err(error).context("received invalid RPC message") } }, - outgoing = outgoing_rx.recv().fuse() => match outgoing { - Some(outgoing) => { - match writer.write_message(&outgoing).timeout(WRITE_TIMEOUT).await { - None => break 'outer Err(anyhow!("timed out writing RPC message")), - Some(Err(result)) => break 'outer Err(result).context("failed to write RPC message"), - _ => {} - } - } - None => break 'outer Ok(()), - } } } }; @@ -172,7 +178,9 @@ impl Peer { if let Some(responding_to) = incoming.responding_to { let channel = response_channels.lock().as_mut()?.remove(&responding_to); if let Some(mut tx) = channel { - tx.send(incoming).await.ok(); + let mut requester_resumed = barrier::channel(); + tx.send((incoming, requester_resumed.0)).await.ok(); + requester_resumed.1.recv().await; } else { log::warn!("received RPC response to unknown request {}", responding_to); } @@ -200,7 +208,7 @@ impl Peer { } pub fn request( - self: &Arc, + &self, receiver_id: ConnectionId, request: T, ) -> impl Future> { @@ -208,7 +216,7 @@ impl Peer { } pub fn forward_request( - self: &Arc, + &self, sender_id: ConnectionId, receiver_id: ConnectionId, request: T, @@ -217,15 +225,13 @@ impl Peer { } pub fn request_internal( - self: &Arc, + &self, original_sender_id: Option, receiver_id: ConnectionId, request: T, ) -> impl Future> { - let this = self.clone(); let (tx, mut rx) = mpsc::channel(1); - async move { - let mut connection = this.connection_state(receiver_id)?; + let send = self.connection_state(receiver_id).and_then(|connection| { let message_id = connection.next_message_id.fetch_add(1, SeqCst); connection .response_channels @@ -235,10 +241,17 @@ impl Peer { .insert(message_id, tx); connection .outgoing_tx - .send(request.into_envelope(message_id, None, original_sender_id.map(|id| id.0))) - .await + .unbounded_send(request.into_envelope( + message_id, + None, + original_sender_id.map(|id| id.0), + )) .map_err(|_| anyhow!("connection was closed"))?; - let response = rx + Ok(()) + }); + async move { + send?; + let (response, _barrier) = rx .recv() .await .ok_or_else(|| anyhow!("connection was closed"))?; @@ -251,81 +264,61 @@ impl Peer { } } - pub fn send( - self: &Arc, - receiver_id: ConnectionId, - message: T, - ) -> impl Future> { - let this = self.clone(); - async move { - let mut connection = this.connection_state(receiver_id)?; - let message_id = connection - .next_message_id - .fetch_add(1, atomic::Ordering::SeqCst); - connection - .outgoing_tx - .send(message.into_envelope(message_id, None, None)) - .await?; - Ok(()) - } + pub fn send(&self, receiver_id: ConnectionId, message: T) -> Result<()> { + let connection = self.connection_state(receiver_id)?; + let message_id = connection + .next_message_id + .fetch_add(1, atomic::Ordering::SeqCst); + connection + .outgoing_tx + .unbounded_send(message.into_envelope(message_id, None, None))?; + Ok(()) } pub fn forward_send( - self: &Arc, + &self, sender_id: ConnectionId, receiver_id: ConnectionId, message: T, - ) -> impl Future> { - let this = self.clone(); - async move { - let mut connection = this.connection_state(receiver_id)?; - let message_id = connection - .next_message_id - .fetch_add(1, atomic::Ordering::SeqCst); - connection - .outgoing_tx - .send(message.into_envelope(message_id, None, Some(sender_id.0))) - .await?; - Ok(()) - } + ) -> Result<()> { + let connection = self.connection_state(receiver_id)?; + let message_id = connection + .next_message_id + .fetch_add(1, atomic::Ordering::SeqCst); + connection + .outgoing_tx + .unbounded_send(message.into_envelope(message_id, None, Some(sender_id.0)))?; + Ok(()) } pub fn respond( - self: &Arc, + &self, receipt: Receipt, response: T::Response, - ) -> impl Future> { - let this = self.clone(); - async move { - let mut connection = this.connection_state(receipt.sender_id)?; - let message_id = connection - .next_message_id - .fetch_add(1, atomic::Ordering::SeqCst); - connection - .outgoing_tx - .send(response.into_envelope(message_id, Some(receipt.message_id), None)) - .await?; - Ok(()) - } + ) -> Result<()> { + let connection = self.connection_state(receipt.sender_id)?; + let message_id = connection + .next_message_id + .fetch_add(1, atomic::Ordering::SeqCst); + connection + .outgoing_tx + .unbounded_send(response.into_envelope(message_id, Some(receipt.message_id), None))?; + Ok(()) } pub fn respond_with_error( - self: &Arc, + &self, receipt: Receipt, response: proto::Error, - ) -> impl Future> { - let this = self.clone(); - async move { - let mut connection = this.connection_state(receipt.sender_id)?; - let message_id = connection - .next_message_id - .fetch_add(1, atomic::Ordering::SeqCst); - connection - .outgoing_tx - .send(response.into_envelope(message_id, Some(receipt.message_id), None)) - .await?; - Ok(()) - } + ) -> Result<()> { + let connection = self.connection_state(receipt.sender_id)?; + let message_id = connection + .next_message_id + .fetch_add(1, atomic::Ordering::SeqCst); + connection + .outgoing_tx + .unbounded_send(response.into_envelope(message_id, Some(receipt.message_id), None))?; + Ok(()) } fn connection_state(&self, connection_id: ConnectionId) -> Result { @@ -447,7 +440,7 @@ mod tests { let envelope = envelope.into_any(); if let Some(envelope) = envelope.downcast_ref::>() { let receipt = envelope.receipt(); - peer.respond(receipt, proto::Ack {}).await? + peer.respond(receipt, proto::Ack {})? } else if let Some(envelope) = envelope.downcast_ref::>() { @@ -475,7 +468,7 @@ mod tests { } }; - peer.respond(receipt, response).await? + peer.respond(receipt, response)? } else { panic!("unknown message type"); } @@ -518,7 +511,6 @@ mod tests { message: "message 1".to_string(), }, ) - .await .unwrap(); server .send( @@ -527,12 +519,8 @@ mod tests { message: "message 2".to_string(), }, ) - .await - .unwrap(); - server - .respond(request.receipt(), proto::Ack {}) - .await .unwrap(); + server.respond(request.receipt(), proto::Ack {}).unwrap(); // Prevent the connection from being dropped server_incoming.next().await; diff --git a/crates/server/Cargo.toml b/crates/server/Cargo.toml index c99de584f7..2ce5a6342d 100644 --- a/crates/server/Cargo.toml +++ b/crates/server/Cargo.toml @@ -59,6 +59,8 @@ features = ["runtime-async-std-rustls", "postgres", "time", "uuid"] collections = { path = "../collections", features = ["test-support"] } gpui = { path = "../gpui" } zed = { path = "../zed", features = ["test-support"] } +ctor = "0.1" +env_logger = "0.8" lazy_static = "1.4" serde_json = { version = "1.0.64", features = ["preserve_order"] } diff --git a/crates/server/src/rpc.rs b/crates/server/src/rpc.rs index 98006e7865..672d6054de 100644 --- a/crates/server/src/rpc.rs +++ b/crates/server/src/rpc.rs @@ -131,7 +131,7 @@ impl Server { } this.state_mut().add_connection(connection_id, user_id); - if let Err(err) = this.update_contacts_for_users(&[user_id]).await { + if let Err(err) = this.update_contacts_for_users(&[user_id]) { log::error!("error updating contacts for {:?}: {}", user_id, err); } @@ -141,34 +141,35 @@ impl Server { let next_message = incoming_rx.next().fuse(); futures::pin_mut!(next_message); futures::select_biased! { + result = handle_io => { + if let Err(err) = result { + log::error!("error handling rpc connection {:?} - {:?}", addr, err); + } + break; + } message = next_message => { if let Some(message) = message { let start_time = Instant::now(); - log::info!("RPC message received: {}", message.payload_type_name()); + let type_name = message.payload_type_name(); + log::info!("rpc message received. connection:{}, type:{}", connection_id, type_name); if let Some(handler) = this.handlers.get(&message.payload_type_id()) { if let Err(err) = (handler)(this.clone(), message).await { - log::error!("error handling message: {:?}", err); + log::error!("rpc message error. connection:{}, type:{}, error:{:?}", connection_id, type_name, err); } else { - log::info!("RPC message handled. duration:{:?}", start_time.elapsed()); + log::info!("rpc message handled. connection:{}, type:{}, duration:{:?}", connection_id, type_name, start_time.elapsed()); } if let Some(mut notifications) = this.notifications.clone() { let _ = notifications.send(()).await; } } else { - log::warn!("unhandled message: {}", message.payload_type_name()); + log::warn!("unhandled message: {}", type_name); } } else { log::info!("rpc connection closed {:?}", addr); break; } } - handle_io = handle_io => { - if let Err(err) = handle_io { - log::error!("error handling rpc connection {:?} - {:?}", addr, err); - } - break; - } } } @@ -191,8 +192,7 @@ impl Server { self.peer .send(conn_id, proto::UnshareProject { project_id }) }, - ) - .await?; + )?; } } @@ -205,18 +205,15 @@ impl Server { peer_id: connection_id.0, }, ) - }) - .await?; + })?; } - self.update_contacts_for_users(removed_connection.contact_ids.iter()) - .await?; - + self.update_contacts_for_users(removed_connection.contact_ids.iter())?; Ok(()) } async fn ping(self: Arc, request: TypedEnvelope) -> tide::Result<()> { - self.peer.respond(request.receipt(), proto::Ack {}).await?; + self.peer.respond(request.receipt(), proto::Ack {})?; Ok(()) } @@ -229,12 +226,10 @@ impl Server { let user_id = state.user_id_for_connection(request.sender_id)?; state.register_project(request.sender_id, user_id) }; - self.peer - .respond( - request.receipt(), - proto::RegisterProjectResponse { project_id }, - ) - .await?; + self.peer.respond( + request.receipt(), + proto::RegisterProjectResponse { project_id }, + )?; Ok(()) } @@ -246,8 +241,7 @@ impl Server { .state_mut() .unregister_project(request.payload.project_id, request.sender_id) .ok_or_else(|| anyhow!("no such project"))?; - self.update_contacts_for_users(project.authorized_user_ids().iter()) - .await?; + self.update_contacts_for_users(project.authorized_user_ids().iter())?; Ok(()) } @@ -257,7 +251,7 @@ impl Server { ) -> tide::Result<()> { self.state_mut() .share_project(request.payload.project_id, request.sender_id); - self.peer.respond(request.receipt(), proto::Ack {}).await?; + self.peer.respond(request.receipt(), proto::Ack {})?; Ok(()) } @@ -273,11 +267,8 @@ impl Server { broadcast(request.sender_id, project.connection_ids, |conn_id| { self.peer .send(conn_id, proto::UnshareProject { project_id }) - }) - .await?; - self.update_contacts_for_users(&project.authorized_user_ids) - .await?; - + })?; + self.update_contacts_for_users(&project.authorized_user_ids)?; Ok(()) } @@ -351,20 +342,17 @@ impl Server { }), }, ) - }) - .await?; - self.peer.respond(request.receipt(), response).await?; - self.update_contacts_for_users(&contact_user_ids).await?; + })?; + self.peer.respond(request.receipt(), response)?; + self.update_contacts_for_users(&contact_user_ids)?; } Err(error) => { - self.peer - .respond_with_error( - request.receipt(), - proto::Error { - message: error.to_string(), - }, - ) - .await?; + self.peer.respond_with_error( + request.receipt(), + proto::Error { + message: error.to_string(), + }, + )?; } } @@ -387,10 +375,8 @@ impl Server { peer_id: sender_id.0, }, ) - }) - .await?; - self.update_contacts_for_users(&worktree.authorized_user_ids) - .await?; + })?; + self.update_contacts_for_users(&worktree.authorized_user_ids)?; } Ok(()) } @@ -412,8 +398,7 @@ impl Server { Err(err) => { let message = err.to_string(); self.peer - .respond_with_error(receipt, proto::Error { message }) - .await?; + .respond_with_error(receipt, proto::Error { message })?; return Ok(()); } } @@ -432,17 +417,15 @@ impl Server { ); if ok { - self.peer.respond(receipt, proto::Ack {}).await?; - self.update_contacts_for_users(&contact_user_ids).await?; + self.peer.respond(receipt, proto::Ack {})?; + self.update_contacts_for_users(&contact_user_ids)?; } else { - self.peer - .respond_with_error( - receipt, - proto::Error { - message: NO_SUCH_PROJECT.to_string(), - }, - ) - .await?; + self.peer.respond_with_error( + receipt, + proto::Error { + message: NO_SUCH_PROJECT.to_string(), + }, + )?; } Ok(()) @@ -457,7 +440,6 @@ impl Server { let (worktree, guest_connection_ids) = self.state_mut() .unregister_worktree(project_id, worktree_id, request.sender_id)?; - broadcast(request.sender_id, guest_connection_ids, |conn_id| { self.peer.send( conn_id, @@ -466,10 +448,8 @@ impl Server { worktree_id, }, ) - }) - .await?; - self.update_contacts_for_users(&worktree.authorized_user_ids) - .await?; + })?; + self.update_contacts_for_users(&worktree.authorized_user_ids)?; Ok(()) } @@ -511,20 +491,16 @@ impl Server { request.payload.clone(), ) }, - ) - .await?; - self.peer.respond(request.receipt(), proto::Ack {}).await?; - self.update_contacts_for_users(&shared_worktree.authorized_user_ids) - .await?; + )?; + self.peer.respond(request.receipt(), proto::Ack {})?; + self.update_contacts_for_users(&shared_worktree.authorized_user_ids)?; } else { - self.peer - .respond_with_error( - request.receipt(), - proto::Error { - message: "no such worktree".to_string(), - }, - ) - .await?; + self.peer.respond_with_error( + request.receipt(), + proto::Error { + message: "no such worktree".to_string(), + }, + )?; } Ok(()) } @@ -547,8 +523,7 @@ impl Server { broadcast(request.sender_id, connection_ids, |connection_id| { self.peer .forward_send(request.sender_id, connection_id, request.payload.clone()) - }) - .await?; + })?; Ok(()) } @@ -574,8 +549,7 @@ impl Server { broadcast(request.sender_id, receiver_ids, |connection_id| { self.peer .forward_send(request.sender_id, connection_id, request.payload.clone()) - }) - .await?; + })?; Ok(()) } @@ -590,8 +564,7 @@ impl Server { broadcast(request.sender_id, receiver_ids, |connection_id| { self.peer .forward_send(request.sender_id, connection_id, request.payload.clone()) - }) - .await?; + })?; Ok(()) } @@ -606,8 +579,7 @@ impl Server { broadcast(request.sender_id, receiver_ids, |connection_id| { self.peer .forward_send(request.sender_id, connection_id, request.payload.clone()) - }) - .await?; + })?; Ok(()) } @@ -625,7 +597,7 @@ impl Server { .peer .forward_request(request.sender_id, host_connection_id, request.payload) .await?; - self.peer.respond(receipt, response).await?; + self.peer.respond(receipt, response)?; Ok(()) } @@ -643,7 +615,7 @@ impl Server { .peer .forward_request(request.sender_id, host_connection_id, request.payload) .await?; - self.peer.respond(receipt, response).await?; + self.peer.respond(receipt, response)?; Ok(()) } @@ -657,8 +629,7 @@ impl Server { .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))? .host_connection_id; self.peer - .forward_send(request.sender_id, host_connection_id, request.payload) - .await?; + .forward_send(request.sender_id, host_connection_id, request.payload)?; Ok(()) } @@ -686,16 +657,12 @@ impl Server { broadcast(host, guests, |conn_id| { let response = response.clone(); - let peer = &self.peer; - async move { - if conn_id == sender { - peer.respond(receipt, response).await - } else { - peer.forward_send(host, conn_id, response).await - } + if conn_id == sender { + self.peer.respond(receipt, response) + } else { + self.peer.forward_send(host, conn_id, response) } - }) - .await?; + })?; Ok(()) } @@ -719,7 +686,7 @@ impl Server { .peer .forward_request(sender, host, request.payload.clone()) .await?; - self.peer.respond(receipt, response).await?; + self.peer.respond(receipt, response)?; Ok(()) } @@ -743,8 +710,7 @@ impl Server { .peer .forward_request(sender, host, request.payload.clone()) .await?; - self.peer.respond(receipt, response).await?; - + self.peer.respond(receipt, response)?; Ok(()) } @@ -767,8 +733,7 @@ impl Server { .peer .forward_request(sender, host, request.payload.clone()) .await?; - self.peer.respond(receipt, response).await?; - + self.peer.respond(receipt, response)?; Ok(()) } @@ -783,9 +748,8 @@ impl Server { broadcast(request.sender_id, receiver_ids, |connection_id| { self.peer .forward_send(request.sender_id, connection_id, request.payload.clone()) - }) - .await?; - self.peer.respond(request.receipt(), proto::Ack {}).await?; + })?; + self.peer.respond(request.receipt(), proto::Ack {})?; Ok(()) } @@ -800,8 +764,7 @@ impl Server { broadcast(request.sender_id, receiver_ids, |connection_id| { self.peer .forward_send(request.sender_id, connection_id, request.payload.clone()) - }) - .await?; + })?; Ok(()) } @@ -816,8 +779,7 @@ impl Server { broadcast(request.sender_id, receiver_ids, |connection_id| { self.peer .forward_send(request.sender_id, connection_id, request.payload.clone()) - }) - .await?; + })?; Ok(()) } @@ -832,8 +794,7 @@ impl Server { broadcast(request.sender_id, receiver_ids, |connection_id| { self.peer .forward_send(request.sender_id, connection_id, request.payload.clone()) - }) - .await?; + })?; Ok(()) } @@ -843,20 +804,18 @@ impl Server { ) -> tide::Result<()> { let user_id = self.state().user_id_for_connection(request.sender_id)?; let channels = self.app_state.db.get_accessible_channels(user_id).await?; - self.peer - .respond( - request.receipt(), - proto::GetChannelsResponse { - channels: channels - .into_iter() - .map(|chan| proto::Channel { - id: chan.id.to_proto(), - name: chan.name, - }) - .collect(), - }, - ) - .await?; + self.peer.respond( + request.receipt(), + proto::GetChannelsResponse { + channels: channels + .into_iter() + .map(|chan| proto::Channel { + id: chan.id.to_proto(), + name: chan.name, + }) + .collect(), + }, + )?; Ok(()) } @@ -879,34 +838,30 @@ impl Server { }) .collect(); self.peer - .respond(receipt, proto::GetUsersResponse { users }) - .await?; + .respond(receipt, proto::GetUsersResponse { users })?; Ok(()) } - async fn update_contacts_for_users<'a>( + fn update_contacts_for_users<'a>( self: &Arc, user_ids: impl IntoIterator, - ) -> tide::Result<()> { - let mut send_futures = Vec::new(); - - { - let state = self.state(); - for user_id in user_ids { - let contacts = state.contacts_for_user(*user_id); - for connection_id in state.connection_ids_for_user(*user_id) { - send_futures.push(self.peer.send( - connection_id, - proto::UpdateContacts { - contacts: contacts.clone(), - }, - )); + ) -> anyhow::Result<()> { + let mut result = Ok(()); + let state = self.state(); + for user_id in user_ids { + let contacts = state.contacts_for_user(*user_id); + for connection_id in state.connection_ids_for_user(*user_id) { + if let Err(error) = self.peer.send( + connection_id, + proto::UpdateContacts { + contacts: contacts.clone(), + }, + ) { + result = Err(error); } } } - futures::future::try_join_all(send_futures).await?; - - Ok(()) + result } async fn join_channel( @@ -939,15 +894,13 @@ impl Server { nonce: Some(msg.nonce.as_u128().into()), }) .collect::>(); - self.peer - .respond( - request.receipt(), - proto::JoinChannelResponse { - done: messages.len() < MESSAGE_COUNT_PER_PAGE, - messages, - }, - ) - .await?; + self.peer.respond( + request.receipt(), + proto::JoinChannelResponse { + done: messages.len() < MESSAGE_COUNT_PER_PAGE, + messages, + }, + )?; Ok(()) } @@ -993,25 +946,21 @@ impl Server { // Validate the message body. let body = request.payload.body.trim().to_string(); if body.len() > MAX_MESSAGE_LEN { - self.peer - .respond_with_error( - receipt, - proto::Error { - message: "message is too long".to_string(), - }, - ) - .await?; + self.peer.respond_with_error( + receipt, + proto::Error { + message: "message is too long".to_string(), + }, + )?; return Ok(()); } if body.is_empty() { - self.peer - .respond_with_error( - receipt, - proto::Error { - message: "message can't be blank".to_string(), - }, - ) - .await?; + self.peer.respond_with_error( + receipt, + proto::Error { + message: "message can't be blank".to_string(), + }, + )?; return Ok(()); } @@ -1019,14 +968,12 @@ impl Server { let nonce = if let Some(nonce) = request.payload.nonce { nonce } else { - self.peer - .respond_with_error( - receipt, - proto::Error { - message: "nonce can't be blank".to_string(), - }, - ) - .await?; + self.peer.respond_with_error( + receipt, + proto::Error { + message: "nonce can't be blank".to_string(), + }, + )?; return Ok(()); }; @@ -1051,16 +998,13 @@ impl Server { message: Some(message.clone()), }, ) - }) - .await?; - self.peer - .respond( - receipt, - proto::SendChannelMessageResponse { - message: Some(message), - }, - ) - .await?; + })?; + self.peer.respond( + receipt, + proto::SendChannelMessageResponse { + message: Some(message), + }, + )?; Ok(()) } @@ -1097,15 +1041,13 @@ impl Server { nonce: Some(msg.nonce.as_u128().into()), }) .collect::>(); - self.peer - .respond( - request.receipt(), - proto::GetChannelMessagesResponse { - done: messages.len() < MESSAGE_COUNT_PER_PAGE, - messages, - }, - ) - .await?; + self.peer.respond( + request.receipt(), + proto::GetChannelMessagesResponse { + done: messages.len() < MESSAGE_COUNT_PER_PAGE, + messages, + }, + )?; Ok(()) } @@ -1118,21 +1060,25 @@ impl Server { } } -pub async fn broadcast( +fn broadcast( sender_id: ConnectionId, receiver_ids: Vec, mut f: F, ) -> anyhow::Result<()> where - F: FnMut(ConnectionId) -> T, - T: Future>, + F: FnMut(ConnectionId) -> anyhow::Result<()>, { - let futures = receiver_ids - .into_iter() - .filter(|id| *id != sender_id) - .map(|id| f(id)); - futures::future::try_join_all(futures).await?; - Ok(()) + let mut result = Ok(()); + for receiver_id in receiver_ids { + if receiver_id != sender_id { + if let Err(error) = f(receiver_id) { + if result.is_ok() { + result = Err(error); + } + } + } + } + result } pub fn add_routes(app: &mut tide::Server>, rpc: &Arc) { @@ -1247,6 +1193,13 @@ mod tests { project::{DiagnosticSummary, Project, ProjectPath}, }; + #[cfg(test)] + #[ctor::ctor] + fn init_logger() { + // std::env::set_var("RUST_LOG", "info"); + env_logger::init(); + } + #[gpui::test] async fn test_share_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) { let (window_b, _) = cx_b.add_window(|_| EmptyView);