From a5b7cfd128820daa5765f411b28edcd8d89fed37 Mon Sep 17 00:00:00 2001 From: Ben Brandt Date: Sun, 27 Jul 2025 15:50:04 +0200 Subject: [PATCH] agent_servers: Use built-in interrupt handling for Claude sessions (#35154) We no longer have to stop and restart the entire process. I left in the Start/Resume mode handling since we will likely need to handle restarting Claude in other situations. Release Notes: - N/A --- Cargo.lock | 1 + crates/agent_servers/Cargo.toml | 1 + crates/agent_servers/src/claude.rs | 134 ++++++++++++----------------- 3 files changed, 58 insertions(+), 78 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d1ea9cc32f..43a74fe687 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -168,6 +168,7 @@ dependencies = [ "nix 0.29.0", "paths", "project", + "rand 0.8.5", "schemars", "serde", "serde_json", diff --git a/crates/agent_servers/Cargo.toml b/crates/agent_servers/Cargo.toml index 4371f7684d..023799bc6c 100644 --- a/crates/agent_servers/Cargo.toml +++ b/crates/agent_servers/Cargo.toml @@ -29,6 +29,7 @@ itertools.workspace = true log.workspace = true paths.workspace = true project.workspace = true +rand.workspace = true schemars.workspace = true serde.workspace = true serde_json.workspace = true diff --git a/crates/agent_servers/src/claude.rs b/crates/agent_servers/src/claude.rs index d63d8c43cf..9e10542a6d 100644 --- a/crates/agent_servers/src/claude.rs +++ b/crates/agent_servers/src/claude.rs @@ -9,7 +9,6 @@ use smol::process::Child; use std::cell::RefCell; use std::fmt::Display; use std::path::Path; -use std::pin::pin; use std::rc::Rc; use uuid::Uuid; @@ -66,19 +65,6 @@ impl AgentServer for ClaudeCode { } } -#[cfg(unix)] -fn send_interrupt(pid: libc::pid_t) -> anyhow::Result<()> { - let pid = nix::unistd::Pid::from_raw(pid); - - nix::sys::signal::kill(pid, nix::sys::signal::SIGINT) - .map_err(|e| anyhow!("Failed to interrupt process: {}", e)) -} - -#[cfg(windows)] -fn send_interrupt(_pid: i32) -> anyhow::Result<()> { - panic!("Cancel not implemented on Windows") -} - struct ClaudeAgentConnection { sessions: Rc>>, } @@ -127,7 +113,6 @@ impl AgentConnection for ClaudeAgentConnection { let (incoming_message_tx, mut incoming_message_rx) = mpsc::unbounded(); let (outgoing_tx, outgoing_rx) = mpsc::unbounded(); - let (cancel_tx, mut cancel_rx) = mpsc::unbounded::>>(); let session_id = acp::SessionId(Uuid::new_v4().to_string().into()); @@ -137,50 +122,28 @@ impl AgentConnection for ClaudeAgentConnection { let session_id = session_id.clone(); async move { let mut outgoing_rx = Some(outgoing_rx); - let mut mode = ClaudeSessionMode::Start; - loop { - let mut child = spawn_claude( - &command, - mode, - session_id.clone(), - &mcp_config_path, - &cwd, - ) - .await?; - mode = ClaudeSessionMode::Resume; + let mut child = spawn_claude( + &command, + ClaudeSessionMode::Start, + session_id.clone(), + &mcp_config_path, + &cwd, + ) + .await?; - let pid = child.id(); - log::trace!("Spawned (pid: {})", pid); + let pid = child.id(); + log::trace!("Spawned (pid: {})", pid); - let mut io_fut = pin!( - ClaudeAgentSession::handle_io( - outgoing_rx.take().unwrap(), - incoming_message_tx.clone(), - child.stdin.take().unwrap(), - child.stdout.take().unwrap(), - ) - .fuse() - ); + ClaudeAgentSession::handle_io( + outgoing_rx.take().unwrap(), + incoming_message_tx.clone(), + child.stdin.take().unwrap(), + child.stdout.take().unwrap(), + ) + .await?; - select_biased! { - done_tx = cancel_rx.next() => { - if let Some(done_tx) = done_tx { - log::trace!("Interrupted (pid: {})", pid); - let result = send_interrupt(pid as i32); - outgoing_rx.replace(io_fut.await?); - done_tx.send(result).log_err(); - continue; - } - } - result = io_fut => { - result?; - } - } - - log::trace!("Stopped (pid: {})", pid); - break; - } + log::trace!("Stopped (pid: {})", pid); drop(mcp_config_path); anyhow::Ok(()) @@ -213,7 +176,6 @@ impl AgentConnection for ClaudeAgentConnection { let session = ClaudeAgentSession { outgoing_tx, end_turn_tx, - cancel_tx, _handler_task: handler_task, _mcp_server: Some(permission_mcp_server), }; @@ -278,37 +240,24 @@ impl AgentConnection for ClaudeAgentConnection { }) } - fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) { + fn cancel(&self, session_id: &acp::SessionId, _cx: &mut App) { let sessions = self.sessions.borrow(); let Some(session) = sessions.get(&session_id) else { log::warn!("Attempted to cancel nonexistent session {}", session_id); return; }; - let (done_tx, done_rx) = oneshot::channel(); - if session - .cancel_tx - .unbounded_send(done_tx) - .log_err() - .is_some() - { - let end_turn_tx = session.end_turn_tx.clone(); - cx.foreground_executor() - .spawn(async move { - done_rx.await??; - if let Some(end_turn_tx) = end_turn_tx.take() { - end_turn_tx.send(Ok(())).ok(); - } - anyhow::Ok(()) - }) - .detach_and_log_err(cx); - } + session + .outgoing_tx + .unbounded_send(SdkMessage::new_interrupt_message()) + .log_err(); } } #[derive(Clone, Copy)] enum ClaudeSessionMode { Start, + #[expect(dead_code)] Resume, } @@ -364,7 +313,6 @@ async fn spawn_claude( struct ClaudeAgentSession { outgoing_tx: UnboundedSender, end_turn_tx: Rc>>>>, - cancel_tx: UnboundedSender>>, _mcp_server: Option, _handler_task: Task<()>, } @@ -377,6 +325,8 @@ impl ClaudeAgentSession { cx: &mut AsyncApp, ) { match message { + // we should only be sending these out, they don't need to be in the thread + SdkMessage::ControlRequest { .. } => {} SdkMessage::Assistant { message, session_id: _, @@ -643,14 +593,12 @@ enum SdkMessage { #[serde(skip_serializing_if = "Option::is_none")] session_id: Option, }, - // A user message User { message: Message, // from Anthropic SDK #[serde(skip_serializing_if = "Option::is_none")] session_id: Option, }, - // Emitted as the last message in a conversation Result { subtype: ResultErrorType, @@ -675,6 +623,18 @@ enum SdkMessage { #[serde(rename = "permissionMode")] permission_mode: PermissionMode, }, + /// Messages used to control the conversation, outside of chat messages to the model + ControlRequest { + request_id: String, + request: ControlRequest, + }, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "subtype", rename_all = "snake_case")] +enum ControlRequest { + /// Cancel the current conversation + Interrupt, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -695,6 +655,24 @@ impl Display for ResultErrorType { } } +impl SdkMessage { + fn new_interrupt_message() -> Self { + use rand::Rng; + // In the Claude Code TS SDK they just generate a random 12 character string, + // `Math.random().toString(36).substring(2, 15)` + let request_id = rand::thread_rng() + .sample_iter(&rand::distributions::Alphanumeric) + .take(12) + .map(char::from) + .collect(); + + Self::ControlRequest { + request_id, + request: ControlRequest::Interrupt, + } + } +} + #[derive(Debug, Clone, Serialize, Deserialize)] struct McpServer { name: String,