From 0f395df9a8f8f8dcceeed3f5cd98e7ba5014cc1d Mon Sep 17 00:00:00 2001 From: Agus Zubiaga Date: Mon, 28 Jul 2025 18:02:21 -0300 Subject: [PATCH 01/13] Update to new schema --- Cargo.lock | 4 +--- Cargo.toml | 2 +- crates/acp_thread/src/acp_thread.rs | 14 +++++++------- crates/agent_servers/src/codex.rs | 12 ++++-------- crates/agent_servers/src/mcp_server.rs | 7 ++++--- 5 files changed, 17 insertions(+), 22 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8d9a622655..eb034f4cc3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -138,9 +138,7 @@ dependencies = [ [[package]] name = "agent-client-protocol" -version = "0.0.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "72ec54650c1fc2d63498bab47eeeaa9eddc7d239d53f615b797a0e84f7ccc87b" +version = "0.0.12" dependencies = [ "schemars", "serde", diff --git a/Cargo.toml b/Cargo.toml index 16ace7dee0..d733f2242e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -413,7 +413,7 @@ zlog_settings = { path = "crates/zlog_settings" } # agentic-coding-protocol = "0.0.10" -agent-client-protocol = "0.0.11" +agent-client-protocol = {path="../agent-client-protocol"} aho-corasick = "1.1" alacritty_terminal = { git = "https://github.com/zed-industries/alacritty.git", branch = "add-hush-login-flag" } any_vec = "0.14" diff --git a/crates/acp_thread/src/acp_thread.rs b/crates/acp_thread/src/acp_thread.rs index d572992c54..d02f2d6bb6 100644 --- a/crates/acp_thread/src/acp_thread.rs +++ b/crates/acp_thread/src/acp_thread.rs @@ -391,7 +391,7 @@ impl ToolCallContent { cx: &mut App, ) -> Self { match content { - acp::ToolCallContent::ContentBlock(content) => Self::ContentBlock { + acp::ToolCallContent::Content { content } => Self::ContentBlock { content: ContentBlock::new(content, &language_registry, cx), }, acp::ToolCallContent::Diff { diff } => Self::Diff { @@ -682,14 +682,14 @@ impl AcpThread { cx: &mut Context, ) -> Result<()> { match update { - acp::SessionUpdate::UserMessage(content_block) => { - self.push_user_content_block(content_block, cx); + acp::SessionUpdate::UserMessageChunk { content } => { + self.push_user_content_block(content, cx); } - acp::SessionUpdate::AgentMessageChunk(content_block) => { - self.push_assistant_content_block(content_block, false, cx); + acp::SessionUpdate::AgentMessageChunk { content } => { + self.push_assistant_content_block(content, false, cx); } - acp::SessionUpdate::AgentThoughtChunk(content_block) => { - self.push_assistant_content_block(content_block, true, cx); + acp::SessionUpdate::AgentThoughtChunk { content } => { + self.push_assistant_content_block(content, true, cx); } acp::SessionUpdate::ToolCall(tool_call) => { self.upsert_tool_call(tool_call, cx); diff --git a/crates/agent_servers/src/codex.rs b/crates/agent_servers/src/codex.rs index b10ce9cf54..d40aebfbd7 100644 --- a/crates/agent_servers/src/codex.rs +++ b/crates/agent_servers/src/codex.rs @@ -73,7 +73,7 @@ impl AgentServer for Codex { client .client() .context("Failed to subscribe")? - .on_notification(acp::SESSION_UPDATE_METHOD_NAME, { + .on_notification(acp::AGENT_METHODS.session_update, { move |notification, _cx| { let notification_tx = notification_tx.clone(); log::trace!( @@ -149,13 +149,9 @@ impl AgentConnection for CodexConnection { let response = client .request::(context_server::types::CallToolParams { - name: acp::NEW_SESSION_TOOL_NAME.into(), + name: acp::AGENT_METHODS.new_session.into(), arguments: Some(serde_json::to_value(acp::NewSessionArguments { - mcp_servers: [( - mcp_server::SERVER_NAME.to_string(), - mcp_server.server_config()?, - )] - .into(), + mcp_servers: vec![mcp_server.server_config()?], client_tools: acp::ClientTools { request_permission: Some(acp::McpToolId { mcp_server: mcp_server::SERVER_NAME.into(), @@ -227,7 +223,7 @@ impl AgentConnection for CodexConnection { let result = client .request_with::( context_server::types::CallToolParams { - name: acp::PROMPT_TOOL_NAME.into(), + name: acp::AGENT_METHODS.prompt.into(), arguments: Some(serde_json::to_value(params)?), meta: None, }, diff --git a/crates/agent_servers/src/mcp_server.rs b/crates/agent_servers/src/mcp_server.rs index 055b89dfe2..ec655800ed 100644 --- a/crates/agent_servers/src/mcp_server.rs +++ b/crates/agent_servers/src/mcp_server.rs @@ -37,7 +37,7 @@ impl ZedMcpServer { Ok(Self { server: mcp_server }) } - pub fn server_config(&self) -> Result { + pub fn server_config(&self) -> Result { #[cfg(not(test))] let zed_path = anyhow::Context::context( std::env::current_exe(), @@ -47,13 +47,14 @@ impl ZedMcpServer { #[cfg(test)] let zed_path = crate::e2e_tests::get_zed_path(); - Ok(acp::McpServerConfig { + Ok(acp::McpServer { + name: SERVER_NAME.into(), command: zed_path, args: vec![ "--nc".into(), self.server.socket_path().display().to_string(), ], - env: None, + env: vec![], }) } From ced3d09f10c1d64fa8dd31065724e4b12b69e9ee Mon Sep 17 00:00:00 2001 From: Agus Zubiaga Date: Mon, 28 Jul 2025 18:32:30 -0300 Subject: [PATCH 02/13] Extract acp_connection --- Cargo.lock | 4 +- Cargo.toml | 2 +- crates/acp_thread/src/acp_thread.rs | 3 +- crates/acp_thread/src/connection.rs | 2 - crates/acp_thread/src/old_acp_support.rs | 6 +- crates/agent_servers/acp | 0 crates/agent_servers/src/acp_connection.rs | 256 +++++++++++++++++++++ crates/agent_servers/src/agent_servers.rs | 1 + crates/agent_servers/src/claude.rs | 9 +- crates/agent_servers/src/codex.rs | 255 +------------------- crates/agent_servers/src/gemini.rs | 182 +++------------ 11 files changed, 305 insertions(+), 415 deletions(-) create mode 100644 crates/agent_servers/acp create mode 100644 crates/agent_servers/src/acp_connection.rs diff --git a/Cargo.lock b/Cargo.lock index eb034f4cc3..e5b1be5a8c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -138,7 +138,9 @@ dependencies = [ [[package]] name = "agent-client-protocol" -version = "0.0.12" +version = "0.0.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4255a06cc2414033d1fe4baf1968bcc8f16d7e5814f272b97779b5806d129142" dependencies = [ "schemars", "serde", diff --git a/Cargo.toml b/Cargo.toml index d733f2242e..81da82cbb7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -413,7 +413,7 @@ zlog_settings = { path = "crates/zlog_settings" } # agentic-coding-protocol = "0.0.10" -agent-client-protocol = {path="../agent-client-protocol"} +agent-client-protocol = "0.0.13" aho-corasick = "1.1" alacritty_terminal = { git = "https://github.com/zed-industries/alacritty.git", branch = "add-hush-login-flag" } any_vec = "0.14" diff --git a/crates/acp_thread/src/acp_thread.rs b/crates/acp_thread/src/acp_thread.rs index d02f2d6bb6..1c4b0ec06f 100644 --- a/crates/acp_thread/src/acp_thread.rs +++ b/crates/acp_thread/src/acp_thread.rs @@ -616,6 +616,7 @@ impl Error for LoadError {} impl AcpThread { pub fn new( + title: impl Into, connection: Rc, project: Entity, session_id: acp::SessionId, @@ -628,7 +629,7 @@ impl AcpThread { shared_buffers: Default::default(), entries: Default::default(), plan: Default::default(), - title: connection.name().into(), + title: title.into(), project, send_task: None, connection, diff --git a/crates/acp_thread/src/connection.rs b/crates/acp_thread/src/connection.rs index 5b25b71863..97161a19c0 100644 --- a/crates/acp_thread/src/connection.rs +++ b/crates/acp_thread/src/connection.rs @@ -9,8 +9,6 @@ use ui::App; use crate::AcpThread; pub trait AgentConnection { - fn name(&self) -> &'static str; - fn new_thread( self: Rc, project: Entity, diff --git a/crates/acp_thread/src/old_acp_support.rs b/crates/acp_thread/src/old_acp_support.rs index 44cd00348f..d7ef1b73da 100644 --- a/crates/acp_thread/src/old_acp_support.rs +++ b/crates/acp_thread/src/old_acp_support.rs @@ -367,10 +367,6 @@ pub struct OldAcpAgentConnection { } impl AgentConnection for OldAcpAgentConnection { - fn name(&self) -> &'static str { - self.name - } - fn new_thread( self: Rc, project: Entity, @@ -394,7 +390,7 @@ impl AgentConnection for OldAcpAgentConnection { cx.update(|cx| { let thread = cx.new(|cx| { let session_id = acp::SessionId("acp-old-no-id".into()); - AcpThread::new(self.clone(), project, session_id, cx) + AcpThread::new("Gemini", self.clone(), project, session_id, cx) }); thread }) diff --git a/crates/agent_servers/acp b/crates/agent_servers/acp new file mode 100644 index 0000000000..e69de29bb2 diff --git a/crates/agent_servers/src/acp_connection.rs b/crates/agent_servers/src/acp_connection.rs new file mode 100644 index 0000000000..9139d62c38 --- /dev/null +++ b/crates/agent_servers/src/acp_connection.rs @@ -0,0 +1,256 @@ +use agent_client_protocol as acp; +use anyhow::anyhow; +use collections::HashMap; +use context_server::listener::McpServerTool; +use context_server::types::requests; +use context_server::{ContextServer, ContextServerCommand, ContextServerId}; +use futures::channel::{mpsc, oneshot}; +use project::Project; +use smol::stream::StreamExt as _; +use std::cell::RefCell; +use std::rc::Rc; +use std::{path::Path, sync::Arc}; +use util::ResultExt; + +use anyhow::{Context, Result}; +use gpui::{App, AppContext as _, AsyncApp, Entity, Task, WeakEntity}; + +use crate::mcp_server::ZedMcpServer; +use crate::{AgentServerCommand, mcp_server}; +use acp_thread::{AcpThread, AgentConnection}; + +pub struct AcpConnection { + server_name: &'static str, + client: Arc, + sessions: Rc>>, + _notification_handler_task: Task<()>, +} + +impl AcpConnection { + pub async fn stdio( + server_name: &'static str, + command: AgentServerCommand, + cx: &mut AsyncApp, + ) -> Result { + let client: Arc = ContextServer::stdio( + ContextServerId(format!("{}-mcp-server", server_name).into()), + ContextServerCommand { + path: command.path, + args: command.args, + env: command.env, + }, + ) + .into(); + ContextServer::start(client.clone(), cx).await?; + + let (notification_tx, mut notification_rx) = mpsc::unbounded(); + client + .client() + .context("Failed to subscribe")? + .on_notification(acp::AGENT_METHODS.session_update, { + move |notification, _cx| { + let notification_tx = notification_tx.clone(); + log::trace!( + "ACP Notification: {}", + serde_json::to_string_pretty(¬ification).unwrap() + ); + + if let Some(notification) = + serde_json::from_value::(notification).log_err() + { + notification_tx.unbounded_send(notification).ok(); + } + } + }); + + let sessions = Rc::new(RefCell::new(HashMap::default())); + + let notification_handler_task = cx.spawn({ + let sessions = sessions.clone(); + async move |cx| { + while let Some(notification) = notification_rx.next().await { + Self::handle_session_notification(notification, sessions.clone(), cx) + } + } + }); + + Ok(Self { + server_name, + client, + sessions, + _notification_handler_task: notification_handler_task, + }) + } + + pub fn handle_session_notification( + notification: acp::SessionNotification, + threads: Rc>>, + cx: &mut AsyncApp, + ) { + let threads = threads.borrow(); + let Some(thread) = threads + .get(¬ification.session_id) + .and_then(|session| session.thread.upgrade()) + else { + log::error!( + "Thread not found for session ID: {}", + notification.session_id + ); + return; + }; + + thread + .update(cx, |thread, cx| { + thread.handle_session_update(notification.update, cx) + }) + .log_err(); + } +} + +pub struct AcpSession { + thread: WeakEntity, + cancel_tx: Option>, + _mcp_server: ZedMcpServer, +} + +impl AgentConnection for AcpConnection { + fn new_thread( + self: Rc, + project: Entity, + cwd: &Path, + cx: &mut AsyncApp, + ) -> Task>> { + let client = self.client.client(); + let sessions = self.sessions.clone(); + let cwd = cwd.to_path_buf(); + cx.spawn(async move |cx| { + let client = client.context("MCP server is not initialized yet")?; + let (mut thread_tx, thread_rx) = watch::channel(WeakEntity::new_invalid()); + + let mcp_server = ZedMcpServer::new(thread_rx, cx).await?; + + let response = client + .request::(context_server::types::CallToolParams { + name: acp::AGENT_METHODS.new_session.into(), + arguments: Some(serde_json::to_value(acp::NewSessionArguments { + mcp_servers: vec![mcp_server.server_config()?], + client_tools: acp::ClientTools { + request_permission: Some(acp::McpToolId { + mcp_server: mcp_server::SERVER_NAME.into(), + tool_name: mcp_server::RequestPermissionTool::NAME.into(), + }), + read_text_file: Some(acp::McpToolId { + mcp_server: mcp_server::SERVER_NAME.into(), + tool_name: mcp_server::ReadTextFileTool::NAME.into(), + }), + write_text_file: Some(acp::McpToolId { + mcp_server: mcp_server::SERVER_NAME.into(), + tool_name: mcp_server::WriteTextFileTool::NAME.into(), + }), + }, + cwd, + })?), + meta: None, + }) + .await?; + + if response.is_error.unwrap_or_default() { + return Err(anyhow!(response.text_contents())); + } + + let result = serde_json::from_value::( + response.structured_content.context("Empty response")?, + )?; + + let thread = cx.new(|cx| { + AcpThread::new( + self.server_name, + self.clone(), + project, + result.session_id.clone(), + cx, + ) + })?; + + thread_tx.send(thread.downgrade())?; + + let session = AcpSession { + thread: thread.downgrade(), + cancel_tx: None, + _mcp_server: mcp_server, + }; + sessions.borrow_mut().insert(result.session_id, session); + + Ok(thread) + }) + } + + fn authenticate(&self, _cx: &mut App) -> Task> { + Task::ready(Err(anyhow!("Authentication not supported"))) + } + + fn prompt( + &self, + params: agent_client_protocol::PromptArguments, + cx: &mut App, + ) -> Task> { + let client = self.client.client(); + let sessions = self.sessions.clone(); + + cx.foreground_executor().spawn(async move { + let client = client.context("MCP server is not initialized yet")?; + + let (new_cancel_tx, cancel_rx) = oneshot::channel(); + { + let mut sessions = sessions.borrow_mut(); + let session = sessions + .get_mut(¶ms.session_id) + .context("Session not found")?; + session.cancel_tx.replace(new_cancel_tx); + } + + let result = client + .request_with::( + context_server::types::CallToolParams { + name: acp::AGENT_METHODS.prompt.into(), + arguments: Some(serde_json::to_value(params)?), + meta: None, + }, + Some(cancel_rx), + None, + ) + .await; + + if let Err(err) = &result + && err.is::() + { + return Ok(()); + } + + let response = result?; + + if response.is_error.unwrap_or_default() { + return Err(anyhow!(response.text_contents())); + } + + Ok(()) + }) + } + + fn cancel(&self, session_id: &agent_client_protocol::SessionId, _cx: &mut App) { + let mut sessions = self.sessions.borrow_mut(); + + if let Some(cancel_tx) = sessions + .get_mut(session_id) + .and_then(|session| session.cancel_tx.take()) + { + cancel_tx.send(()).ok(); + } + } +} + +impl Drop for AcpConnection { + fn drop(&mut self) { + self.client.stop().log_err(); + } +} diff --git a/crates/agent_servers/src/agent_servers.rs b/crates/agent_servers/src/agent_servers.rs index 212bb74d8a..6a031a190e 100644 --- a/crates/agent_servers/src/agent_servers.rs +++ b/crates/agent_servers/src/agent_servers.rs @@ -1,3 +1,4 @@ +mod acp_connection; mod claude; mod codex; mod gemini; diff --git a/crates/agent_servers/src/claude.rs b/crates/agent_servers/src/claude.rs index 6565786204..590da69cd8 100644 --- a/crates/agent_servers/src/claude.rs +++ b/crates/agent_servers/src/claude.rs @@ -70,10 +70,6 @@ struct ClaudeAgentConnection { } impl AgentConnection for ClaudeAgentConnection { - fn name(&self) -> &'static str { - ClaudeCode.name() - } - fn new_thread( self: Rc, project: Entity, @@ -168,8 +164,9 @@ impl AgentConnection for ClaudeAgentConnection { } }); - let thread = - cx.new(|cx| AcpThread::new(self.clone(), project, session_id.clone(), cx))?; + let thread = cx.new(|cx| { + AcpThread::new("Claude Code", self.clone(), project, session_id.clone(), cx) + })?; thread_tx.send(thread.downgrade())?; diff --git a/crates/agent_servers/src/codex.rs b/crates/agent_servers/src/codex.rs index d40aebfbd7..1909781efa 100644 --- a/crates/agent_servers/src/codex.rs +++ b/crates/agent_servers/src/codex.rs @@ -1,24 +1,14 @@ -use agent_client_protocol as acp; -use anyhow::anyhow; -use collections::HashMap; -use context_server::listener::McpServerTool; -use context_server::types::requests; -use context_server::{ContextServer, ContextServerCommand, ContextServerId}; -use futures::channel::{mpsc, oneshot}; use project::Project; use settings::SettingsStore; -use smol::stream::StreamExt as _; -use std::cell::RefCell; +use std::path::Path; use std::rc::Rc; -use std::{path::Path, sync::Arc}; -use util::ResultExt; -use anyhow::{Context, Result}; -use gpui::{App, AppContext as _, AsyncApp, Entity, Task, WeakEntity}; +use anyhow::Result; +use gpui::{App, Entity, Task}; -use crate::mcp_server::ZedMcpServer; -use crate::{AgentServer, AgentServerCommand, AllAgentServersSettings, mcp_server}; -use acp_thread::{AcpThread, AgentConnection}; +use crate::acp_connection::AcpConnection; +use crate::{AgentServer, AgentServerCommand, AllAgentServersSettings}; +use acp_thread::AgentConnection; #[derive(Clone)] pub struct Codex; @@ -47,6 +37,7 @@ impl AgentServer for Codex { cx: &mut App, ) -> Task>> { let project = project.clone(); + let server_name = self.name(); cx.spawn(async move |cx| { let settings = cx.read_global(|settings: &SettingsStore, _| { settings.get::(None).codex.clone() @@ -58,240 +49,12 @@ impl AgentServer for Codex { anyhow::bail!("Failed to find codex binary"); }; - let client: Arc = ContextServer::stdio( - ContextServerId("codex-mcp-server".into()), - ContextServerCommand { - path: command.path, - args: command.args, - env: command.env, - }, - ) - .into(); - ContextServer::start(client.clone(), cx).await?; - - let (notification_tx, mut notification_rx) = mpsc::unbounded(); - client - .client() - .context("Failed to subscribe")? - .on_notification(acp::AGENT_METHODS.session_update, { - move |notification, _cx| { - let notification_tx = notification_tx.clone(); - log::trace!( - "ACP Notification: {}", - serde_json::to_string_pretty(¬ification).unwrap() - ); - - if let Some(notification) = - serde_json::from_value::(notification) - .log_err() - { - notification_tx.unbounded_send(notification).ok(); - } - } - }); - - let sessions = Rc::new(RefCell::new(HashMap::default())); - - let notification_handler_task = cx.spawn({ - let sessions = sessions.clone(); - async move |cx| { - while let Some(notification) = notification_rx.next().await { - CodexConnection::handle_session_notification( - notification, - sessions.clone(), - cx, - ) - } - } - }); - - let connection = CodexConnection { - client, - sessions, - _notification_handler_task: notification_handler_task, - }; - Ok(Rc::new(connection) as _) + let conn = AcpConnection::stdio(server_name, command, cx).await?; + Ok(Rc::new(conn) as _) }) } } -struct CodexConnection { - client: Arc, - sessions: Rc>>, - _notification_handler_task: Task<()>, -} - -struct CodexSession { - thread: WeakEntity, - cancel_tx: Option>, - _mcp_server: ZedMcpServer, -} - -impl AgentConnection for CodexConnection { - fn name(&self) -> &'static str { - "Codex" - } - - fn new_thread( - self: Rc, - project: Entity, - cwd: &Path, - cx: &mut AsyncApp, - ) -> Task>> { - let client = self.client.client(); - let sessions = self.sessions.clone(); - let cwd = cwd.to_path_buf(); - cx.spawn(async move |cx| { - let client = client.context("MCP server is not initialized yet")?; - let (mut thread_tx, thread_rx) = watch::channel(WeakEntity::new_invalid()); - - let mcp_server = ZedMcpServer::new(thread_rx, cx).await?; - - let response = client - .request::(context_server::types::CallToolParams { - name: acp::AGENT_METHODS.new_session.into(), - arguments: Some(serde_json::to_value(acp::NewSessionArguments { - mcp_servers: vec![mcp_server.server_config()?], - client_tools: acp::ClientTools { - request_permission: Some(acp::McpToolId { - mcp_server: mcp_server::SERVER_NAME.into(), - tool_name: mcp_server::RequestPermissionTool::NAME.into(), - }), - read_text_file: Some(acp::McpToolId { - mcp_server: mcp_server::SERVER_NAME.into(), - tool_name: mcp_server::ReadTextFileTool::NAME.into(), - }), - write_text_file: Some(acp::McpToolId { - mcp_server: mcp_server::SERVER_NAME.into(), - tool_name: mcp_server::WriteTextFileTool::NAME.into(), - }), - }, - cwd, - })?), - meta: None, - }) - .await?; - - if response.is_error.unwrap_or_default() { - return Err(anyhow!(response.text_contents())); - } - - let result = serde_json::from_value::( - response.structured_content.context("Empty response")?, - )?; - - let thread = - cx.new(|cx| AcpThread::new(self.clone(), project, result.session_id.clone(), cx))?; - - thread_tx.send(thread.downgrade())?; - - let session = CodexSession { - thread: thread.downgrade(), - cancel_tx: None, - _mcp_server: mcp_server, - }; - sessions.borrow_mut().insert(result.session_id, session); - - Ok(thread) - }) - } - - fn authenticate(&self, _cx: &mut App) -> Task> { - Task::ready(Err(anyhow!("Authentication not supported"))) - } - - fn prompt( - &self, - params: agent_client_protocol::PromptArguments, - cx: &mut App, - ) -> Task> { - let client = self.client.client(); - let sessions = self.sessions.clone(); - - cx.foreground_executor().spawn(async move { - let client = client.context("MCP server is not initialized yet")?; - - let (new_cancel_tx, cancel_rx) = oneshot::channel(); - { - let mut sessions = sessions.borrow_mut(); - let session = sessions - .get_mut(¶ms.session_id) - .context("Session not found")?; - session.cancel_tx.replace(new_cancel_tx); - } - - let result = client - .request_with::( - context_server::types::CallToolParams { - name: acp::AGENT_METHODS.prompt.into(), - arguments: Some(serde_json::to_value(params)?), - meta: None, - }, - Some(cancel_rx), - None, - ) - .await; - - if let Err(err) = &result - && err.is::() - { - return Ok(()); - } - - let response = result?; - - if response.is_error.unwrap_or_default() { - return Err(anyhow!(response.text_contents())); - } - - Ok(()) - }) - } - - fn cancel(&self, session_id: &agent_client_protocol::SessionId, _cx: &mut App) { - let mut sessions = self.sessions.borrow_mut(); - - if let Some(cancel_tx) = sessions - .get_mut(session_id) - .and_then(|session| session.cancel_tx.take()) - { - cancel_tx.send(()).ok(); - } - } -} - -impl CodexConnection { - pub fn handle_session_notification( - notification: acp::SessionNotification, - threads: Rc>>, - cx: &mut AsyncApp, - ) { - let threads = threads.borrow(); - let Some(thread) = threads - .get(¬ification.session_id) - .and_then(|session| session.thread.upgrade()) - else { - log::error!( - "Thread not found for session ID: {}", - notification.session_id - ); - return; - }; - - thread - .update(cx, |thread, cx| { - thread.handle_session_update(notification.update, cx) - }) - .log_err(); - } -} - -impl Drop for CodexConnection { - fn drop(&mut self) { - self.client.stop().log_err(); - } -} - #[cfg(test)] pub(crate) mod tests { use super::*; diff --git a/crates/agent_servers/src/gemini.rs b/crates/agent_servers/src/gemini.rs index 8b9fed5777..70c7f8efb5 100644 --- a/crates/agent_servers/src/gemini.rs +++ b/crates/agent_servers/src/gemini.rs @@ -1,25 +1,18 @@ -use anyhow::anyhow; -use std::cell::RefCell; -use std::path::Path; -use std::rc::Rc; -use util::ResultExt as _; - -use crate::{AgentServer, AgentServerCommand, AgentServerVersion}; -use acp_thread::{AgentConnection, LoadError, OldAcpAgentConnection, OldAcpClientDelegate}; -use agentic_coding_protocol as acp_old; -use anyhow::{Context as _, Result}; -use gpui::{AppContext as _, AsyncApp, Entity, Task, WeakEntity}; use project::Project; use settings::SettingsStore; -use ui::App; +use std::path::Path; +use std::rc::Rc; -use crate::AllAgentServersSettings; +use anyhow::Result; +use gpui::{App, Entity, Task}; + +use crate::acp_connection::AcpConnection; +use crate::{AgentServer, AgentServerCommand, AllAgentServersSettings}; +use acp_thread::AgentConnection; #[derive(Clone)] pub struct Gemini; -const ACP_ARG: &str = "--experimental-acp"; - impl AgentServer for Gemini { fn name(&self) -> &'static str { "Gemini" @@ -39,166 +32,49 @@ impl AgentServer for Gemini { fn connect( &self, - root_dir: &Path, + _root_dir: &Path, project: &Entity, cx: &mut App, ) -> Task>> { - let root_dir = root_dir.to_path_buf(); let project = project.clone(); - let this = self.clone(); - let name = self.name(); - + let server_name = self.name(); cx.spawn(async move |cx| { - let command = this.command(&project, cx).await?; + let settings = cx.read_global(|settings: &SettingsStore, _| { + settings.get::(None).gemini.clone() + })?; - let mut child = util::command::new_smol_command(&command.path) - .args(command.args.iter()) - .current_dir(root_dir) - .stdin(std::process::Stdio::piped()) - .stdout(std::process::Stdio::piped()) - .stderr(std::process::Stdio::inherit()) - .kill_on_drop(true) - .spawn()?; + let Some(command) = AgentServerCommand::resolve( + "gemini", + &["--experimental-mcp"], + settings, + &project, + cx, + ) + .await + else { + anyhow::bail!("Failed to find gemini binary"); + }; - let stdin = child.stdin.take().unwrap(); - let stdout = child.stdout.take().unwrap(); - - let foreground_executor = cx.foreground_executor().clone(); - - let thread_rc = Rc::new(RefCell::new(WeakEntity::new_invalid())); - - let (connection, io_fut) = acp_old::AgentConnection::connect_to_agent( - OldAcpClientDelegate::new(thread_rc.clone(), cx.clone()), - stdin, - stdout, - move |fut| foreground_executor.spawn(fut).detach(), - ); - - let io_task = cx.background_spawn(async move { - io_fut.await.log_err(); - }); - - let child_status = cx.background_spawn(async move { - let result = match child.status().await { - Err(e) => Err(anyhow!(e)), - Ok(result) if result.success() => Ok(()), - Ok(result) => { - if let Some(AgentServerVersion::Unsupported { - error_message, - upgrade_message, - upgrade_command, - }) = this.version(&command).await.log_err() - { - Err(anyhow!(LoadError::Unsupported { - error_message, - upgrade_message, - upgrade_command - })) - } else { - Err(anyhow!(LoadError::Exited(result.code().unwrap_or(-127)))) - } - } - }; - drop(io_task); - result - }); - - let connection: Rc = Rc::new(OldAcpAgentConnection { - name, - connection, - child_status, - }); - - Ok(connection) + let conn = AcpConnection::stdio(server_name, command, cx).await?; + Ok(Rc::new(conn) as _) }) } } -impl Gemini { - async fn command( - &self, - project: &Entity, - cx: &mut AsyncApp, - ) -> Result { - let settings = cx.read_global(|settings: &SettingsStore, _| { - settings.get::(None).gemini.clone() - })?; - - if let Some(command) = - AgentServerCommand::resolve("gemini", &[ACP_ARG], settings, &project, cx).await - { - return Ok(command); - }; - - let (fs, node_runtime) = project.update(cx, |project, _| { - (project.fs().clone(), project.node_runtime().cloned()) - })?; - let node_runtime = node_runtime.context("gemini not found on path")?; - - let directory = ::paths::agent_servers_dir().join("gemini"); - fs.create_dir(&directory).await?; - node_runtime - .npm_install_packages(&directory, &[("@google/gemini-cli", "latest")]) - .await?; - let path = directory.join("node_modules/.bin/gemini"); - - Ok(AgentServerCommand { - path, - args: vec![ACP_ARG.into()], - env: None, - }) - } - - async fn version(&self, command: &AgentServerCommand) -> Result { - let version_fut = util::command::new_smol_command(&command.path) - .args(command.args.iter()) - .arg("--version") - .kill_on_drop(true) - .output(); - - let help_fut = util::command::new_smol_command(&command.path) - .args(command.args.iter()) - .arg("--help") - .kill_on_drop(true) - .output(); - - let (version_output, help_output) = futures::future::join(version_fut, help_fut).await; - - let current_version = String::from_utf8(version_output?.stdout)?; - let supported = String::from_utf8(help_output?.stdout)?.contains(ACP_ARG); - - if supported { - Ok(AgentServerVersion::Supported) - } else { - Ok(AgentServerVersion::Unsupported { - error_message: format!( - "Your installed version of Gemini {} doesn't support the Agentic Coding Protocol (ACP).", - current_version - ).into(), - upgrade_message: "Upgrade Gemini to Latest".into(), - upgrade_command: "npm install -g @google/gemini-cli@latest".into(), - }) - } - } -} - #[cfg(test)] pub(crate) mod tests { use super::*; use crate::AgentServerCommand; use std::path::Path; - crate::common_e2e_tests!(Gemini, allow_option_id = "0"); + crate::common_e2e_tests!(Gemini, allow_option_id = "allow"); pub fn local_command() -> AgentServerCommand { - let cli_path = Path::new(env!("CARGO_MANIFEST_DIR")) - .join("../../../gemini-cli/packages/cli") - .to_string_lossy() - .to_string(); + let cli_path = Path::new(env!("CARGO_MANIFEST_DIR")).join("../../../gemini/packages/cli"); AgentServerCommand { path: "node".into(), - args: vec![cli_path, ACP_ARG.into()], + args: vec![cli_path.to_string_lossy().to_string()], env: None, } } From b48faddaf416f2597bbec9bf30823017cd1d1931 Mon Sep 17 00:00:00 2001 From: Agus Zubiaga Date: Mon, 28 Jul 2025 18:45:05 -0300 Subject: [PATCH 03/13] Restore gemini change --- crates/agent_servers/src/gemini.rs | 180 ++++++++++++++++++++++++----- 1 file changed, 152 insertions(+), 28 deletions(-) diff --git a/crates/agent_servers/src/gemini.rs b/crates/agent_servers/src/gemini.rs index 70c7f8efb5..8b9fed5777 100644 --- a/crates/agent_servers/src/gemini.rs +++ b/crates/agent_servers/src/gemini.rs @@ -1,18 +1,25 @@ -use project::Project; -use settings::SettingsStore; +use anyhow::anyhow; +use std::cell::RefCell; use std::path::Path; use std::rc::Rc; +use util::ResultExt as _; -use anyhow::Result; -use gpui::{App, Entity, Task}; +use crate::{AgentServer, AgentServerCommand, AgentServerVersion}; +use acp_thread::{AgentConnection, LoadError, OldAcpAgentConnection, OldAcpClientDelegate}; +use agentic_coding_protocol as acp_old; +use anyhow::{Context as _, Result}; +use gpui::{AppContext as _, AsyncApp, Entity, Task, WeakEntity}; +use project::Project; +use settings::SettingsStore; +use ui::App; -use crate::acp_connection::AcpConnection; -use crate::{AgentServer, AgentServerCommand, AllAgentServersSettings}; -use acp_thread::AgentConnection; +use crate::AllAgentServersSettings; #[derive(Clone)] pub struct Gemini; +const ACP_ARG: &str = "--experimental-acp"; + impl AgentServer for Gemini { fn name(&self) -> &'static str { "Gemini" @@ -32,49 +39,166 @@ impl AgentServer for Gemini { fn connect( &self, - _root_dir: &Path, + root_dir: &Path, project: &Entity, cx: &mut App, ) -> Task>> { + let root_dir = root_dir.to_path_buf(); let project = project.clone(); - let server_name = self.name(); + let this = self.clone(); + let name = self.name(); + cx.spawn(async move |cx| { - let settings = cx.read_global(|settings: &SettingsStore, _| { - settings.get::(None).gemini.clone() - })?; + let command = this.command(&project, cx).await?; - let Some(command) = AgentServerCommand::resolve( - "gemini", - &["--experimental-mcp"], - settings, - &project, - cx, - ) - .await - else { - anyhow::bail!("Failed to find gemini binary"); - }; + let mut child = util::command::new_smol_command(&command.path) + .args(command.args.iter()) + .current_dir(root_dir) + .stdin(std::process::Stdio::piped()) + .stdout(std::process::Stdio::piped()) + .stderr(std::process::Stdio::inherit()) + .kill_on_drop(true) + .spawn()?; - let conn = AcpConnection::stdio(server_name, command, cx).await?; - Ok(Rc::new(conn) as _) + let stdin = child.stdin.take().unwrap(); + let stdout = child.stdout.take().unwrap(); + + let foreground_executor = cx.foreground_executor().clone(); + + let thread_rc = Rc::new(RefCell::new(WeakEntity::new_invalid())); + + let (connection, io_fut) = acp_old::AgentConnection::connect_to_agent( + OldAcpClientDelegate::new(thread_rc.clone(), cx.clone()), + stdin, + stdout, + move |fut| foreground_executor.spawn(fut).detach(), + ); + + let io_task = cx.background_spawn(async move { + io_fut.await.log_err(); + }); + + let child_status = cx.background_spawn(async move { + let result = match child.status().await { + Err(e) => Err(anyhow!(e)), + Ok(result) if result.success() => Ok(()), + Ok(result) => { + if let Some(AgentServerVersion::Unsupported { + error_message, + upgrade_message, + upgrade_command, + }) = this.version(&command).await.log_err() + { + Err(anyhow!(LoadError::Unsupported { + error_message, + upgrade_message, + upgrade_command + })) + } else { + Err(anyhow!(LoadError::Exited(result.code().unwrap_or(-127)))) + } + } + }; + drop(io_task); + result + }); + + let connection: Rc = Rc::new(OldAcpAgentConnection { + name, + connection, + child_status, + }); + + Ok(connection) }) } } +impl Gemini { + async fn command( + &self, + project: &Entity, + cx: &mut AsyncApp, + ) -> Result { + let settings = cx.read_global(|settings: &SettingsStore, _| { + settings.get::(None).gemini.clone() + })?; + + if let Some(command) = + AgentServerCommand::resolve("gemini", &[ACP_ARG], settings, &project, cx).await + { + return Ok(command); + }; + + let (fs, node_runtime) = project.update(cx, |project, _| { + (project.fs().clone(), project.node_runtime().cloned()) + })?; + let node_runtime = node_runtime.context("gemini not found on path")?; + + let directory = ::paths::agent_servers_dir().join("gemini"); + fs.create_dir(&directory).await?; + node_runtime + .npm_install_packages(&directory, &[("@google/gemini-cli", "latest")]) + .await?; + let path = directory.join("node_modules/.bin/gemini"); + + Ok(AgentServerCommand { + path, + args: vec![ACP_ARG.into()], + env: None, + }) + } + + async fn version(&self, command: &AgentServerCommand) -> Result { + let version_fut = util::command::new_smol_command(&command.path) + .args(command.args.iter()) + .arg("--version") + .kill_on_drop(true) + .output(); + + let help_fut = util::command::new_smol_command(&command.path) + .args(command.args.iter()) + .arg("--help") + .kill_on_drop(true) + .output(); + + let (version_output, help_output) = futures::future::join(version_fut, help_fut).await; + + let current_version = String::from_utf8(version_output?.stdout)?; + let supported = String::from_utf8(help_output?.stdout)?.contains(ACP_ARG); + + if supported { + Ok(AgentServerVersion::Supported) + } else { + Ok(AgentServerVersion::Unsupported { + error_message: format!( + "Your installed version of Gemini {} doesn't support the Agentic Coding Protocol (ACP).", + current_version + ).into(), + upgrade_message: "Upgrade Gemini to Latest".into(), + upgrade_command: "npm install -g @google/gemini-cli@latest".into(), + }) + } + } +} + #[cfg(test)] pub(crate) mod tests { use super::*; use crate::AgentServerCommand; use std::path::Path; - crate::common_e2e_tests!(Gemini, allow_option_id = "allow"); + crate::common_e2e_tests!(Gemini, allow_option_id = "0"); pub fn local_command() -> AgentServerCommand { - let cli_path = Path::new(env!("CARGO_MANIFEST_DIR")).join("../../../gemini/packages/cli"); + let cli_path = Path::new(env!("CARGO_MANIFEST_DIR")) + .join("../../../gemini-cli/packages/cli") + .to_string_lossy() + .to_string(); AgentServerCommand { path: "node".into(), - args: vec![cli_path.to_string_lossy().to_string()], + args: vec![cli_path, ACP_ARG.into()], env: None, } } From 912ab505b2677a90de74c73d6258cd4f709aee2d Mon Sep 17 00:00:00 2001 From: Agus Zubiaga Date: Mon, 28 Jul 2025 20:04:32 -0300 Subject: [PATCH 04/13] Connect to gemini over MCP --- crates/agent_servers/src/codex.rs | 1 + crates/agent_servers/src/gemini.rs | 159 ++++------------------------- 2 files changed, 19 insertions(+), 141 deletions(-) diff --git a/crates/agent_servers/src/codex.rs b/crates/agent_servers/src/codex.rs index 1909781efa..06d8d10a91 100644 --- a/crates/agent_servers/src/codex.rs +++ b/crates/agent_servers/src/codex.rs @@ -48,6 +48,7 @@ impl AgentServer for Codex { else { anyhow::bail!("Failed to find codex binary"); }; + // todo! check supported version let conn = AcpConnection::stdio(server_name, command, cx).await?; Ok(Rc::new(conn) as _) diff --git a/crates/agent_servers/src/gemini.rs b/crates/agent_servers/src/gemini.rs index 8b9fed5777..07c4e1b539 100644 --- a/crates/agent_servers/src/gemini.rs +++ b/crates/agent_servers/src/gemini.rs @@ -1,14 +1,10 @@ -use anyhow::anyhow; -use std::cell::RefCell; use std::path::Path; use std::rc::Rc; -use util::ResultExt as _; -use crate::{AgentServer, AgentServerCommand, AgentServerVersion}; -use acp_thread::{AgentConnection, LoadError, OldAcpAgentConnection, OldAcpClientDelegate}; -use agentic_coding_protocol as acp_old; -use anyhow::{Context as _, Result}; -use gpui::{AppContext as _, AsyncApp, Entity, Task, WeakEntity}; +use crate::{AgentServer, AgentServerCommand, acp_connection::AcpConnection}; +use acp_thread::AgentConnection; +use anyhow::Result; +use gpui::{Entity, Task}; use project::Project; use settings::SettingsStore; use ui::App; @@ -39,149 +35,30 @@ impl AgentServer for Gemini { fn connect( &self, - root_dir: &Path, + _root_dir: &Path, project: &Entity, cx: &mut App, ) -> Task>> { - let root_dir = root_dir.to_path_buf(); let project = project.clone(); - let this = self.clone(); - let name = self.name(); - + let server_name = self.name(); cx.spawn(async move |cx| { - let command = this.command(&project, cx).await?; + let settings = cx.read_global(|settings: &SettingsStore, _| { + settings.get::(None).gemini.clone() + })?; - let mut child = util::command::new_smol_command(&command.path) - .args(command.args.iter()) - .current_dir(root_dir) - .stdin(std::process::Stdio::piped()) - .stdout(std::process::Stdio::piped()) - .stderr(std::process::Stdio::inherit()) - .kill_on_drop(true) - .spawn()?; + let Some(command) = + AgentServerCommand::resolve("gemini", &[ACP_ARG], settings, &project, cx).await + else { + anyhow::bail!("Failed to find gemini binary"); + }; + // todo! check supported version - let stdin = child.stdin.take().unwrap(); - let stdout = child.stdout.take().unwrap(); - - let foreground_executor = cx.foreground_executor().clone(); - - let thread_rc = Rc::new(RefCell::new(WeakEntity::new_invalid())); - - let (connection, io_fut) = acp_old::AgentConnection::connect_to_agent( - OldAcpClientDelegate::new(thread_rc.clone(), cx.clone()), - stdin, - stdout, - move |fut| foreground_executor.spawn(fut).detach(), - ); - - let io_task = cx.background_spawn(async move { - io_fut.await.log_err(); - }); - - let child_status = cx.background_spawn(async move { - let result = match child.status().await { - Err(e) => Err(anyhow!(e)), - Ok(result) if result.success() => Ok(()), - Ok(result) => { - if let Some(AgentServerVersion::Unsupported { - error_message, - upgrade_message, - upgrade_command, - }) = this.version(&command).await.log_err() - { - Err(anyhow!(LoadError::Unsupported { - error_message, - upgrade_message, - upgrade_command - })) - } else { - Err(anyhow!(LoadError::Exited(result.code().unwrap_or(-127)))) - } - } - }; - drop(io_task); - result - }); - - let connection: Rc = Rc::new(OldAcpAgentConnection { - name, - connection, - child_status, - }); - - Ok(connection) + let conn = AcpConnection::stdio(server_name, command, cx).await?; + Ok(Rc::new(conn) as _) }) } } -impl Gemini { - async fn command( - &self, - project: &Entity, - cx: &mut AsyncApp, - ) -> Result { - let settings = cx.read_global(|settings: &SettingsStore, _| { - settings.get::(None).gemini.clone() - })?; - - if let Some(command) = - AgentServerCommand::resolve("gemini", &[ACP_ARG], settings, &project, cx).await - { - return Ok(command); - }; - - let (fs, node_runtime) = project.update(cx, |project, _| { - (project.fs().clone(), project.node_runtime().cloned()) - })?; - let node_runtime = node_runtime.context("gemini not found on path")?; - - let directory = ::paths::agent_servers_dir().join("gemini"); - fs.create_dir(&directory).await?; - node_runtime - .npm_install_packages(&directory, &[("@google/gemini-cli", "latest")]) - .await?; - let path = directory.join("node_modules/.bin/gemini"); - - Ok(AgentServerCommand { - path, - args: vec![ACP_ARG.into()], - env: None, - }) - } - - async fn version(&self, command: &AgentServerCommand) -> Result { - let version_fut = util::command::new_smol_command(&command.path) - .args(command.args.iter()) - .arg("--version") - .kill_on_drop(true) - .output(); - - let help_fut = util::command::new_smol_command(&command.path) - .args(command.args.iter()) - .arg("--help") - .kill_on_drop(true) - .output(); - - let (version_output, help_output) = futures::future::join(version_fut, help_fut).await; - - let current_version = String::from_utf8(version_output?.stdout)?; - let supported = String::from_utf8(help_output?.stdout)?.contains(ACP_ARG); - - if supported { - Ok(AgentServerVersion::Supported) - } else { - Ok(AgentServerVersion::Unsupported { - error_message: format!( - "Your installed version of Gemini {} doesn't support the Agentic Coding Protocol (ACP).", - current_version - ).into(), - upgrade_message: "Upgrade Gemini to Latest".into(), - upgrade_command: "npm install -g @google/gemini-cli@latest".into(), - }) - } - } -} - #[cfg(test)] pub(crate) mod tests { use super::*; @@ -198,7 +75,7 @@ pub(crate) mod tests { AgentServerCommand { path: "node".into(), - args: vec![cli_path, ACP_ARG.into()], + args: vec![cli_path], env: None, } } From 254c6be42b69048bc5e1689e5438d1880fa8034c Mon Sep 17 00:00:00 2001 From: Ben Brandt Date: Tue, 29 Jul 2025 10:12:57 +0200 Subject: [PATCH 05/13] Fix broken test --- crates/acp_thread/src/acp_thread.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/acp_thread/src/acp_thread.rs b/crates/acp_thread/src/acp_thread.rs index 1c4b0ec06f..841d320796 100644 --- a/crates/acp_thread/src/acp_thread.rs +++ b/crates/acp_thread/src/acp_thread.rs @@ -1601,6 +1601,7 @@ mod tests { }; AcpThread::new( + "test", Rc::new(connection), project, acp::SessionId("test".into()), From 6656403ce85602e159262765654269297c95e630 Mon Sep 17 00:00:00 2001 From: Agus Zubiaga Date: Tue, 29 Jul 2025 21:15:00 -0300 Subject: [PATCH 06/13] Auth WIP --- Cargo.lock | 2 - Cargo.toml | 2 +- crates/acp_thread/src/acp_thread.rs | 4 - crates/acp_thread/src/connection.rs | 8 +- crates/acp_thread/src/old_acp_support.rs | 15 +++- crates/agent_servers/src/acp_connection.rs | 100 ++++++++++++++++----- crates/agent_servers/src/claude.rs | 10 ++- crates/agent_ui/src/acp/thread_view.rs | 55 ++++++++---- 8 files changed, 140 insertions(+), 56 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1682b80a8c..f68136d978 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -139,8 +139,6 @@ dependencies = [ [[package]] name = "agent-client-protocol" version = "0.0.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4255a06cc2414033d1fe4baf1968bcc8f16d7e5814f272b97779b5806d129142" dependencies = [ "schemars", "serde", diff --git a/Cargo.toml b/Cargo.toml index 81da82cbb7..d733f2242e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -413,7 +413,7 @@ zlog_settings = { path = "crates/zlog_settings" } # agentic-coding-protocol = "0.0.10" -agent-client-protocol = "0.0.13" +agent-client-protocol = {path="../agent-client-protocol"} aho-corasick = "1.1" alacritty_terminal = { git = "https://github.com/zed-industries/alacritty.git", branch = "add-hush-login-flag" } any_vec = "0.14" diff --git a/crates/acp_thread/src/acp_thread.rs b/crates/acp_thread/src/acp_thread.rs index 841d320796..3b9f0842bd 100644 --- a/crates/acp_thread/src/acp_thread.rs +++ b/crates/acp_thread/src/acp_thread.rs @@ -958,10 +958,6 @@ impl AcpThread { cx.notify(); } - pub fn authenticate(&self, cx: &mut App) -> impl use<> + Future> { - self.connection.authenticate(cx) - } - #[cfg(any(test, feature = "test-support"))] pub fn send_raw( &mut self, diff --git a/crates/acp_thread/src/connection.rs b/crates/acp_thread/src/connection.rs index 97161a19c0..2e7deaf7df 100644 --- a/crates/acp_thread/src/connection.rs +++ b/crates/acp_thread/src/connection.rs @@ -1,6 +1,6 @@ -use std::{path::Path, rc::Rc}; +use std::{cell::Ref, path::Path, rc::Rc}; -use agent_client_protocol as acp; +use agent_client_protocol::{self as acp}; use anyhow::Result; use gpui::{AsyncApp, Entity, Task}; use project::Project; @@ -16,7 +16,9 @@ pub trait AgentConnection { cx: &mut AsyncApp, ) -> Task>>; - fn authenticate(&self, cx: &mut App) -> Task>; + fn state(&self) -> Ref<'_, acp::AgentState>; + + fn authenticate(&self, method: acp::AuthMethodId, cx: &mut App) -> Task>; fn prompt(&self, params: acp::PromptArguments, cx: &mut App) -> Task>; diff --git a/crates/acp_thread/src/old_acp_support.rs b/crates/acp_thread/src/old_acp_support.rs index d7ef1b73da..4d06f81d06 100644 --- a/crates/acp_thread/src/old_acp_support.rs +++ b/crates/acp_thread/src/old_acp_support.rs @@ -5,7 +5,13 @@ use anyhow::{Context as _, Result}; use futures::channel::oneshot; use gpui::{AppContext as _, AsyncApp, Entity, Task, WeakEntity}; use project::Project; -use std::{cell::RefCell, error::Error, fmt, path::Path, rc::Rc}; +use std::{ + cell::{Ref, RefCell}, + error::Error, + fmt, + path::Path, + rc::Rc, +}; use ui::App; use crate::{AcpThread, AgentConnection}; @@ -364,6 +370,7 @@ pub struct OldAcpAgentConnection { pub name: &'static str, pub connection: acp_old::AgentConnection, pub child_status: Task>, + pub agent_state: Rc>, } impl AgentConnection for OldAcpAgentConnection { @@ -397,7 +404,11 @@ impl AgentConnection for OldAcpAgentConnection { }) } - fn authenticate(&self, cx: &mut App) -> Task> { + fn state(&self) -> Ref<'_, acp::AgentState> { + self.agent_state.borrow() + } + + fn authenticate(&self, _method_id: acp::AuthMethodId, cx: &mut App) -> Task> { let task = self .connection .request_any(acp_old::AuthenticateParams.into_any()); diff --git a/crates/agent_servers/src/acp_connection.rs b/crates/agent_servers/src/acp_connection.rs index 9139d62c38..96067fe520 100644 --- a/crates/agent_servers/src/acp_connection.rs +++ b/crates/agent_servers/src/acp_connection.rs @@ -7,10 +7,10 @@ use context_server::{ContextServer, ContextServerCommand, ContextServerId}; use futures::channel::{mpsc, oneshot}; use project::Project; use smol::stream::StreamExt as _; -use std::cell::RefCell; +use std::cell::{Ref, RefCell}; use std::rc::Rc; use std::{path::Path, sync::Arc}; -use util::ResultExt; +use util::{ResultExt, TryFutureExt}; use anyhow::{Context, Result}; use gpui::{App, AppContext as _, AsyncApp, Entity, Task, WeakEntity}; @@ -20,10 +20,12 @@ use crate::{AgentServerCommand, mcp_server}; use acp_thread::{AcpThread, AgentConnection}; pub struct AcpConnection { + agent_state: Rc>, server_name: &'static str, client: Arc, sessions: Rc>>, - _notification_handler_task: Task<()>, + _agent_state_task: Task<()>, + _session_update_task: Task<()>, } impl AcpConnection { @@ -43,29 +45,55 @@ impl AcpConnection { .into(); ContextServer::start(client.clone(), cx).await?; - let (notification_tx, mut notification_rx) = mpsc::unbounded(); - client - .client() - .context("Failed to subscribe")? - .on_notification(acp::AGENT_METHODS.session_update, { - move |notification, _cx| { - let notification_tx = notification_tx.clone(); - log::trace!( - "ACP Notification: {}", - serde_json::to_string_pretty(¬ification).unwrap() - ); + let (mut state_tx, mut state_rx) = watch::channel(acp::AgentState::default()); + let mcp_client = client.client().context("Failed to subscribe")?; - if let Some(notification) = - serde_json::from_value::(notification).log_err() - { - notification_tx.unbounded_send(notification).ok(); - } + mcp_client.on_notification(acp::AGENT_METHODS.agent_state, { + move |notification, _cx| { + log::trace!( + "ACP Notification: {}", + serde_json::to_string_pretty(¬ification).unwrap() + ); + + if let Some(state) = + serde_json::from_value::(notification).log_err() + { + state_tx.send(state).log_err(); } - }); + } + }); + + let (notification_tx, mut notification_rx) = mpsc::unbounded(); + mcp_client.on_notification(acp::AGENT_METHODS.session_update, { + move |notification, _cx| { + let notification_tx = notification_tx.clone(); + log::trace!( + "ACP Notification: {}", + serde_json::to_string_pretty(¬ification).unwrap() + ); + + if let Some(notification) = + serde_json::from_value::(notification).log_err() + { + notification_tx.unbounded_send(notification).ok(); + } + } + }); let sessions = Rc::new(RefCell::new(HashMap::default())); + let initial_state = state_rx.recv().await?; + let agent_state = Rc::new(RefCell::new(initial_state)); - let notification_handler_task = cx.spawn({ + let agent_state_task = cx.foreground_executor().spawn({ + let agent_state = agent_state.clone(); + async move { + while let Some(state) = state_rx.recv().log_err().await { + agent_state.replace(state); + } + } + }); + + let session_update_handler_task = cx.spawn({ let sessions = sessions.clone(); async move |cx| { while let Some(notification) = notification_rx.next().await { @@ -78,7 +106,9 @@ impl AcpConnection { server_name, client, sessions, - _notification_handler_task: notification_handler_task, + agent_state, + _agent_state_task: agent_state_task, + _session_update_task: session_update_handler_task, }) } @@ -185,8 +215,30 @@ impl AgentConnection for AcpConnection { }) } - fn authenticate(&self, _cx: &mut App) -> Task> { - Task::ready(Err(anyhow!("Authentication not supported"))) + fn state(&self) -> Ref<'_, acp::AgentState> { + self.agent_state.borrow() + } + + fn authenticate(&self, method_id: acp::AuthMethodId, cx: &mut App) -> Task> { + let client = self.client.client(); + cx.foreground_executor().spawn(async move { + let params = acp::AuthenticateArguments { method_id }; + + let response = client + .context("MCP server is not initialized yet")? + .request::(context_server::types::CallToolParams { + name: acp::AGENT_METHODS.authenticate.into(), + arguments: Some(serde_json::to_value(params)?), + meta: None, + }) + .await?; + + if response.is_error.unwrap_or_default() { + Err(anyhow!(response.text_contents())) + } else { + Ok(()) + } + }) } fn prompt( diff --git a/crates/agent_servers/src/claude.rs b/crates/agent_servers/src/claude.rs index 590da69cd8..0f49403a0b 100644 --- a/crates/agent_servers/src/claude.rs +++ b/crates/agent_servers/src/claude.rs @@ -6,7 +6,7 @@ use context_server::listener::McpServerTool; use project::Project; use settings::SettingsStore; use smol::process::Child; -use std::cell::RefCell; +use std::cell::{Ref, RefCell}; use std::fmt::Display; use std::path::Path; use std::rc::Rc; @@ -58,6 +58,7 @@ impl AgentServer for ClaudeCode { _cx: &mut App, ) -> Task>> { let connection = ClaudeAgentConnection { + agent_state: Default::default(), sessions: Default::default(), }; @@ -66,6 +67,7 @@ impl AgentServer for ClaudeCode { } struct ClaudeAgentConnection { + agent_state: Rc>, sessions: Rc>>, } @@ -183,7 +185,11 @@ impl AgentConnection for ClaudeAgentConnection { }) } - fn authenticate(&self, _cx: &mut App) -> Task> { + fn state(&self) -> Ref<'_, acp::AgentState> { + self.agent_state.borrow() + } + + fn authenticate(&self, _: acp::AuthMethodId, _cx: &mut App) -> Task> { Task::ready(Err(anyhow!("Authentication not supported"))) } diff --git a/crates/agent_ui/src/acp/thread_view.rs b/crates/agent_ui/src/acp/thread_view.rs index e46e1ae3ab..824748a0aa 100644 --- a/crates/agent_ui/src/acp/thread_view.rs +++ b/crates/agent_ui/src/acp/thread_view.rs @@ -216,6 +216,15 @@ impl AcpThreadView { } }; + if connection.state().needs_authentication { + this.update(cx, |this, cx| { + this.thread_state = ThreadState::Unauthenticated { connection }; + cx.notify(); + }) + .ok(); + return; + } + let result = match connection .clone() .new_thread(project.clone(), &root_dir, cx) @@ -223,6 +232,7 @@ impl AcpThreadView { { Err(e) => { let mut cx = cx.clone(); + // todo! remove duplication if e.downcast_ref::().is_some() { this.update(&mut cx, |this, cx| { this.thread_state = ThreadState::Unauthenticated { connection }; @@ -640,13 +650,18 @@ impl AcpThreadView { Some(entry.diffs().map(|diff| diff.multibuffer.clone())) } - fn authenticate(&mut self, window: &mut Window, cx: &mut Context) { + fn authenticate( + &mut self, + method: acp::AuthMethodId, + window: &mut Window, + cx: &mut Context, + ) { let ThreadState::Unauthenticated { ref connection } = self.thread_state else { return; }; self.last_error.take(); - let authenticate = connection.authenticate(cx); + let authenticate = connection.authenticate(method, cx); self.auth_task = Some(cx.spawn_in(window, { let project = self.project.clone(); let agent = self.agent.clone(); @@ -2197,22 +2212,26 @@ impl Render for AcpThreadView { .on_action(cx.listener(Self::next_history_message)) .on_action(cx.listener(Self::open_agent_diff)) .child(match &self.thread_state { - ThreadState::Unauthenticated { .. } => { - v_flex() - .p_2() - .flex_1() - .items_center() - .justify_center() - .child(self.render_pending_auth_state()) - .child( - h_flex().mt_1p5().justify_center().child( - Button::new("sign-in", format!("Sign in to {}", self.agent.name())) - .on_click(cx.listener(|this, _, window, cx| { - this.authenticate(window, cx) - })), - ), - ) - } + ThreadState::Unauthenticated { connection } => v_flex() + .p_2() + .flex_1() + .items_center() + .justify_center() + .child(self.render_pending_auth_state()) + .child(h_flex().mt_1p5().justify_center().children( + connection.state().auth_methods.iter().map(|method| { + Button::new( + SharedString::from(method.id.0.clone()), + method.label.clone(), + ) + .on_click({ + let method_id = method.id.clone(); + cx.listener(move |this, _, window, cx| { + this.authenticate(method_id.clone(), window, cx) + }) + }) + }), + )), ThreadState::Loading { .. } => v_flex().flex_1().child(self.render_empty_state(cx)), ThreadState::LoadError(e) => v_flex() .p_2() From 81c111510f43241cef933567fc2905051dfc5fa3 Mon Sep 17 00:00:00 2001 From: Ben Brandt Date: Wed, 30 Jul 2025 15:48:40 +0200 Subject: [PATCH 07/13] Refactor handling of ContextServer notifications The notification handler registration is now more explicit, with handlers set up before server initialization to avoid potential race conditions. --- crates/agent_servers/src/acp_connection.rs | 85 +++++++++++---------- crates/context_server/src/client.rs | 14 ++-- crates/context_server/src/context_server.rs | 27 ++++++- crates/context_server/src/protocol.rs | 9 ++- 4 files changed, 79 insertions(+), 56 deletions(-) diff --git a/crates/agent_servers/src/acp_connection.rs b/crates/agent_servers/src/acp_connection.rs index 95c09e2c52..5883f6ac45 100644 --- a/crates/agent_servers/src/acp_connection.rs +++ b/crates/agent_servers/src/acp_connection.rs @@ -22,7 +22,7 @@ use acp_thread::{AcpThread, AgentConnection}; pub struct AcpConnection { agent_state: Rc>, server_name: &'static str, - client: Arc, + context_server: Arc, sessions: Rc>>, _agent_state_task: Task<()>, _session_update_task: Task<()>, @@ -35,7 +35,7 @@ impl AcpConnection { working_directory: Option>, cx: &mut AsyncApp, ) -> Result { - let client: Arc = ContextServer::stdio( + let context_server: Arc = ContextServer::stdio( ContextServerId(format!("{}-mcp-server", server_name).into()), ContextServerCommand { path: command.path, @@ -45,42 +45,9 @@ impl AcpConnection { working_directory, ) .into(); - ContextServer::start(client.clone(), cx).await?; let (mut state_tx, mut state_rx) = watch::channel(acp::AgentState::default()); - let mcp_client = client.client().context("Failed to subscribe")?; - - mcp_client.on_notification(acp::AGENT_METHODS.agent_state, { - move |notification, _cx| { - log::trace!( - "ACP Notification: {}", - serde_json::to_string_pretty(¬ification).unwrap() - ); - - if let Some(state) = - serde_json::from_value::(notification).log_err() - { - state_tx.send(state).log_err(); - } - } - }); - let (notification_tx, mut notification_rx) = mpsc::unbounded(); - mcp_client.on_notification(acp::AGENT_METHODS.session_update, { - move |notification, _cx| { - let notification_tx = notification_tx.clone(); - log::trace!( - "ACP Notification: {}", - serde_json::to_string_pretty(¬ification).unwrap() - ); - - if let Some(notification) = - serde_json::from_value::(notification).log_err() - { - notification_tx.unbounded_send(notification).ok(); - } - } - }); let sessions = Rc::new(RefCell::new(HashMap::default())); let initial_state = state_rx.recv().await?; @@ -104,9 +71,47 @@ impl AcpConnection { } }); + context_server + .start_with_handlers( + vec![ + (acp::AGENT_METHODS.agent_state, { + Box::new(move |notification, _cx| { + log::trace!( + "ACP Notification: {}", + serde_json::to_string_pretty(¬ification).unwrap() + ); + + if let Some(state) = + serde_json::from_value::(notification).log_err() + { + state_tx.send(state).log_err(); + } + }) + }), + (acp::AGENT_METHODS.session_update, { + Box::new(move |notification, _cx| { + let notification_tx = notification_tx.clone(); + log::trace!( + "ACP Notification: {}", + serde_json::to_string_pretty(¬ification).unwrap() + ); + + if let Some(notification) = + serde_json::from_value::(notification) + .log_err() + { + notification_tx.unbounded_send(notification).ok(); + } + }) + }), + ], + cx, + ) + .await?; + Ok(Self { server_name, - client, + context_server, sessions, agent_state, _agent_state_task: agent_state_task, @@ -152,7 +157,7 @@ impl AgentConnection for AcpConnection { cwd: &Path, cx: &mut AsyncApp, ) -> Task>> { - let client = self.client.client(); + let client = self.context_server.client(); let sessions = self.sessions.clone(); let cwd = cwd.to_path_buf(); cx.spawn(async move |cx| { @@ -222,7 +227,7 @@ impl AgentConnection for AcpConnection { } fn authenticate(&self, method_id: acp::AuthMethodId, cx: &mut App) -> Task> { - let client = self.client.client(); + let client = self.context_server.client(); cx.foreground_executor().spawn(async move { let params = acp::AuthenticateArguments { method_id }; @@ -248,7 +253,7 @@ impl AgentConnection for AcpConnection { params: agent_client_protocol::PromptArguments, cx: &mut App, ) -> Task> { - let client = self.client.client(); + let client = self.context_server.client(); let sessions = self.sessions.clone(); cx.foreground_executor().spawn(async move { @@ -305,6 +310,6 @@ impl AgentConnection for AcpConnection { impl Drop for AcpConnection { fn drop(&mut self) { - self.client.stop().log_err(); + self.context_server.stop().log_err(); } } diff --git a/crates/context_server/src/client.rs b/crates/context_server/src/client.rs index 1eb29bbbf9..65283afa87 100644 --- a/crates/context_server/src/client.rs +++ b/crates/context_server/src/client.rs @@ -441,14 +441,12 @@ impl Client { Ok(()) } - #[allow(unused)] - pub fn on_notification(&self, method: &'static str, f: F) - where - F: 'static + Send + FnMut(Value, AsyncApp), - { - self.notification_handlers - .lock() - .insert(method, Box::new(f)); + pub fn on_notification( + &self, + method: &'static str, + f: Box, + ) { + self.notification_handlers.lock().insert(method, f); } } diff --git a/crates/context_server/src/context_server.rs b/crates/context_server/src/context_server.rs index e76e7972f7..34fa29678d 100644 --- a/crates/context_server/src/context_server.rs +++ b/crates/context_server/src/context_server.rs @@ -95,8 +95,28 @@ impl ContextServer { self.client.read().clone() } - pub async fn start(self: Arc, cx: &AsyncApp) -> Result<()> { - let client = match &self.configuration { + pub async fn start(&self, cx: &AsyncApp) -> Result<()> { + self.initialize(self.new_client(cx)?).await + } + + /// Starts the context server, making sure handlers are registered before initialization happens + pub async fn start_with_handlers( + &self, + notification_handlers: Vec<( + &'static str, + Box, + )>, + cx: &AsyncApp, + ) -> Result<()> { + let client = self.new_client(cx)?; + for (method, handler) in notification_handlers { + client.on_notification(method, handler); + } + self.initialize(client).await + } + + fn new_client(&self, cx: &AsyncApp) -> Result { + Ok(match &self.configuration { ContextServerTransport::Stdio(command, working_directory) => Client::stdio( client::ContextServerId(self.id.0.clone()), client::ModelContextServerBinary { @@ -113,8 +133,7 @@ impl ContextServer { transport.clone(), cx.clone(), )?, - }; - self.initialize(client).await + }) } async fn initialize(&self, client: Client) -> Result<()> { diff --git a/crates/context_server/src/protocol.rs b/crates/context_server/src/protocol.rs index 9ccbc8a553..5355f20f62 100644 --- a/crates/context_server/src/protocol.rs +++ b/crates/context_server/src/protocol.rs @@ -115,10 +115,11 @@ impl InitializedContextServerProtocol { self.inner.notify(T::METHOD, params) } - pub fn on_notification(&self, method: &'static str, f: F) - where - F: 'static + Send + FnMut(Value, AsyncApp), - { + pub fn on_notification( + &self, + method: &'static str, + f: Box, + ) { self.inner.on_notification(method, f); } } From 738296345eaa880ebda8adf247e9974c7a20c880 Mon Sep 17 00:00:00 2001 From: Agus Zubiaga Date: Wed, 30 Jul 2025 11:46:11 -0300 Subject: [PATCH 08/13] Inline tool schemas --- crates/context_server/src/listener.rs | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/crates/context_server/src/listener.rs b/crates/context_server/src/listener.rs index 34e3a9a78c..0e85fb2129 100644 --- a/crates/context_server/src/listener.rs +++ b/crates/context_server/src/listener.rs @@ -83,14 +83,18 @@ impl McpServer { } pub fn add_tool(&mut self, tool: T) { - let output_schema = schemars::schema_for!(T::Output); - let unit_schema = schemars::schema_for!(()); + let mut settings = schemars::generate::SchemaSettings::draft07(); + settings.inline_subschemas = true; + let mut generator = settings.into_generator(); + + let output_schema = generator.root_schema_for::(); + let unit_schema = generator.root_schema_for::(); let registered_tool = RegisteredTool { tool: Tool { name: T::NAME.into(), description: Some(tool.description().into()), - input_schema: schemars::schema_for!(T::Input).into(), + input_schema: generator.root_schema_for::().into(), output_schema: if output_schema == unit_schema { None } else { From 27708143ecac70b963bbe70153d426fe5f061277 Mon Sep 17 00:00:00 2001 From: Agus Zubiaga Date: Wed, 30 Jul 2025 13:30:50 -0300 Subject: [PATCH 09/13] Fix auth --- crates/acp_thread/src/acp_thread.rs | 1 - crates/acp_thread/src/connection.rs | 14 +++++- crates/acp_thread/src/old_acp_support.rs | 31 ++++--------- crates/agent_servers/src/acp_connection.rs | 54 +++++++--------------- crates/agent_servers/src/claude.rs | 8 ++-- crates/agent_ui/src/acp/thread_view.rs | 28 ++++------- 6 files changed, 48 insertions(+), 88 deletions(-) diff --git a/crates/acp_thread/src/acp_thread.rs b/crates/acp_thread/src/acp_thread.rs index 8aa07da330..bc2f8c756a 100644 --- a/crates/acp_thread/src/acp_thread.rs +++ b/crates/acp_thread/src/acp_thread.rs @@ -1595,7 +1595,6 @@ mod tests { connection, child_status: io_task, current_thread: thread_rc, - agent_state: Default::default(), }; AcpThread::new( diff --git a/crates/acp_thread/src/connection.rs b/crates/acp_thread/src/connection.rs index 2e7deaf7df..11f1fcc94c 100644 --- a/crates/acp_thread/src/connection.rs +++ b/crates/acp_thread/src/connection.rs @@ -1,4 +1,4 @@ -use std::{cell::Ref, path::Path, rc::Rc}; +use std::{error::Error, fmt, path::Path, rc::Rc}; use agent_client_protocol::{self as acp}; use anyhow::Result; @@ -16,7 +16,7 @@ pub trait AgentConnection { cx: &mut AsyncApp, ) -> Task>>; - fn state(&self) -> Ref<'_, acp::AgentState>; + fn auth_methods(&self) -> Vec; fn authenticate(&self, method: acp::AuthMethodId, cx: &mut App) -> Task>; @@ -24,3 +24,13 @@ pub trait AgentConnection { fn cancel(&self, session_id: &acp::SessionId, cx: &mut App); } + +#[derive(Debug)] +pub struct AuthRequired; + +impl Error for AuthRequired {} +impl fmt::Display for AuthRequired { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "AuthRequired") + } +} diff --git a/crates/acp_thread/src/old_acp_support.rs b/crates/acp_thread/src/old_acp_support.rs index 718ad0da03..88313e0fd5 100644 --- a/crates/acp_thread/src/old_acp_support.rs +++ b/crates/acp_thread/src/old_acp_support.rs @@ -5,17 +5,11 @@ use anyhow::{Context as _, Result}; use futures::channel::oneshot; use gpui::{AppContext as _, AsyncApp, Entity, Task, WeakEntity}; use project::Project; -use std::{ - cell::{Ref, RefCell}, - error::Error, - fmt, - path::Path, - rc::Rc, -}; +use std::{cell::RefCell, path::Path, rc::Rc}; use ui::App; use util::ResultExt as _; -use crate::{AcpThread, AgentConnection}; +use crate::{AcpThread, AgentConnection, AuthRequired}; #[derive(Clone)] pub struct OldAcpClientDelegate { @@ -357,21 +351,10 @@ fn into_new_plan_status(status: acp_old::PlanEntryStatus) -> acp::PlanEntryStatu } } -#[derive(Debug)] -pub struct Unauthenticated; - -impl Error for Unauthenticated {} -impl fmt::Display for Unauthenticated { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "Unauthenticated") - } -} - pub struct OldAcpAgentConnection { pub name: &'static str, pub connection: acp_old::AgentConnection, pub child_status: Task>, - pub agent_state: Rc>, pub current_thread: Rc>>, } @@ -394,7 +377,7 @@ impl AgentConnection for OldAcpAgentConnection { let result = acp_old::InitializeParams::response_from_any(result)?; if !result.is_authenticated { - anyhow::bail!(Unauthenticated) + anyhow::bail!(AuthRequired) } cx.update(|cx| { @@ -408,8 +391,12 @@ impl AgentConnection for OldAcpAgentConnection { }) } - fn state(&self) -> Ref<'_, acp::AgentState> { - self.agent_state.borrow() + fn auth_methods(&self) -> Vec { + vec![acp::AuthMethod { + id: acp::AuthMethodId("acp-old-no-id".into()), + label: "Log in".into(), + description: None, + }] } fn authenticate(&self, _method_id: acp::AuthMethodId, cx: &mut App) -> Task> { diff --git a/crates/agent_servers/src/acp_connection.rs b/crates/agent_servers/src/acp_connection.rs index 95c09e2c52..c19a145196 100644 --- a/crates/agent_servers/src/acp_connection.rs +++ b/crates/agent_servers/src/acp_connection.rs @@ -7,24 +7,23 @@ use context_server::{ContextServer, ContextServerCommand, ContextServerId}; use futures::channel::{mpsc, oneshot}; use project::Project; use smol::stream::StreamExt as _; -use std::cell::{Ref, RefCell}; +use std::cell::RefCell; use std::rc::Rc; use std::{path::Path, sync::Arc}; -use util::{ResultExt, TryFutureExt}; +use util::ResultExt; use anyhow::{Context, Result}; use gpui::{App, AppContext as _, AsyncApp, Entity, Task, WeakEntity}; use crate::mcp_server::ZedMcpServer; use crate::{AgentServerCommand, mcp_server}; -use acp_thread::{AcpThread, AgentConnection}; +use acp_thread::{AcpThread, AgentConnection, AuthRequired}; pub struct AcpConnection { - agent_state: Rc>, + auth_methods: Rc>>, server_name: &'static str, client: Arc, sessions: Rc>>, - _agent_state_task: Task<()>, _session_update_task: Task<()>, } @@ -47,24 +46,8 @@ impl AcpConnection { .into(); ContextServer::start(client.clone(), cx).await?; - let (mut state_tx, mut state_rx) = watch::channel(acp::AgentState::default()); let mcp_client = client.client().context("Failed to subscribe")?; - mcp_client.on_notification(acp::AGENT_METHODS.agent_state, { - move |notification, _cx| { - log::trace!( - "ACP Notification: {}", - serde_json::to_string_pretty(¬ification).unwrap() - ); - - if let Some(state) = - serde_json::from_value::(notification).log_err() - { - state_tx.send(state).log_err(); - } - } - }); - let (notification_tx, mut notification_rx) = mpsc::unbounded(); mcp_client.on_notification(acp::AGENT_METHODS.session_update, { move |notification, _cx| { @@ -83,17 +66,6 @@ impl AcpConnection { }); let sessions = Rc::new(RefCell::new(HashMap::default())); - let initial_state = state_rx.recv().await?; - let agent_state = Rc::new(RefCell::new(initial_state)); - - let agent_state_task = cx.foreground_executor().spawn({ - let agent_state = agent_state.clone(); - async move { - while let Some(state) = state_rx.recv().log_err().await { - agent_state.replace(state); - } - } - }); let session_update_handler_task = cx.spawn({ let sessions = sessions.clone(); @@ -105,11 +77,10 @@ impl AcpConnection { }); Ok(Self { + auth_methods: Default::default(), server_name, client, sessions, - agent_state, - _agent_state_task: agent_state_task, _session_update_task: session_update_handler_task, }) } @@ -154,6 +125,7 @@ impl AgentConnection for AcpConnection { ) -> Task>> { let client = self.client.client(); let sessions = self.sessions.clone(); + let auth_methods = self.auth_methods.clone(); let cwd = cwd.to_path_buf(); cx.spawn(async move |cx| { let client = client.context("MCP server is not initialized yet")?; @@ -194,12 +166,18 @@ impl AgentConnection for AcpConnection { response.structured_content.context("Empty response")?, )?; + auth_methods.replace(result.auth_methods); + + let Some(session_id) = result.session_id else { + anyhow::bail!(AuthRequired); + }; + let thread = cx.new(|cx| { AcpThread::new( self.server_name, self.clone(), project, - result.session_id.clone(), + session_id.clone(), cx, ) })?; @@ -211,14 +189,14 @@ impl AgentConnection for AcpConnection { cancel_tx: None, _mcp_server: mcp_server, }; - sessions.borrow_mut().insert(result.session_id, session); + sessions.borrow_mut().insert(session_id, session); Ok(thread) }) } - fn state(&self) -> Ref<'_, acp::AgentState> { - self.agent_state.borrow() + fn auth_methods(&self) -> Vec { + self.auth_methods.borrow().clone() } fn authenticate(&self, method_id: acp::AuthMethodId, cx: &mut App) -> Task> { diff --git a/crates/agent_servers/src/claude.rs b/crates/agent_servers/src/claude.rs index 0f49403a0b..736fdd2726 100644 --- a/crates/agent_servers/src/claude.rs +++ b/crates/agent_servers/src/claude.rs @@ -6,7 +6,7 @@ use context_server::listener::McpServerTool; use project::Project; use settings::SettingsStore; use smol::process::Child; -use std::cell::{Ref, RefCell}; +use std::cell::RefCell; use std::fmt::Display; use std::path::Path; use std::rc::Rc; @@ -58,7 +58,6 @@ impl AgentServer for ClaudeCode { _cx: &mut App, ) -> Task>> { let connection = ClaudeAgentConnection { - agent_state: Default::default(), sessions: Default::default(), }; @@ -67,7 +66,6 @@ impl AgentServer for ClaudeCode { } struct ClaudeAgentConnection { - agent_state: Rc>, sessions: Rc>>, } @@ -185,8 +183,8 @@ impl AgentConnection for ClaudeAgentConnection { }) } - fn state(&self) -> Ref<'_, acp::AgentState> { - self.agent_state.borrow() + fn auth_methods(&self) -> Vec { + vec![] } fn authenticate(&self, _: acp::AuthMethodId, _cx: &mut App) -> Task> { diff --git a/crates/agent_ui/src/acp/thread_view.rs b/crates/agent_ui/src/acp/thread_view.rs index 824748a0aa..6d7684bbfc 100644 --- a/crates/agent_ui/src/acp/thread_view.rs +++ b/crates/agent_ui/src/acp/thread_view.rs @@ -216,15 +216,6 @@ impl AcpThreadView { } }; - if connection.state().needs_authentication { - this.update(cx, |this, cx| { - this.thread_state = ThreadState::Unauthenticated { connection }; - cx.notify(); - }) - .ok(); - return; - } - let result = match connection .clone() .new_thread(project.clone(), &root_dir, cx) @@ -233,7 +224,7 @@ impl AcpThreadView { Err(e) => { let mut cx = cx.clone(); // todo! remove duplication - if e.downcast_ref::().is_some() { + if e.downcast_ref::().is_some() { this.update(&mut cx, |this, cx| { this.thread_state = ThreadState::Unauthenticated { connection }; cx.notify(); @@ -2219,17 +2210,14 @@ impl Render for AcpThreadView { .justify_center() .child(self.render_pending_auth_state()) .child(h_flex().mt_1p5().justify_center().children( - connection.state().auth_methods.iter().map(|method| { - Button::new( - SharedString::from(method.id.0.clone()), - method.label.clone(), - ) - .on_click({ - let method_id = method.id.clone(); - cx.listener(move |this, _, window, cx| { - this.authenticate(method_id.clone(), window, cx) + connection.auth_methods().into_iter().map(|method| { + Button::new(SharedString::from(method.id.0.clone()), method.label) + .on_click({ + let method_id = method.id.clone(); + cx.listener(move |this, _, window, cx| { + this.authenticate(method_id.clone(), window, cx) + }) }) - }) }), )), ThreadState::Loading { .. } => v_flex().flex_1().child(self.render_empty_state(cx)), From 02d3043ec560a7e7bdc1df47efc70c4a2e4fb378 Mon Sep 17 00:00:00 2001 From: Agus Zubiaga Date: Wed, 30 Jul 2025 14:28:01 -0300 Subject: [PATCH 10/13] Rename arg to experimental-mcp --- crates/agent_servers/src/gemini.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/agent_servers/src/gemini.rs b/crates/agent_servers/src/gemini.rs index 9b7fde42bf..77e7d1063f 100644 --- a/crates/agent_servers/src/gemini.rs +++ b/crates/agent_servers/src/gemini.rs @@ -14,7 +14,7 @@ use crate::AllAgentServersSettings; #[derive(Clone)] pub struct Gemini; -const ACP_ARG: &str = "--experimental-acp"; +const MCP_ARG: &str = "--experimental-mcp"; impl AgentServer for Gemini { fn name(&self) -> &'static str { @@ -48,7 +48,7 @@ impl AgentServer for Gemini { })?; let Some(command) = - AgentServerCommand::resolve("gemini", &[ACP_ARG], settings, &project, cx).await + AgentServerCommand::resolve("gemini", &[MCP_ARG], settings, &project, cx).await else { anyhow::bail!("Failed to find gemini binary"); }; From 8563ed22521b98224bdc7b8d1c295a490b5f532f Mon Sep 17 00:00:00 2001 From: Agus Zubiaga Date: Fri, 1 Aug 2025 17:07:50 -0300 Subject: [PATCH 11/13] Compiling --- Cargo.lock | 16 +- crates/acp_thread/src/acp_thread.rs | 7 +- crates/acp_thread/src/connection.rs | 4 +- crates/acp_thread/src/old_acp_support.rs | 11 +- crates/agent_servers/src/acp_connection.rs | 348 +++++++++------------ crates/agent_servers/src/agent_servers.rs | 4 - crates/agent_servers/src/claude.rs | 6 +- crates/agent_servers/src/codex.rs | 78 ----- crates/agent_servers/src/e2e_tests.rs | 3 - crates/agent_servers/src/gemini.rs | 10 +- crates/agent_servers/src/mcp_server.rs | 208 ------------ crates/agent_servers/src/settings.rs | 11 +- crates/agent_ui/src/acp/thread_view.rs | 15 +- crates/agent_ui/src/agent_panel.rs | 33 -- crates/agent_ui/src/agent_ui.rs | 2 - 15 files changed, 188 insertions(+), 568 deletions(-) delete mode 100644 crates/agent_servers/src/codex.rs delete mode 100644 crates/agent_servers/src/mcp_server.rs diff --git a/Cargo.lock b/Cargo.lock index f4c328c957..f31ecdef99 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -140,6 +140,10 @@ dependencies = [ name = "agent-client-protocol" version = "0.0.13" dependencies = [ + "anyhow", + "futures 0.3.31", + "log", + "parking_lot", "schemars", "serde", "serde_json", @@ -9624,9 +9628,9 @@ dependencies = [ [[package]] name = "lock_api" -version = "0.4.12" +version = "0.4.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "07af8b9cdd281b7915f413fa73f29ebd5d55d0d3f0155584dade1ff18cea1b17" +checksum = "96936507f153605bddfcda068dd804796c84324ed2510809e5b2a624c81da765" dependencies = [ "autocfg", "scopeguard", @@ -11315,9 +11319,9 @@ checksum = "f38d5652c16fde515bb1ecef450ab0f6a219d619a7274976324d5e377f7dceba" [[package]] name = "parking_lot" -version = "0.12.3" +version = "0.12.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f1bf18183cf54e8d6059647fc3063646a1801cf30896933ec2311622cc4b9a27" +checksum = "70d58bf43669b5795d1576d0641cfb6fbb2057bf629506267a92807158584a13" dependencies = [ "lock_api", "parking_lot_core", @@ -11325,9 +11329,9 @@ dependencies = [ [[package]] name = "parking_lot_core" -version = "0.9.10" +version = "0.9.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e401f977ab385c9e4e3ab30627d6f26d00e2c73eef317493c4ec6d468726cf8" +checksum = "bc838d2a56b5b1a6c25f55575dfc605fabb63bb2365f6c2353ef9159aa69e4a5" dependencies = [ "cfg-if", "libc", diff --git a/crates/acp_thread/src/acp_thread.rs b/crates/acp_thread/src/acp_thread.rs index bc2f8c756a..3bf6134862 100644 --- a/crates/acp_thread/src/acp_thread.rs +++ b/crates/acp_thread/src/acp_thread.rs @@ -999,7 +999,7 @@ impl AcpThread { let result = this .update(cx, |this, cx| { this.connection.prompt( - acp::PromptArguments { + acp::PromptRequest { prompt: message, session_id: this.session_id.clone(), }, @@ -1595,6 +1595,11 @@ mod tests { connection, child_status: io_task, current_thread: thread_rc, + auth_methods: [acp::AuthMethod { + id: acp::AuthMethodId("acp-old-no-id".into()), + label: "Log in".into(), + description: None, + }], }; AcpThread::new( diff --git a/crates/acp_thread/src/connection.rs b/crates/acp_thread/src/connection.rs index 11f1fcc94c..929500a67b 100644 --- a/crates/acp_thread/src/connection.rs +++ b/crates/acp_thread/src/connection.rs @@ -16,11 +16,11 @@ pub trait AgentConnection { cx: &mut AsyncApp, ) -> Task>>; - fn auth_methods(&self) -> Vec; + fn auth_methods(&self) -> &[acp::AuthMethod]; fn authenticate(&self, method: acp::AuthMethodId, cx: &mut App) -> Task>; - fn prompt(&self, params: acp::PromptArguments, cx: &mut App) -> Task>; + fn prompt(&self, params: acp::PromptRequest, cx: &mut App) -> Task>; fn cancel(&self, session_id: &acp::SessionId, cx: &mut App); } diff --git a/crates/acp_thread/src/old_acp_support.rs b/crates/acp_thread/src/old_acp_support.rs index 88313e0fd5..adb27e21c4 100644 --- a/crates/acp_thread/src/old_acp_support.rs +++ b/crates/acp_thread/src/old_acp_support.rs @@ -356,6 +356,7 @@ pub struct OldAcpAgentConnection { pub connection: acp_old::AgentConnection, pub child_status: Task>, pub current_thread: Rc>>, + pub auth_methods: [acp::AuthMethod; 1], } impl AgentConnection for OldAcpAgentConnection { @@ -391,12 +392,8 @@ impl AgentConnection for OldAcpAgentConnection { }) } - fn auth_methods(&self) -> Vec { - vec![acp::AuthMethod { - id: acp::AuthMethodId("acp-old-no-id".into()), - label: "Log in".into(), - description: None, - }] + fn auth_methods(&self) -> &[acp::AuthMethod] { + &self.auth_methods } fn authenticate(&self, _method_id: acp::AuthMethodId, cx: &mut App) -> Task> { @@ -409,7 +406,7 @@ impl AgentConnection for OldAcpAgentConnection { }) } - fn prompt(&self, params: acp::PromptArguments, cx: &mut App) -> Task> { + fn prompt(&self, params: acp::PromptRequest, cx: &mut App) -> Task> { let chunks = params .prompt .into_iter() diff --git a/crates/agent_servers/src/acp_connection.rs b/crates/agent_servers/src/acp_connection.rs index bfb4d8b40f..ca9ec2aea0 100644 --- a/crates/agent_servers/src/acp_connection.rs +++ b/crates/agent_servers/src/acp_connection.rs @@ -1,123 +1,87 @@ use agent_client_protocol as acp; -use anyhow::anyhow; use collections::HashMap; -use context_server::listener::McpServerTool; -use context_server::types::requests; -use context_server::{ContextServer, ContextServerCommand, ContextServerId}; -use futures::channel::{mpsc, oneshot}; +use futures::channel::oneshot; use project::Project; -use smol::stream::StreamExt as _; use std::cell::RefCell; +use std::path::Path; use std::rc::Rc; -use std::{path::Path, sync::Arc}; use util::ResultExt; -use anyhow::{Context, Result}; +use anyhow::{Context as _, Result}; use gpui::{App, AppContext as _, AsyncApp, Entity, Task, WeakEntity}; -use crate::mcp_server::ZedMcpServer; -use crate::{AgentServerCommand, mcp_server}; +use crate::AgentServerCommand; use acp_thread::{AcpThread, AgentConnection, AuthRequired}; pub struct AcpConnection { - auth_methods: Rc>>, server_name: &'static str, - context_server: Arc, + connection: Rc, sessions: Rc>>, - _session_update_task: Task<()>, + auth_methods: Vec, + _io_task: Task>, +} + +pub struct AcpSession { + thread: WeakEntity, } impl AcpConnection { pub async fn stdio( server_name: &'static str, command: AgentServerCommand, - working_directory: Option>, + root_dir: &Path, cx: &mut AsyncApp, ) -> Result { - let context_server: Arc = ContextServer::stdio( - ContextServerId(format!("{}-mcp-server", server_name).into()), - ContextServerCommand { - path: command.path, - args: command.args, - env: command.env, - }, - working_directory, - ) - .into(); + let mut child = util::command::new_smol_command(&command.path) + .args(command.args.iter().map(|arg| arg.as_str())) + .envs(command.env.iter().flatten()) + .current_dir(root_dir) + .stdin(std::process::Stdio::piped()) + .stdout(std::process::Stdio::piped()) + .stderr(std::process::Stdio::inherit()) + .kill_on_drop(true) + .spawn()?; - let (notification_tx, mut notification_rx) = mpsc::unbounded(); + let stdout = child.stdout.take().expect("Failed to take stdout"); + let stdin = child.stdin.take().expect("Failed to take stdin"); let sessions = Rc::new(RefCell::new(HashMap::default())); - let session_update_handler_task = cx.spawn({ - let sessions = sessions.clone(); - async move |cx| { - while let Some(notification) = notification_rx.next().await { - Self::handle_session_notification(notification, sessions.clone(), cx) - } + let client = ClientDelegate { + sessions: sessions.clone(), + cx: cx.clone(), + }; + let (connection, io_task) = acp::AgentConnection::new(client, stdin, stdout, { + let foreground_executor = cx.foreground_executor().clone(); + move |fut| { + foreground_executor.spawn(fut).detach(); } }); - context_server - .start_with_handlers( - vec![(acp::AGENT_METHODS.session_update, { - Box::new(move |notification, _cx| { - let notification_tx = notification_tx.clone(); - log::trace!( - "ACP Notification: {}", - serde_json::to_string_pretty(¬ification).unwrap() - ); + let io_task = cx.background_spawn(io_task); - if let Some(notification) = - serde_json::from_value::(notification) - .log_err() - { - notification_tx.unbounded_send(notification).ok(); - } - }) - })], - cx, - ) + let response = connection + .initialize(acp::InitializeRequest { + protocol_version: acp::VERSION, + client_capabilities: acp::ClientCapabilities { + fs: acp::FileSystemCapability { + read_text_file: true, + write_text_file: true, + }, + }, + }) .await?; + // todo! check version + Ok(Self { - auth_methods: Default::default(), + auth_methods: response.auth_methods, + connection: connection.into(), server_name, - context_server, sessions, - _session_update_task: session_update_handler_task, + _io_task: io_task, }) } - - pub fn handle_session_notification( - notification: acp::SessionNotification, - threads: Rc>>, - cx: &mut AsyncApp, - ) { - let threads = threads.borrow(); - let Some(thread) = threads - .get(¬ification.session_id) - .and_then(|session| session.thread.upgrade()) - else { - log::error!( - "Thread not found for session ID: {}", - notification.session_id - ); - return; - }; - - thread - .update(cx, |thread, cx| { - thread.handle_session_update(notification.update, cx) - }) - .log_err(); - } -} - -pub struct AcpSession { - thread: WeakEntity, - cancel_tx: Option>, - _mcp_server: ZedMcpServer, } impl AgentConnection for AcpConnection { @@ -127,52 +91,19 @@ impl AgentConnection for AcpConnection { cwd: &Path, cx: &mut AsyncApp, ) -> Task>> { - let client = self.context_server.client(); + let conn = self.connection.clone(); let sessions = self.sessions.clone(); - let auth_methods = self.auth_methods.clone(); let cwd = cwd.to_path_buf(); cx.spawn(async move |cx| { - let client = client.context("MCP server is not initialized yet")?; - let (mut thread_tx, thread_rx) = watch::channel(WeakEntity::new_invalid()); - - let mcp_server = ZedMcpServer::new(thread_rx, cx).await?; - - let response = client - .request::(context_server::types::CallToolParams { - name: acp::AGENT_METHODS.new_session.into(), - arguments: Some(serde_json::to_value(acp::NewSessionArguments { - mcp_servers: vec![mcp_server.server_config()?], - client_tools: acp::ClientTools { - request_permission: Some(acp::McpToolId { - mcp_server: mcp_server::SERVER_NAME.into(), - tool_name: mcp_server::RequestPermissionTool::NAME.into(), - }), - read_text_file: Some(acp::McpToolId { - mcp_server: mcp_server::SERVER_NAME.into(), - tool_name: mcp_server::ReadTextFileTool::NAME.into(), - }), - write_text_file: Some(acp::McpToolId { - mcp_server: mcp_server::SERVER_NAME.into(), - tool_name: mcp_server::WriteTextFileTool::NAME.into(), - }), - }, - cwd, - })?), - meta: None, + let response = conn + .new_session(acp::NewSessionRequest { + // todo! Zed MCP server? + mcp_servers: vec![], + cwd, }) .await?; - if response.is_error.unwrap_or_default() { - return Err(anyhow!(response.text_contents())); - } - - let result = serde_json::from_value::( - response.structured_content.context("Empty response")?, - )?; - - auth_methods.replace(result.auth_methods); - - let Some(session_id) = result.session_id else { + let Some(session_id) = response.session_id else { anyhow::bail!(AuthRequired); }; @@ -186,12 +117,8 @@ impl AgentConnection for AcpConnection { ) })?; - thread_tx.send(thread.downgrade())?; - let session = AcpSession { thread: thread.downgrade(), - cancel_tx: None, - _mcp_server: mcp_server, }; sessions.borrow_mut().insert(session_id, session); @@ -199,94 +126,115 @@ impl AgentConnection for AcpConnection { }) } - fn auth_methods(&self) -> Vec { - self.auth_methods.borrow().clone() + fn auth_methods(&self) -> &[acp::AuthMethod] { + &self.auth_methods } fn authenticate(&self, method_id: acp::AuthMethodId, cx: &mut App) -> Task> { - let client = self.context_server.client(); + let conn = self.connection.clone(); cx.foreground_executor().spawn(async move { - let params = acp::AuthenticateArguments { method_id }; - - let response = client - .context("MCP server is not initialized yet")? - .request::(context_server::types::CallToolParams { - name: acp::AGENT_METHODS.authenticate.into(), - arguments: Some(serde_json::to_value(params)?), - meta: None, + let result = conn + .authenticate(acp::AuthenticateRequest { + method_id: method_id.clone(), }) .await?; - if response.is_error.unwrap_or_default() { - Err(anyhow!(response.text_contents())) - } else { - Ok(()) - } + Ok(result) }) } - fn prompt( + fn prompt(&self, params: acp::PromptRequest, cx: &mut App) -> Task> { + let conn = self.connection.clone(); + cx.foreground_executor() + .spawn(async move { Ok(conn.prompt(params).await?) }) + } + + fn cancel(&self, session_id: &acp::SessionId, _cx: &mut App) { + self.connection.cancel(session_id.clone()).log_err(); + } +} + +struct ClientDelegate { + sessions: Rc>>, + cx: AsyncApp, +} + +impl acp::Client for ClientDelegate { + async fn request_permission( &self, - params: agent_client_protocol::PromptArguments, - cx: &mut App, - ) -> Task> { - let client = self.context_server.client(); - let sessions = self.sessions.clone(); + arguments: acp::RequestPermissionRequest, + ) -> Result { + let cx = &mut self.cx.clone(); + let result = self + .sessions + .borrow() + .get(&arguments.session_id) + .context("Failed to get session")? + .thread + .update(cx, |thread, cx| { + thread.request_tool_call_permission(arguments.tool_call, arguments.options, cx) + })? + .await; - cx.foreground_executor().spawn(async move { - let client = client.context("MCP server is not initialized yet")?; + let outcome = match result { + Ok(option) => acp::RequestPermissionOutcome::Selected { option_id: option }, + Err(oneshot::Canceled) => acp::RequestPermissionOutcome::Cancelled, + }; - let (new_cancel_tx, cancel_rx) = oneshot::channel(); - { - let mut sessions = sessions.borrow_mut(); - let session = sessions - .get_mut(¶ms.session_id) - .context("Session not found")?; - session.cancel_tx.replace(new_cancel_tx); - } - - let result = client - .request_with::( - context_server::types::CallToolParams { - name: acp::AGENT_METHODS.prompt.into(), - arguments: Some(serde_json::to_value(params)?), - meta: None, - }, - Some(cancel_rx), - None, - ) - .await; - - if let Err(err) = &result - && err.is::() - { - return Ok(()); - } - - let response = result?; - - if response.is_error.unwrap_or_default() { - return Err(anyhow!(response.text_contents())); - } - - Ok(()) - }) + Ok(acp::RequestPermissionResponse { outcome }) } - fn cancel(&self, session_id: &agent_client_protocol::SessionId, _cx: &mut App) { - let mut sessions = self.sessions.borrow_mut(); + async fn write_text_file( + &self, + arguments: acp::WriteTextFileRequest, + ) -> Result<(), acp::Error> { + let cx = &mut self.cx.clone(); + self.sessions + .borrow() + .get(&arguments.session_id) + .context("Failed to get session")? + .thread + .update(cx, |thread, cx| { + thread.write_text_file(arguments.path, arguments.content, cx) + })? + .await?; - if let Some(cancel_tx) = sessions - .get_mut(session_id) - .and_then(|session| session.cancel_tx.take()) - { - cancel_tx.send(()).ok(); - } - } -} - -impl Drop for AcpConnection { - fn drop(&mut self) { - self.context_server.stop().log_err(); + Ok(()) + } + + async fn read_text_file( + &self, + arguments: acp::ReadTextFileRequest, + ) -> Result { + let cx = &mut self.cx.clone(); + let content = self + .sessions + .borrow() + .get(&arguments.session_id) + .context("Failed to get session")? + .thread + .update(cx, |thread, cx| { + thread.read_text_file(arguments.path, arguments.line, arguments.limit, false, cx) + })? + .await?; + + Ok(acp::ReadTextFileResponse { content }) + } + + async fn session_notification( + &self, + notification: acp::SessionNotification, + ) -> Result<(), acp::Error> { + let cx = &mut self.cx.clone(); + let sessions = self.sessions.borrow(); + let session = sessions + .get(¬ification.session_id) + .context("Failed to get session")?; + + session.thread.update(cx, |thread, cx| { + thread.handle_session_update(notification.update, cx) + })??; + + Ok(()) } } diff --git a/crates/agent_servers/src/agent_servers.rs b/crates/agent_servers/src/agent_servers.rs index 6a031a190e..13bad53cd9 100644 --- a/crates/agent_servers/src/agent_servers.rs +++ b/crates/agent_servers/src/agent_servers.rs @@ -1,15 +1,12 @@ mod acp_connection; mod claude; -mod codex; mod gemini; -mod mcp_server; mod settings; #[cfg(test)] mod e2e_tests; pub use claude::*; -pub use codex::*; pub use gemini::*; pub use settings::*; @@ -39,7 +36,6 @@ pub trait AgentServer: Send { fn connect( &self, - // these will go away when old_acp is fully removed root_dir: &Path, project: &Entity, cx: &mut App, diff --git a/crates/agent_servers/src/claude.rs b/crates/agent_servers/src/claude.rs index 736fdd2726..9040b83085 100644 --- a/crates/agent_servers/src/claude.rs +++ b/crates/agent_servers/src/claude.rs @@ -183,15 +183,15 @@ impl AgentConnection for ClaudeAgentConnection { }) } - fn auth_methods(&self) -> Vec { - vec![] + fn auth_methods(&self) -> &[acp::AuthMethod] { + &[] } fn authenticate(&self, _: acp::AuthMethodId, _cx: &mut App) -> Task> { Task::ready(Err(anyhow!("Authentication not supported"))) } - fn prompt(&self, params: acp::PromptArguments, cx: &mut App) -> Task> { + fn prompt(&self, params: acp::PromptRequest, cx: &mut App) -> Task> { let sessions = self.sessions.borrow(); let Some(session) = sessions.get(¶ms.session_id) else { return Task::ready(Err(anyhow!( diff --git a/crates/agent_servers/src/codex.rs b/crates/agent_servers/src/codex.rs deleted file mode 100644 index 3e774ed83e..0000000000 --- a/crates/agent_servers/src/codex.rs +++ /dev/null @@ -1,78 +0,0 @@ -use project::Project; -use settings::SettingsStore; -use std::path::Path; -use std::rc::Rc; - -use anyhow::Result; -use gpui::{App, Entity, Task}; - -use crate::acp_connection::AcpConnection; -use crate::{AgentServer, AgentServerCommand, AllAgentServersSettings}; -use acp_thread::AgentConnection; - -#[derive(Clone)] -pub struct Codex; - -impl AgentServer for Codex { - fn name(&self) -> &'static str { - "Codex" - } - - fn empty_state_headline(&self) -> &'static str { - "Welcome to Codex" - } - - fn empty_state_message(&self) -> &'static str { - "What can I help with?" - } - - fn logo(&self) -> ui::IconName { - ui::IconName::AiOpenAi - } - - fn connect( - &self, - _root_dir: &Path, - project: &Entity, - cx: &mut App, - ) -> Task>> { - let project = project.clone(); - let server_name = self.name(); - let working_directory = project.read(cx).active_project_directory(cx); - cx.spawn(async move |cx| { - let settings = cx.read_global(|settings: &SettingsStore, _| { - settings.get::(None).codex.clone() - })?; - - let Some(command) = - AgentServerCommand::resolve("codex", &["mcp"], settings, &project, cx).await - else { - anyhow::bail!("Failed to find codex binary"); - }; - // todo! check supported version - - let conn = AcpConnection::stdio(server_name, command, working_directory, cx).await?; - Ok(Rc::new(conn) as _) - }) - } -} - -#[cfg(test)] -pub(crate) mod tests { - use super::*; - use crate::AgentServerCommand; - use std::path::Path; - - crate::common_e2e_tests!(Codex, allow_option_id = "approve"); - - pub fn local_command() -> AgentServerCommand { - let cli_path = Path::new(env!("CARGO_MANIFEST_DIR")) - .join("../../../codex/codex-rs/target/debug/codex"); - - AgentServerCommand { - path: cli_path, - args: vec![], - env: None, - } - } -} diff --git a/crates/agent_servers/src/e2e_tests.rs b/crates/agent_servers/src/e2e_tests.rs index e9c72eabc9..16bf1e6b47 100644 --- a/crates/agent_servers/src/e2e_tests.rs +++ b/crates/agent_servers/src/e2e_tests.rs @@ -375,9 +375,6 @@ pub async fn init_test(cx: &mut TestAppContext) -> Arc { gemini: Some(AgentServerSettings { command: crate::gemini::tests::local_command(), }), - codex: Some(AgentServerSettings { - command: crate::codex::tests::local_command(), - }), }, cx, ); diff --git a/crates/agent_servers/src/gemini.rs b/crates/agent_servers/src/gemini.rs index 77e7d1063f..372ce76aa9 100644 --- a/crates/agent_servers/src/gemini.rs +++ b/crates/agent_servers/src/gemini.rs @@ -14,7 +14,7 @@ use crate::AllAgentServersSettings; #[derive(Clone)] pub struct Gemini; -const MCP_ARG: &str = "--experimental-mcp"; +const ACP_ARG: &str = "--experimental-acp"; impl AgentServer for Gemini { fn name(&self) -> &'static str { @@ -35,26 +35,26 @@ impl AgentServer for Gemini { fn connect( &self, - _root_dir: &Path, + root_dir: &Path, project: &Entity, cx: &mut App, ) -> Task>> { let project = project.clone(); let server_name = self.name(); - let working_directory = project.read(cx).active_project_directory(cx); + let root_dir = root_dir.to_path_buf(); cx.spawn(async move |cx| { let settings = cx.read_global(|settings: &SettingsStore, _| { settings.get::(None).gemini.clone() })?; let Some(command) = - AgentServerCommand::resolve("gemini", &[MCP_ARG], settings, &project, cx).await + AgentServerCommand::resolve("gemini", &[ACP_ARG], settings, &project, cx).await else { anyhow::bail!("Failed to find gemini binary"); }; // todo! check supported version - let conn = AcpConnection::stdio(server_name, command, working_directory, cx).await?; + let conn = AcpConnection::stdio(server_name, command, &root_dir, cx).await?; Ok(Rc::new(conn) as _) }) } diff --git a/crates/agent_servers/src/mcp_server.rs b/crates/agent_servers/src/mcp_server.rs deleted file mode 100644 index ec655800ed..0000000000 --- a/crates/agent_servers/src/mcp_server.rs +++ /dev/null @@ -1,208 +0,0 @@ -use acp_thread::AcpThread; -use agent_client_protocol as acp; -use anyhow::Result; -use context_server::listener::{McpServerTool, ToolResponse}; -use context_server::types::{ - Implementation, InitializeParams, InitializeResponse, ProtocolVersion, ServerCapabilities, - ToolsCapabilities, requests, -}; -use futures::channel::oneshot; -use gpui::{App, AsyncApp, Task, WeakEntity}; -use indoc::indoc; - -pub struct ZedMcpServer { - server: context_server::listener::McpServer, -} - -pub const SERVER_NAME: &str = "zed"; - -impl ZedMcpServer { - pub async fn new( - thread_rx: watch::Receiver>, - cx: &AsyncApp, - ) -> Result { - let mut mcp_server = context_server::listener::McpServer::new(cx).await?; - mcp_server.handle_request::(Self::handle_initialize); - - mcp_server.add_tool(RequestPermissionTool { - thread_rx: thread_rx.clone(), - }); - mcp_server.add_tool(ReadTextFileTool { - thread_rx: thread_rx.clone(), - }); - mcp_server.add_tool(WriteTextFileTool { - thread_rx: thread_rx.clone(), - }); - - Ok(Self { server: mcp_server }) - } - - pub fn server_config(&self) -> Result { - #[cfg(not(test))] - let zed_path = anyhow::Context::context( - std::env::current_exe(), - "finding current executable path for use in mcp_server", - )?; - - #[cfg(test)] - let zed_path = crate::e2e_tests::get_zed_path(); - - Ok(acp::McpServer { - name: SERVER_NAME.into(), - command: zed_path, - args: vec![ - "--nc".into(), - self.server.socket_path().display().to_string(), - ], - env: vec![], - }) - } - - fn handle_initialize(_: InitializeParams, cx: &App) -> Task> { - cx.foreground_executor().spawn(async move { - Ok(InitializeResponse { - protocol_version: ProtocolVersion("2025-06-18".into()), - capabilities: ServerCapabilities { - experimental: None, - logging: None, - completions: None, - prompts: None, - resources: None, - tools: Some(ToolsCapabilities { - list_changed: Some(false), - }), - }, - server_info: Implementation { - name: SERVER_NAME.into(), - version: "0.1.0".into(), - }, - meta: None, - }) - }) - } -} - -// Tools - -#[derive(Clone)] -pub struct RequestPermissionTool { - thread_rx: watch::Receiver>, -} - -impl McpServerTool for RequestPermissionTool { - type Input = acp::RequestPermissionArguments; - type Output = acp::RequestPermissionOutput; - - const NAME: &'static str = "Confirmation"; - - fn description(&self) -> &'static str { - indoc! {" - Request permission for tool calls. - - This tool is meant to be called programmatically by the agent loop, not the LLM. - "} - } - - async fn run( - &self, - input: Self::Input, - cx: &mut AsyncApp, - ) -> Result> { - let mut thread_rx = self.thread_rx.clone(); - let Some(thread) = thread_rx.recv().await?.upgrade() else { - anyhow::bail!("Thread closed"); - }; - - let result = thread - .update(cx, |thread, cx| { - thread.request_tool_call_permission(input.tool_call, input.options, cx) - })? - .await; - - let outcome = match result { - Ok(option_id) => acp::RequestPermissionOutcome::Selected { option_id }, - Err(oneshot::Canceled) => acp::RequestPermissionOutcome::Canceled, - }; - - Ok(ToolResponse { - content: vec![], - structured_content: acp::RequestPermissionOutput { outcome }, - }) - } -} - -#[derive(Clone)] -pub struct ReadTextFileTool { - thread_rx: watch::Receiver>, -} - -impl McpServerTool for ReadTextFileTool { - type Input = acp::ReadTextFileArguments; - type Output = acp::ReadTextFileOutput; - - const NAME: &'static str = "Read"; - - fn description(&self) -> &'static str { - "Reads the content of the given file in the project including unsaved changes." - } - - async fn run( - &self, - input: Self::Input, - cx: &mut AsyncApp, - ) -> Result> { - let mut thread_rx = self.thread_rx.clone(); - let Some(thread) = thread_rx.recv().await?.upgrade() else { - anyhow::bail!("Thread closed"); - }; - - let content = thread - .update(cx, |thread, cx| { - thread.read_text_file(input.path, input.line, input.limit, false, cx) - })? - .await?; - - Ok(ToolResponse { - content: vec![], - structured_content: acp::ReadTextFileOutput { content }, - }) - } -} - -#[derive(Clone)] -pub struct WriteTextFileTool { - thread_rx: watch::Receiver>, -} - -impl McpServerTool for WriteTextFileTool { - type Input = acp::WriteTextFileArguments; - type Output = (); - - const NAME: &'static str = "Write"; - - fn description(&self) -> &'static str { - "Write to a file replacing its contents" - } - - async fn run( - &self, - input: Self::Input, - cx: &mut AsyncApp, - ) -> Result> { - let mut thread_rx = self.thread_rx.clone(); - let Some(thread) = thread_rx.recv().await?.upgrade() else { - anyhow::bail!("Thread closed"); - }; - - thread - .update(cx, |thread, cx| { - thread.write_text_file(input.path, input.content, cx) - })? - .await?; - - Ok(ToolResponse { - content: vec![], - structured_content: (), - }) - } -} diff --git a/crates/agent_servers/src/settings.rs b/crates/agent_servers/src/settings.rs index aeb34a5e61..645674b5f1 100644 --- a/crates/agent_servers/src/settings.rs +++ b/crates/agent_servers/src/settings.rs @@ -13,7 +13,6 @@ pub fn init(cx: &mut App) { pub struct AllAgentServersSettings { pub gemini: Option, pub claude: Option, - pub codex: Option, } #[derive(Deserialize, Serialize, Clone, JsonSchema, Debug)] @@ -30,21 +29,13 @@ impl settings::Settings for AllAgentServersSettings { fn load(sources: SettingsSources, _: &mut App) -> Result { let mut settings = AllAgentServersSettings::default(); - for AllAgentServersSettings { - gemini, - claude, - codex, - } in sources.defaults_and_customizations() - { + for AllAgentServersSettings { gemini, claude } in sources.defaults_and_customizations() { if gemini.is_some() { settings.gemini = gemini.clone(); } if claude.is_some() { settings.claude = claude.clone(); } - if codex.is_some() { - settings.codex = codex.clone(); - } } Ok(settings) diff --git a/crates/agent_ui/src/acp/thread_view.rs b/crates/agent_ui/src/acp/thread_view.rs index 6d7684bbfc..17575e42db 100644 --- a/crates/agent_ui/src/acp/thread_view.rs +++ b/crates/agent_ui/src/acp/thread_view.rs @@ -2211,13 +2211,16 @@ impl Render for AcpThreadView { .child(self.render_pending_auth_state()) .child(h_flex().mt_1p5().justify_center().children( connection.auth_methods().into_iter().map(|method| { - Button::new(SharedString::from(method.id.0.clone()), method.label) - .on_click({ - let method_id = method.id.clone(); - cx.listener(move |this, _, window, cx| { - this.authenticate(method_id.clone(), window, cx) - }) + Button::new( + SharedString::from(method.id.0.clone()), + method.label.clone(), + ) + .on_click({ + let method_id = method.id.clone(); + cx.listener(move |this, _, window, cx| { + this.authenticate(method_id.clone(), window, cx) }) + }) }), )), ThreadState::Loading { .. } => v_flex().flex_1().child(self.render_empty_state(cx)), diff --git a/crates/agent_ui/src/agent_panel.rs b/crates/agent_ui/src/agent_panel.rs index 91217cb030..875320372d 100644 --- a/crates/agent_ui/src/agent_panel.rs +++ b/crates/agent_ui/src/agent_panel.rs @@ -1991,20 +1991,6 @@ impl AgentPanel { ); }), ) - .item( - ContextMenuEntry::new("New Codex Thread") - .icon(IconName::AiOpenAi) - .icon_color(Color::Muted) - .handler(move |window, cx| { - window.dispatch_action( - NewExternalAgentThread { - agent: Some(crate::ExternalAgent::Codex), - } - .boxed_clone(), - cx, - ); - }), - ) }); menu })) @@ -2666,25 +2652,6 @@ impl AgentPanel { ) }, ), - ) - .child( - NewThreadButton::new( - "new-codex-thread-btn", - "New Codex Thread", - IconName::AiOpenAi, - ) - .on_click( - |window, cx| { - window.dispatch_action( - Box::new(NewExternalAgentThread { - agent: Some( - crate::ExternalAgent::Codex, - ), - }), - cx, - ) - }, - ), ), ) }), diff --git a/crates/agent_ui/src/agent_ui.rs b/crates/agent_ui/src/agent_ui.rs index 4b75cc9e77..6ae78585de 100644 --- a/crates/agent_ui/src/agent_ui.rs +++ b/crates/agent_ui/src/agent_ui.rs @@ -150,7 +150,6 @@ enum ExternalAgent { #[default] Gemini, ClaudeCode, - Codex, } impl ExternalAgent { @@ -158,7 +157,6 @@ impl ExternalAgent { match self { ExternalAgent::Gemini => Rc::new(agent_servers::Gemini), ExternalAgent::ClaudeCode => Rc::new(agent_servers::ClaudeCode), - ExternalAgent::Codex => Rc::new(agent_servers::Codex), } } } From 8acb58b6e50fa26128675a9b8f60ee82a28c90dd Mon Sep 17 00:00:00 2001 From: Agus Zubiaga Date: Fri, 1 Aug 2025 18:15:26 -0300 Subject: [PATCH 12/13] Fix types --- crates/agent_servers/src/acp_connection.rs | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/crates/agent_servers/src/acp_connection.rs b/crates/agent_servers/src/acp_connection.rs index ca9ec2aea0..0ced22fc65 100644 --- a/crates/agent_servers/src/acp_connection.rs +++ b/crates/agent_servers/src/acp_connection.rs @@ -1,11 +1,10 @@ -use agent_client_protocol as acp; +use agent_client_protocol::{self as acp, Agent as _}; use collections::HashMap; use futures::channel::oneshot; use project::Project; use std::cell::RefCell; use std::path::Path; use std::rc::Rc; -use util::ResultExt; use anyhow::{Context as _, Result}; use gpui::{App, AppContext as _, AsyncApp, Entity, Task, WeakEntity}; @@ -15,7 +14,7 @@ use acp_thread::{AcpThread, AgentConnection, AuthRequired}; pub struct AcpConnection { server_name: &'static str, - connection: Rc, + connection: Rc, sessions: Rc>>, auth_methods: Vec, _io_task: Task>, @@ -51,7 +50,7 @@ impl AcpConnection { sessions: sessions.clone(), cx: cx.clone(), }; - let (connection, io_task) = acp::AgentConnection::new(client, stdin, stdout, { + let (connection, io_task) = acp::ClientSideConnection::new(client, stdin, stdout, { let foreground_executor = cx.foreground_executor().clone(); move |fut| { foreground_executor.spawn(fut).detach(); @@ -149,8 +148,14 @@ impl AgentConnection for AcpConnection { .spawn(async move { Ok(conn.prompt(params).await?) }) } - fn cancel(&self, session_id: &acp::SessionId, _cx: &mut App) { - self.connection.cancel(session_id.clone()).log_err(); + fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) { + let conn = self.connection.clone(); + let params = acp::CancelledNotification { + session_id: session_id.clone(), + }; + cx.foreground_executor() + .spawn(async move { conn.cancelled(params).await }) + .detach(); } } From 8890f590b1f1791e3640c188c62859c00843f59a Mon Sep 17 00:00:00 2001 From: Nathan Sobo Date: Fri, 1 Aug 2025 22:25:04 -0600 Subject: [PATCH 13/13] Fix some breakages against agent-client-protocol/main --- crates/agent_ui/src/acp/thread_view.rs | 38 ++++++++++++++++---------- 1 file changed, 24 insertions(+), 14 deletions(-) diff --git a/crates/agent_ui/src/acp/thread_view.rs b/crates/agent_ui/src/acp/thread_view.rs index 167f7e6136..26166b6960 100644 --- a/crates/agent_ui/src/acp/thread_view.rs +++ b/crates/agent_ui/src/acp/thread_view.rs @@ -2844,10 +2844,6 @@ mod tests { } impl AgentConnection for StubAgentConnection { - fn name(&self) -> &'static str { - "StubAgentConnection" - } - fn new_thread( self: Rc, project: Entity, @@ -2863,17 +2859,27 @@ mod tests { .into(), ); let thread = cx - .new(|cx| AcpThread::new(self.clone(), project, session_id.clone(), cx)) + .new(|cx| { + AcpThread::new("New Thread", self.clone(), project, session_id.clone(), cx) + }) .unwrap(); self.sessions.lock().insert(session_id, thread.downgrade()); Task::ready(Ok(thread)) } - fn authenticate(&self, _cx: &mut App) -> Task> { + fn auth_methods(&self) -> &[agent_client_protocol::AuthMethod] { + todo!() + } + + fn authenticate( + &self, + _method: acp::AuthMethodId, + _cx: &mut App, + ) -> Task> { unimplemented!() } - fn prompt(&self, params: acp::PromptArguments, cx: &mut App) -> Task> { + fn prompt(&self, params: acp::PromptRequest, cx: &mut App) -> Task> { let sessions = self.sessions.lock(); let thread = sessions.get(¶ms.session_id).unwrap(); let mut tasks = vec![]; @@ -2920,10 +2926,6 @@ mod tests { struct SaboteurAgentConnection; impl AgentConnection for SaboteurAgentConnection { - fn name(&self) -> &'static str { - "SaboteurAgentConnection" - } - fn new_thread( self: Rc, project: Entity, @@ -2931,15 +2933,23 @@ mod tests { cx: &mut gpui::AsyncApp, ) -> Task>> { Task::ready(Ok(cx - .new(|cx| AcpThread::new(self, project, SessionId("test".into()), cx)) + .new(|cx| AcpThread::new("New Thread", self, project, SessionId("test".into()), cx)) .unwrap())) } - fn authenticate(&self, _cx: &mut App) -> Task> { + fn auth_methods(&self) -> &[agent_client_protocol::AuthMethod] { + todo!() + } + + fn authenticate( + &self, + _method: acp::AuthMethodId, + _cx: &mut App, + ) -> Task> { unimplemented!() } - fn prompt(&self, _params: acp::PromptArguments, _cx: &mut App) -> Task> { + fn prompt(&self, _params: acp::PromptRequest, _cx: &mut App) -> Task> { Task::ready(Err(anyhow::anyhow!("Error prompting"))) }