diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index e648617fe1..8540a671be 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -315,7 +315,7 @@ impl Server { .add_request_handler(forward_read_only_project_request::) .add_request_handler(forward_read_only_project_request::) .add_request_handler(forward_read_only_project_request::) - .add_request_handler(forward_find_search_candidates_request) + .add_request_handler(forward_read_only_project_request::) .add_request_handler(forward_read_only_project_request::) .add_request_handler(forward_read_only_project_request::) .add_request_handler(forward_read_only_project_request::) @@ -666,7 +666,6 @@ impl Server { let total_duration_ms = received_at.elapsed().as_micros() as f64 / 1000.0; let processing_duration_ms = start_time.elapsed().as_micros() as f64 / 1000.0; let queue_duration_ms = total_duration_ms - processing_duration_ms; - let payload_type = M::NAME; match result { Err(error) => { @@ -675,7 +674,6 @@ impl Server { total_duration_ms, processing_duration_ms, queue_duration_ms, - payload_type, "error handling message" ) } @@ -780,12 +778,11 @@ impl Server { async move { if *teardown.borrow() { tracing::error!("server is tearing down"); - return + return; } - let (connection_id, handle_io, mut incoming_rx) = this - .peer - .add_connection(connection, { + let (connection_id, handle_io, mut incoming_rx) = + this.peer.add_connection(connection, { let executor = executor.clone(); move |duration| executor.sleep(duration) }); @@ -802,10 +799,14 @@ impl Server { } }; - let supermaven_client = this.app_state.config.supermaven_admin_api_key.clone().map(|supermaven_admin_api_key| Arc::new(SupermavenAdminApi::new( - supermaven_admin_api_key.to_string(), - http_client.clone(), - ))); + let supermaven_client = this.app_state.config.supermaven_admin_api_key.clone().map( + |supermaven_admin_api_key| { + Arc::new(SupermavenAdminApi::new( + supermaven_admin_api_key.to_string(), + http_client.clone(), + )) + }, + ); let session = Session { principal: principal.clone(), @@ -820,7 +821,15 @@ impl Server { supermaven_client, }; - if let Err(error) = this.send_initial_client_update(connection_id, zed_version, send_connection_id, &session).await { + if let Err(error) = this + .send_initial_client_update( + connection_id, + zed_version, + send_connection_id, + &session, + ) + .await + { tracing::error!(?error, "failed to send initial client update"); return; } @@ -837,14 +846,22 @@ impl Server { // // This arrangement ensures we will attempt to process earlier messages first, but fall // back to processing messages arrived later in the spirit of making progress. + const MAX_CONCURRENT_HANDLERS: usize = 256; let mut foreground_message_handlers = FuturesUnordered::new(); - let concurrent_handlers = Arc::new(Semaphore::new(256)); + let concurrent_handlers = Arc::new(Semaphore::new(MAX_CONCURRENT_HANDLERS)); + let get_concurrent_handlers = { + let concurrent_handlers = concurrent_handlers.clone(); + move || MAX_CONCURRENT_HANDLERS - concurrent_handlers.available_permits() + }; loop { let next_message = async { let permit = concurrent_handlers.clone().acquire_owned().await.unwrap(); let message = incoming_rx.next().await; - (permit, message) - }.fuse(); + // Cache the concurrent_handlers here, so that we know what the + // queue looks like as each handler starts + (permit, message, get_concurrent_handlers()) + } + .fuse(); futures::pin_mut!(next_message); futures::select_biased! { _ = teardown.changed().fuse() => return, @@ -856,12 +873,16 @@ impl Server { } _ = foreground_message_handlers.next() => {} next_message = next_message => { - let (permit, message) = next_message; + let (permit, message, concurrent_handlers) = next_message; if let Some(message) = message { let type_name = message.payload_type_name(); // note: we copy all the fields from the parent span so we can query them in the logs. // (https://github.com/tokio-rs/tracing/issues/2670). - let span = tracing::info_span!("receive message", %connection_id, %address, type_name, + let span = tracing::info_span!("receive message", + %connection_id, + %address, + type_name, + concurrent_handlers, user_id=field::Empty, login=field::Empty, impersonator=field::Empty, @@ -895,12 +916,13 @@ impl Server { } drop(foreground_message_handlers); - tracing::info!("signing out"); + let concurrent_handlers = get_concurrent_handlers(); + tracing::info!(concurrent_handlers, "signing out"); if let Err(error) = connection_lost(session, teardown, executor).await { tracing::error!(?error, "error signing out"); } - - }.instrument(span) + } + .instrument(span) } async fn send_initial_client_update( @@ -2286,25 +2308,6 @@ where Ok(()) } -async fn forward_find_search_candidates_request( - request: proto::FindSearchCandidates, - response: Response, - session: Session, -) -> Result<()> { - let project_id = ProjectId::from_proto(request.remote_entity_id()); - let host_connection_id = session - .db() - .await - .host_for_read_only_project_request(project_id, session.connection_id) - .await?; - let payload = session - .peer - .forward_request(session.connection_id, host_connection_id, request) - .await?; - response.send(payload)?; - Ok(()) -} - /// forward a project request to the host. These requests are disallowed /// for guests. async fn forward_mutating_project_request( @@ -2336,6 +2339,7 @@ async fn multi_lsp_query( session: Session, ) -> Result<()> { tracing::Span::current().record("multi_lsp_query_request", request.request_str()); + tracing::info!("multi_lsp_query message received"); forward_mutating_project_request(request, response, session).await } diff --git a/crates/rpc/src/peer.rs b/crates/rpc/src/peer.rs index 80a104641f..c1fd1df5ff 100644 --- a/crates/rpc/src/peer.rs +++ b/crates/rpc/src/peer.rs @@ -422,8 +422,26 @@ impl Peer { receiver_id: ConnectionId, request: T, ) -> impl Future> { + let request_start_time = Instant::now(); + let payload_type = T::NAME; + let elapsed_time = move || request_start_time.elapsed().as_millis(); + tracing::info!(payload_type, "start forwarding request"); self.request_internal(Some(sender_id), receiver_id, request) .map_ok(|envelope| envelope.payload) + .inspect_err(move |_| { + tracing::error!( + waiting_for_host_ms = elapsed_time(), + payload_type, + "error forwarding request" + ) + }) + .inspect_ok(move |_| { + tracing::info!( + waiting_for_host_ms = elapsed_time(), + payload_type, + "finished forwarding request" + ) + }) } fn request_internal(