From 0f395df9a8f8f8dcceeed3f5cd98e7ba5014cc1d Mon Sep 17 00:00:00 2001 From: Agus Zubiaga Date: Mon, 28 Jul 2025 18:02:21 -0300 Subject: [PATCH 01/10] Update to new schema --- Cargo.lock | 4 +--- Cargo.toml | 2 +- crates/acp_thread/src/acp_thread.rs | 14 +++++++------- crates/agent_servers/src/codex.rs | 12 ++++-------- crates/agent_servers/src/mcp_server.rs | 7 ++++--- 5 files changed, 17 insertions(+), 22 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8d9a622655..eb034f4cc3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -138,9 +138,7 @@ dependencies = [ [[package]] name = "agent-client-protocol" -version = "0.0.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "72ec54650c1fc2d63498bab47eeeaa9eddc7d239d53f615b797a0e84f7ccc87b" +version = "0.0.12" dependencies = [ "schemars", "serde", diff --git a/Cargo.toml b/Cargo.toml index 16ace7dee0..d733f2242e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -413,7 +413,7 @@ zlog_settings = { path = "crates/zlog_settings" } # agentic-coding-protocol = "0.0.10" -agent-client-protocol = "0.0.11" +agent-client-protocol = {path="../agent-client-protocol"} aho-corasick = "1.1" alacritty_terminal = { git = "https://github.com/zed-industries/alacritty.git", branch = "add-hush-login-flag" } any_vec = "0.14" diff --git a/crates/acp_thread/src/acp_thread.rs b/crates/acp_thread/src/acp_thread.rs index d572992c54..d02f2d6bb6 100644 --- a/crates/acp_thread/src/acp_thread.rs +++ b/crates/acp_thread/src/acp_thread.rs @@ -391,7 +391,7 @@ impl ToolCallContent { cx: &mut App, ) -> Self { match content { - acp::ToolCallContent::ContentBlock(content) => Self::ContentBlock { + acp::ToolCallContent::Content { content } => Self::ContentBlock { content: ContentBlock::new(content, &language_registry, cx), }, acp::ToolCallContent::Diff { diff } => Self::Diff { @@ -682,14 +682,14 @@ impl AcpThread { cx: &mut Context, ) -> Result<()> { match update { - acp::SessionUpdate::UserMessage(content_block) => { - self.push_user_content_block(content_block, cx); + acp::SessionUpdate::UserMessageChunk { content } => { + self.push_user_content_block(content, cx); } - acp::SessionUpdate::AgentMessageChunk(content_block) => { - self.push_assistant_content_block(content_block, false, cx); + acp::SessionUpdate::AgentMessageChunk { content } => { + self.push_assistant_content_block(content, false, cx); } - acp::SessionUpdate::AgentThoughtChunk(content_block) => { - self.push_assistant_content_block(content_block, true, cx); + acp::SessionUpdate::AgentThoughtChunk { content } => { + self.push_assistant_content_block(content, true, cx); } acp::SessionUpdate::ToolCall(tool_call) => { self.upsert_tool_call(tool_call, cx); diff --git a/crates/agent_servers/src/codex.rs b/crates/agent_servers/src/codex.rs index b10ce9cf54..d40aebfbd7 100644 --- a/crates/agent_servers/src/codex.rs +++ b/crates/agent_servers/src/codex.rs @@ -73,7 +73,7 @@ impl AgentServer for Codex { client .client() .context("Failed to subscribe")? - .on_notification(acp::SESSION_UPDATE_METHOD_NAME, { + .on_notification(acp::AGENT_METHODS.session_update, { move |notification, _cx| { let notification_tx = notification_tx.clone(); log::trace!( @@ -149,13 +149,9 @@ impl AgentConnection for CodexConnection { let response = client .request::(context_server::types::CallToolParams { - name: acp::NEW_SESSION_TOOL_NAME.into(), + name: acp::AGENT_METHODS.new_session.into(), arguments: Some(serde_json::to_value(acp::NewSessionArguments { - mcp_servers: [( - mcp_server::SERVER_NAME.to_string(), - mcp_server.server_config()?, - )] - .into(), + mcp_servers: vec![mcp_server.server_config()?], client_tools: acp::ClientTools { request_permission: Some(acp::McpToolId { mcp_server: mcp_server::SERVER_NAME.into(), @@ -227,7 +223,7 @@ impl AgentConnection for CodexConnection { let result = client .request_with::( context_server::types::CallToolParams { - name: acp::PROMPT_TOOL_NAME.into(), + name: acp::AGENT_METHODS.prompt.into(), arguments: Some(serde_json::to_value(params)?), meta: None, }, diff --git a/crates/agent_servers/src/mcp_server.rs b/crates/agent_servers/src/mcp_server.rs index 055b89dfe2..ec655800ed 100644 --- a/crates/agent_servers/src/mcp_server.rs +++ b/crates/agent_servers/src/mcp_server.rs @@ -37,7 +37,7 @@ impl ZedMcpServer { Ok(Self { server: mcp_server }) } - pub fn server_config(&self) -> Result { + pub fn server_config(&self) -> Result { #[cfg(not(test))] let zed_path = anyhow::Context::context( std::env::current_exe(), @@ -47,13 +47,14 @@ impl ZedMcpServer { #[cfg(test)] let zed_path = crate::e2e_tests::get_zed_path(); - Ok(acp::McpServerConfig { + Ok(acp::McpServer { + name: SERVER_NAME.into(), command: zed_path, args: vec![ "--nc".into(), self.server.socket_path().display().to_string(), ], - env: None, + env: vec![], }) } From ced3d09f10c1d64fa8dd31065724e4b12b69e9ee Mon Sep 17 00:00:00 2001 From: Agus Zubiaga Date: Mon, 28 Jul 2025 18:32:30 -0300 Subject: [PATCH 02/10] Extract acp_connection --- Cargo.lock | 4 +- Cargo.toml | 2 +- crates/acp_thread/src/acp_thread.rs | 3 +- crates/acp_thread/src/connection.rs | 2 - crates/acp_thread/src/old_acp_support.rs | 6 +- crates/agent_servers/acp | 0 crates/agent_servers/src/acp_connection.rs | 256 +++++++++++++++++++++ crates/agent_servers/src/agent_servers.rs | 1 + crates/agent_servers/src/claude.rs | 9 +- crates/agent_servers/src/codex.rs | 255 +------------------- crates/agent_servers/src/gemini.rs | 182 +++------------ 11 files changed, 305 insertions(+), 415 deletions(-) create mode 100644 crates/agent_servers/acp create mode 100644 crates/agent_servers/src/acp_connection.rs diff --git a/Cargo.lock b/Cargo.lock index eb034f4cc3..e5b1be5a8c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -138,7 +138,9 @@ dependencies = [ [[package]] name = "agent-client-protocol" -version = "0.0.12" +version = "0.0.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4255a06cc2414033d1fe4baf1968bcc8f16d7e5814f272b97779b5806d129142" dependencies = [ "schemars", "serde", diff --git a/Cargo.toml b/Cargo.toml index d733f2242e..81da82cbb7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -413,7 +413,7 @@ zlog_settings = { path = "crates/zlog_settings" } # agentic-coding-protocol = "0.0.10" -agent-client-protocol = {path="../agent-client-protocol"} +agent-client-protocol = "0.0.13" aho-corasick = "1.1" alacritty_terminal = { git = "https://github.com/zed-industries/alacritty.git", branch = "add-hush-login-flag" } any_vec = "0.14" diff --git a/crates/acp_thread/src/acp_thread.rs b/crates/acp_thread/src/acp_thread.rs index d02f2d6bb6..1c4b0ec06f 100644 --- a/crates/acp_thread/src/acp_thread.rs +++ b/crates/acp_thread/src/acp_thread.rs @@ -616,6 +616,7 @@ impl Error for LoadError {} impl AcpThread { pub fn new( + title: impl Into, connection: Rc, project: Entity, session_id: acp::SessionId, @@ -628,7 +629,7 @@ impl AcpThread { shared_buffers: Default::default(), entries: Default::default(), plan: Default::default(), - title: connection.name().into(), + title: title.into(), project, send_task: None, connection, diff --git a/crates/acp_thread/src/connection.rs b/crates/acp_thread/src/connection.rs index 5b25b71863..97161a19c0 100644 --- a/crates/acp_thread/src/connection.rs +++ b/crates/acp_thread/src/connection.rs @@ -9,8 +9,6 @@ use ui::App; use crate::AcpThread; pub trait AgentConnection { - fn name(&self) -> &'static str; - fn new_thread( self: Rc, project: Entity, diff --git a/crates/acp_thread/src/old_acp_support.rs b/crates/acp_thread/src/old_acp_support.rs index 44cd00348f..d7ef1b73da 100644 --- a/crates/acp_thread/src/old_acp_support.rs +++ b/crates/acp_thread/src/old_acp_support.rs @@ -367,10 +367,6 @@ pub struct OldAcpAgentConnection { } impl AgentConnection for OldAcpAgentConnection { - fn name(&self) -> &'static str { - self.name - } - fn new_thread( self: Rc, project: Entity, @@ -394,7 +390,7 @@ impl AgentConnection for OldAcpAgentConnection { cx.update(|cx| { let thread = cx.new(|cx| { let session_id = acp::SessionId("acp-old-no-id".into()); - AcpThread::new(self.clone(), project, session_id, cx) + AcpThread::new("Gemini", self.clone(), project, session_id, cx) }); thread }) diff --git a/crates/agent_servers/acp b/crates/agent_servers/acp new file mode 100644 index 0000000000..e69de29bb2 diff --git a/crates/agent_servers/src/acp_connection.rs b/crates/agent_servers/src/acp_connection.rs new file mode 100644 index 0000000000..9139d62c38 --- /dev/null +++ b/crates/agent_servers/src/acp_connection.rs @@ -0,0 +1,256 @@ +use agent_client_protocol as acp; +use anyhow::anyhow; +use collections::HashMap; +use context_server::listener::McpServerTool; +use context_server::types::requests; +use context_server::{ContextServer, ContextServerCommand, ContextServerId}; +use futures::channel::{mpsc, oneshot}; +use project::Project; +use smol::stream::StreamExt as _; +use std::cell::RefCell; +use std::rc::Rc; +use std::{path::Path, sync::Arc}; +use util::ResultExt; + +use anyhow::{Context, Result}; +use gpui::{App, AppContext as _, AsyncApp, Entity, Task, WeakEntity}; + +use crate::mcp_server::ZedMcpServer; +use crate::{AgentServerCommand, mcp_server}; +use acp_thread::{AcpThread, AgentConnection}; + +pub struct AcpConnection { + server_name: &'static str, + client: Arc, + sessions: Rc>>, + _notification_handler_task: Task<()>, +} + +impl AcpConnection { + pub async fn stdio( + server_name: &'static str, + command: AgentServerCommand, + cx: &mut AsyncApp, + ) -> Result { + let client: Arc = ContextServer::stdio( + ContextServerId(format!("{}-mcp-server", server_name).into()), + ContextServerCommand { + path: command.path, + args: command.args, + env: command.env, + }, + ) + .into(); + ContextServer::start(client.clone(), cx).await?; + + let (notification_tx, mut notification_rx) = mpsc::unbounded(); + client + .client() + .context("Failed to subscribe")? + .on_notification(acp::AGENT_METHODS.session_update, { + move |notification, _cx| { + let notification_tx = notification_tx.clone(); + log::trace!( + "ACP Notification: {}", + serde_json::to_string_pretty(¬ification).unwrap() + ); + + if let Some(notification) = + serde_json::from_value::(notification).log_err() + { + notification_tx.unbounded_send(notification).ok(); + } + } + }); + + let sessions = Rc::new(RefCell::new(HashMap::default())); + + let notification_handler_task = cx.spawn({ + let sessions = sessions.clone(); + async move |cx| { + while let Some(notification) = notification_rx.next().await { + Self::handle_session_notification(notification, sessions.clone(), cx) + } + } + }); + + Ok(Self { + server_name, + client, + sessions, + _notification_handler_task: notification_handler_task, + }) + } + + pub fn handle_session_notification( + notification: acp::SessionNotification, + threads: Rc>>, + cx: &mut AsyncApp, + ) { + let threads = threads.borrow(); + let Some(thread) = threads + .get(¬ification.session_id) + .and_then(|session| session.thread.upgrade()) + else { + log::error!( + "Thread not found for session ID: {}", + notification.session_id + ); + return; + }; + + thread + .update(cx, |thread, cx| { + thread.handle_session_update(notification.update, cx) + }) + .log_err(); + } +} + +pub struct AcpSession { + thread: WeakEntity, + cancel_tx: Option>, + _mcp_server: ZedMcpServer, +} + +impl AgentConnection for AcpConnection { + fn new_thread( + self: Rc, + project: Entity, + cwd: &Path, + cx: &mut AsyncApp, + ) -> Task>> { + let client = self.client.client(); + let sessions = self.sessions.clone(); + let cwd = cwd.to_path_buf(); + cx.spawn(async move |cx| { + let client = client.context("MCP server is not initialized yet")?; + let (mut thread_tx, thread_rx) = watch::channel(WeakEntity::new_invalid()); + + let mcp_server = ZedMcpServer::new(thread_rx, cx).await?; + + let response = client + .request::(context_server::types::CallToolParams { + name: acp::AGENT_METHODS.new_session.into(), + arguments: Some(serde_json::to_value(acp::NewSessionArguments { + mcp_servers: vec![mcp_server.server_config()?], + client_tools: acp::ClientTools { + request_permission: Some(acp::McpToolId { + mcp_server: mcp_server::SERVER_NAME.into(), + tool_name: mcp_server::RequestPermissionTool::NAME.into(), + }), + read_text_file: Some(acp::McpToolId { + mcp_server: mcp_server::SERVER_NAME.into(), + tool_name: mcp_server::ReadTextFileTool::NAME.into(), + }), + write_text_file: Some(acp::McpToolId { + mcp_server: mcp_server::SERVER_NAME.into(), + tool_name: mcp_server::WriteTextFileTool::NAME.into(), + }), + }, + cwd, + })?), + meta: None, + }) + .await?; + + if response.is_error.unwrap_or_default() { + return Err(anyhow!(response.text_contents())); + } + + let result = serde_json::from_value::( + response.structured_content.context("Empty response")?, + )?; + + let thread = cx.new(|cx| { + AcpThread::new( + self.server_name, + self.clone(), + project, + result.session_id.clone(), + cx, + ) + })?; + + thread_tx.send(thread.downgrade())?; + + let session = AcpSession { + thread: thread.downgrade(), + cancel_tx: None, + _mcp_server: mcp_server, + }; + sessions.borrow_mut().insert(result.session_id, session); + + Ok(thread) + }) + } + + fn authenticate(&self, _cx: &mut App) -> Task> { + Task::ready(Err(anyhow!("Authentication not supported"))) + } + + fn prompt( + &self, + params: agent_client_protocol::PromptArguments, + cx: &mut App, + ) -> Task> { + let client = self.client.client(); + let sessions = self.sessions.clone(); + + cx.foreground_executor().spawn(async move { + let client = client.context("MCP server is not initialized yet")?; + + let (new_cancel_tx, cancel_rx) = oneshot::channel(); + { + let mut sessions = sessions.borrow_mut(); + let session = sessions + .get_mut(¶ms.session_id) + .context("Session not found")?; + session.cancel_tx.replace(new_cancel_tx); + } + + let result = client + .request_with::( + context_server::types::CallToolParams { + name: acp::AGENT_METHODS.prompt.into(), + arguments: Some(serde_json::to_value(params)?), + meta: None, + }, + Some(cancel_rx), + None, + ) + .await; + + if let Err(err) = &result + && err.is::() + { + return Ok(()); + } + + let response = result?; + + if response.is_error.unwrap_or_default() { + return Err(anyhow!(response.text_contents())); + } + + Ok(()) + }) + } + + fn cancel(&self, session_id: &agent_client_protocol::SessionId, _cx: &mut App) { + let mut sessions = self.sessions.borrow_mut(); + + if let Some(cancel_tx) = sessions + .get_mut(session_id) + .and_then(|session| session.cancel_tx.take()) + { + cancel_tx.send(()).ok(); + } + } +} + +impl Drop for AcpConnection { + fn drop(&mut self) { + self.client.stop().log_err(); + } +} diff --git a/crates/agent_servers/src/agent_servers.rs b/crates/agent_servers/src/agent_servers.rs index 212bb74d8a..6a031a190e 100644 --- a/crates/agent_servers/src/agent_servers.rs +++ b/crates/agent_servers/src/agent_servers.rs @@ -1,3 +1,4 @@ +mod acp_connection; mod claude; mod codex; mod gemini; diff --git a/crates/agent_servers/src/claude.rs b/crates/agent_servers/src/claude.rs index 6565786204..590da69cd8 100644 --- a/crates/agent_servers/src/claude.rs +++ b/crates/agent_servers/src/claude.rs @@ -70,10 +70,6 @@ struct ClaudeAgentConnection { } impl AgentConnection for ClaudeAgentConnection { - fn name(&self) -> &'static str { - ClaudeCode.name() - } - fn new_thread( self: Rc, project: Entity, @@ -168,8 +164,9 @@ impl AgentConnection for ClaudeAgentConnection { } }); - let thread = - cx.new(|cx| AcpThread::new(self.clone(), project, session_id.clone(), cx))?; + let thread = cx.new(|cx| { + AcpThread::new("Claude Code", self.clone(), project, session_id.clone(), cx) + })?; thread_tx.send(thread.downgrade())?; diff --git a/crates/agent_servers/src/codex.rs b/crates/agent_servers/src/codex.rs index d40aebfbd7..1909781efa 100644 --- a/crates/agent_servers/src/codex.rs +++ b/crates/agent_servers/src/codex.rs @@ -1,24 +1,14 @@ -use agent_client_protocol as acp; -use anyhow::anyhow; -use collections::HashMap; -use context_server::listener::McpServerTool; -use context_server::types::requests; -use context_server::{ContextServer, ContextServerCommand, ContextServerId}; -use futures::channel::{mpsc, oneshot}; use project::Project; use settings::SettingsStore; -use smol::stream::StreamExt as _; -use std::cell::RefCell; +use std::path::Path; use std::rc::Rc; -use std::{path::Path, sync::Arc}; -use util::ResultExt; -use anyhow::{Context, Result}; -use gpui::{App, AppContext as _, AsyncApp, Entity, Task, WeakEntity}; +use anyhow::Result; +use gpui::{App, Entity, Task}; -use crate::mcp_server::ZedMcpServer; -use crate::{AgentServer, AgentServerCommand, AllAgentServersSettings, mcp_server}; -use acp_thread::{AcpThread, AgentConnection}; +use crate::acp_connection::AcpConnection; +use crate::{AgentServer, AgentServerCommand, AllAgentServersSettings}; +use acp_thread::AgentConnection; #[derive(Clone)] pub struct Codex; @@ -47,6 +37,7 @@ impl AgentServer for Codex { cx: &mut App, ) -> Task>> { let project = project.clone(); + let server_name = self.name(); cx.spawn(async move |cx| { let settings = cx.read_global(|settings: &SettingsStore, _| { settings.get::(None).codex.clone() @@ -58,240 +49,12 @@ impl AgentServer for Codex { anyhow::bail!("Failed to find codex binary"); }; - let client: Arc = ContextServer::stdio( - ContextServerId("codex-mcp-server".into()), - ContextServerCommand { - path: command.path, - args: command.args, - env: command.env, - }, - ) - .into(); - ContextServer::start(client.clone(), cx).await?; - - let (notification_tx, mut notification_rx) = mpsc::unbounded(); - client - .client() - .context("Failed to subscribe")? - .on_notification(acp::AGENT_METHODS.session_update, { - move |notification, _cx| { - let notification_tx = notification_tx.clone(); - log::trace!( - "ACP Notification: {}", - serde_json::to_string_pretty(¬ification).unwrap() - ); - - if let Some(notification) = - serde_json::from_value::(notification) - .log_err() - { - notification_tx.unbounded_send(notification).ok(); - } - } - }); - - let sessions = Rc::new(RefCell::new(HashMap::default())); - - let notification_handler_task = cx.spawn({ - let sessions = sessions.clone(); - async move |cx| { - while let Some(notification) = notification_rx.next().await { - CodexConnection::handle_session_notification( - notification, - sessions.clone(), - cx, - ) - } - } - }); - - let connection = CodexConnection { - client, - sessions, - _notification_handler_task: notification_handler_task, - }; - Ok(Rc::new(connection) as _) + let conn = AcpConnection::stdio(server_name, command, cx).await?; + Ok(Rc::new(conn) as _) }) } } -struct CodexConnection { - client: Arc, - sessions: Rc>>, - _notification_handler_task: Task<()>, -} - -struct CodexSession { - thread: WeakEntity, - cancel_tx: Option>, - _mcp_server: ZedMcpServer, -} - -impl AgentConnection for CodexConnection { - fn name(&self) -> &'static str { - "Codex" - } - - fn new_thread( - self: Rc, - project: Entity, - cwd: &Path, - cx: &mut AsyncApp, - ) -> Task>> { - let client = self.client.client(); - let sessions = self.sessions.clone(); - let cwd = cwd.to_path_buf(); - cx.spawn(async move |cx| { - let client = client.context("MCP server is not initialized yet")?; - let (mut thread_tx, thread_rx) = watch::channel(WeakEntity::new_invalid()); - - let mcp_server = ZedMcpServer::new(thread_rx, cx).await?; - - let response = client - .request::(context_server::types::CallToolParams { - name: acp::AGENT_METHODS.new_session.into(), - arguments: Some(serde_json::to_value(acp::NewSessionArguments { - mcp_servers: vec![mcp_server.server_config()?], - client_tools: acp::ClientTools { - request_permission: Some(acp::McpToolId { - mcp_server: mcp_server::SERVER_NAME.into(), - tool_name: mcp_server::RequestPermissionTool::NAME.into(), - }), - read_text_file: Some(acp::McpToolId { - mcp_server: mcp_server::SERVER_NAME.into(), - tool_name: mcp_server::ReadTextFileTool::NAME.into(), - }), - write_text_file: Some(acp::McpToolId { - mcp_server: mcp_server::SERVER_NAME.into(), - tool_name: mcp_server::WriteTextFileTool::NAME.into(), - }), - }, - cwd, - })?), - meta: None, - }) - .await?; - - if response.is_error.unwrap_or_default() { - return Err(anyhow!(response.text_contents())); - } - - let result = serde_json::from_value::( - response.structured_content.context("Empty response")?, - )?; - - let thread = - cx.new(|cx| AcpThread::new(self.clone(), project, result.session_id.clone(), cx))?; - - thread_tx.send(thread.downgrade())?; - - let session = CodexSession { - thread: thread.downgrade(), - cancel_tx: None, - _mcp_server: mcp_server, - }; - sessions.borrow_mut().insert(result.session_id, session); - - Ok(thread) - }) - } - - fn authenticate(&self, _cx: &mut App) -> Task> { - Task::ready(Err(anyhow!("Authentication not supported"))) - } - - fn prompt( - &self, - params: agent_client_protocol::PromptArguments, - cx: &mut App, - ) -> Task> { - let client = self.client.client(); - let sessions = self.sessions.clone(); - - cx.foreground_executor().spawn(async move { - let client = client.context("MCP server is not initialized yet")?; - - let (new_cancel_tx, cancel_rx) = oneshot::channel(); - { - let mut sessions = sessions.borrow_mut(); - let session = sessions - .get_mut(¶ms.session_id) - .context("Session not found")?; - session.cancel_tx.replace(new_cancel_tx); - } - - let result = client - .request_with::( - context_server::types::CallToolParams { - name: acp::AGENT_METHODS.prompt.into(), - arguments: Some(serde_json::to_value(params)?), - meta: None, - }, - Some(cancel_rx), - None, - ) - .await; - - if let Err(err) = &result - && err.is::() - { - return Ok(()); - } - - let response = result?; - - if response.is_error.unwrap_or_default() { - return Err(anyhow!(response.text_contents())); - } - - Ok(()) - }) - } - - fn cancel(&self, session_id: &agent_client_protocol::SessionId, _cx: &mut App) { - let mut sessions = self.sessions.borrow_mut(); - - if let Some(cancel_tx) = sessions - .get_mut(session_id) - .and_then(|session| session.cancel_tx.take()) - { - cancel_tx.send(()).ok(); - } - } -} - -impl CodexConnection { - pub fn handle_session_notification( - notification: acp::SessionNotification, - threads: Rc>>, - cx: &mut AsyncApp, - ) { - let threads = threads.borrow(); - let Some(thread) = threads - .get(¬ification.session_id) - .and_then(|session| session.thread.upgrade()) - else { - log::error!( - "Thread not found for session ID: {}", - notification.session_id - ); - return; - }; - - thread - .update(cx, |thread, cx| { - thread.handle_session_update(notification.update, cx) - }) - .log_err(); - } -} - -impl Drop for CodexConnection { - fn drop(&mut self) { - self.client.stop().log_err(); - } -} - #[cfg(test)] pub(crate) mod tests { use super::*; diff --git a/crates/agent_servers/src/gemini.rs b/crates/agent_servers/src/gemini.rs index 8b9fed5777..70c7f8efb5 100644 --- a/crates/agent_servers/src/gemini.rs +++ b/crates/agent_servers/src/gemini.rs @@ -1,25 +1,18 @@ -use anyhow::anyhow; -use std::cell::RefCell; -use std::path::Path; -use std::rc::Rc; -use util::ResultExt as _; - -use crate::{AgentServer, AgentServerCommand, AgentServerVersion}; -use acp_thread::{AgentConnection, LoadError, OldAcpAgentConnection, OldAcpClientDelegate}; -use agentic_coding_protocol as acp_old; -use anyhow::{Context as _, Result}; -use gpui::{AppContext as _, AsyncApp, Entity, Task, WeakEntity}; use project::Project; use settings::SettingsStore; -use ui::App; +use std::path::Path; +use std::rc::Rc; -use crate::AllAgentServersSettings; +use anyhow::Result; +use gpui::{App, Entity, Task}; + +use crate::acp_connection::AcpConnection; +use crate::{AgentServer, AgentServerCommand, AllAgentServersSettings}; +use acp_thread::AgentConnection; #[derive(Clone)] pub struct Gemini; -const ACP_ARG: &str = "--experimental-acp"; - impl AgentServer for Gemini { fn name(&self) -> &'static str { "Gemini" @@ -39,166 +32,49 @@ impl AgentServer for Gemini { fn connect( &self, - root_dir: &Path, + _root_dir: &Path, project: &Entity, cx: &mut App, ) -> Task>> { - let root_dir = root_dir.to_path_buf(); let project = project.clone(); - let this = self.clone(); - let name = self.name(); - + let server_name = self.name(); cx.spawn(async move |cx| { - let command = this.command(&project, cx).await?; + let settings = cx.read_global(|settings: &SettingsStore, _| { + settings.get::(None).gemini.clone() + })?; - let mut child = util::command::new_smol_command(&command.path) - .args(command.args.iter()) - .current_dir(root_dir) - .stdin(std::process::Stdio::piped()) - .stdout(std::process::Stdio::piped()) - .stderr(std::process::Stdio::inherit()) - .kill_on_drop(true) - .spawn()?; + let Some(command) = AgentServerCommand::resolve( + "gemini", + &["--experimental-mcp"], + settings, + &project, + cx, + ) + .await + else { + anyhow::bail!("Failed to find gemini binary"); + }; - let stdin = child.stdin.take().unwrap(); - let stdout = child.stdout.take().unwrap(); - - let foreground_executor = cx.foreground_executor().clone(); - - let thread_rc = Rc::new(RefCell::new(WeakEntity::new_invalid())); - - let (connection, io_fut) = acp_old::AgentConnection::connect_to_agent( - OldAcpClientDelegate::new(thread_rc.clone(), cx.clone()), - stdin, - stdout, - move |fut| foreground_executor.spawn(fut).detach(), - ); - - let io_task = cx.background_spawn(async move { - io_fut.await.log_err(); - }); - - let child_status = cx.background_spawn(async move { - let result = match child.status().await { - Err(e) => Err(anyhow!(e)), - Ok(result) if result.success() => Ok(()), - Ok(result) => { - if let Some(AgentServerVersion::Unsupported { - error_message, - upgrade_message, - upgrade_command, - }) = this.version(&command).await.log_err() - { - Err(anyhow!(LoadError::Unsupported { - error_message, - upgrade_message, - upgrade_command - })) - } else { - Err(anyhow!(LoadError::Exited(result.code().unwrap_or(-127)))) - } - } - }; - drop(io_task); - result - }); - - let connection: Rc = Rc::new(OldAcpAgentConnection { - name, - connection, - child_status, - }); - - Ok(connection) + let conn = AcpConnection::stdio(server_name, command, cx).await?; + Ok(Rc::new(conn) as _) }) } } -impl Gemini { - async fn command( - &self, - project: &Entity, - cx: &mut AsyncApp, - ) -> Result { - let settings = cx.read_global(|settings: &SettingsStore, _| { - settings.get::(None).gemini.clone() - })?; - - if let Some(command) = - AgentServerCommand::resolve("gemini", &[ACP_ARG], settings, &project, cx).await - { - return Ok(command); - }; - - let (fs, node_runtime) = project.update(cx, |project, _| { - (project.fs().clone(), project.node_runtime().cloned()) - })?; - let node_runtime = node_runtime.context("gemini not found on path")?; - - let directory = ::paths::agent_servers_dir().join("gemini"); - fs.create_dir(&directory).await?; - node_runtime - .npm_install_packages(&directory, &[("@google/gemini-cli", "latest")]) - .await?; - let path = directory.join("node_modules/.bin/gemini"); - - Ok(AgentServerCommand { - path, - args: vec![ACP_ARG.into()], - env: None, - }) - } - - async fn version(&self, command: &AgentServerCommand) -> Result { - let version_fut = util::command::new_smol_command(&command.path) - .args(command.args.iter()) - .arg("--version") - .kill_on_drop(true) - .output(); - - let help_fut = util::command::new_smol_command(&command.path) - .args(command.args.iter()) - .arg("--help") - .kill_on_drop(true) - .output(); - - let (version_output, help_output) = futures::future::join(version_fut, help_fut).await; - - let current_version = String::from_utf8(version_output?.stdout)?; - let supported = String::from_utf8(help_output?.stdout)?.contains(ACP_ARG); - - if supported { - Ok(AgentServerVersion::Supported) - } else { - Ok(AgentServerVersion::Unsupported { - error_message: format!( - "Your installed version of Gemini {} doesn't support the Agentic Coding Protocol (ACP).", - current_version - ).into(), - upgrade_message: "Upgrade Gemini to Latest".into(), - upgrade_command: "npm install -g @google/gemini-cli@latest".into(), - }) - } - } -} - #[cfg(test)] pub(crate) mod tests { use super::*; use crate::AgentServerCommand; use std::path::Path; - crate::common_e2e_tests!(Gemini, allow_option_id = "0"); + crate::common_e2e_tests!(Gemini, allow_option_id = "allow"); pub fn local_command() -> AgentServerCommand { - let cli_path = Path::new(env!("CARGO_MANIFEST_DIR")) - .join("../../../gemini-cli/packages/cli") - .to_string_lossy() - .to_string(); + let cli_path = Path::new(env!("CARGO_MANIFEST_DIR")).join("../../../gemini/packages/cli"); AgentServerCommand { path: "node".into(), - args: vec![cli_path, ACP_ARG.into()], + args: vec![cli_path.to_string_lossy().to_string()], env: None, } } From b48faddaf416f2597bbec9bf30823017cd1d1931 Mon Sep 17 00:00:00 2001 From: Agus Zubiaga Date: Mon, 28 Jul 2025 18:45:05 -0300 Subject: [PATCH 03/10] Restore gemini change --- crates/agent_servers/src/gemini.rs | 180 ++++++++++++++++++++++++----- 1 file changed, 152 insertions(+), 28 deletions(-) diff --git a/crates/agent_servers/src/gemini.rs b/crates/agent_servers/src/gemini.rs index 70c7f8efb5..8b9fed5777 100644 --- a/crates/agent_servers/src/gemini.rs +++ b/crates/agent_servers/src/gemini.rs @@ -1,18 +1,25 @@ -use project::Project; -use settings::SettingsStore; +use anyhow::anyhow; +use std::cell::RefCell; use std::path::Path; use std::rc::Rc; +use util::ResultExt as _; -use anyhow::Result; -use gpui::{App, Entity, Task}; +use crate::{AgentServer, AgentServerCommand, AgentServerVersion}; +use acp_thread::{AgentConnection, LoadError, OldAcpAgentConnection, OldAcpClientDelegate}; +use agentic_coding_protocol as acp_old; +use anyhow::{Context as _, Result}; +use gpui::{AppContext as _, AsyncApp, Entity, Task, WeakEntity}; +use project::Project; +use settings::SettingsStore; +use ui::App; -use crate::acp_connection::AcpConnection; -use crate::{AgentServer, AgentServerCommand, AllAgentServersSettings}; -use acp_thread::AgentConnection; +use crate::AllAgentServersSettings; #[derive(Clone)] pub struct Gemini; +const ACP_ARG: &str = "--experimental-acp"; + impl AgentServer for Gemini { fn name(&self) -> &'static str { "Gemini" @@ -32,49 +39,166 @@ impl AgentServer for Gemini { fn connect( &self, - _root_dir: &Path, + root_dir: &Path, project: &Entity, cx: &mut App, ) -> Task>> { + let root_dir = root_dir.to_path_buf(); let project = project.clone(); - let server_name = self.name(); + let this = self.clone(); + let name = self.name(); + cx.spawn(async move |cx| { - let settings = cx.read_global(|settings: &SettingsStore, _| { - settings.get::(None).gemini.clone() - })?; + let command = this.command(&project, cx).await?; - let Some(command) = AgentServerCommand::resolve( - "gemini", - &["--experimental-mcp"], - settings, - &project, - cx, - ) - .await - else { - anyhow::bail!("Failed to find gemini binary"); - }; + let mut child = util::command::new_smol_command(&command.path) + .args(command.args.iter()) + .current_dir(root_dir) + .stdin(std::process::Stdio::piped()) + .stdout(std::process::Stdio::piped()) + .stderr(std::process::Stdio::inherit()) + .kill_on_drop(true) + .spawn()?; - let conn = AcpConnection::stdio(server_name, command, cx).await?; - Ok(Rc::new(conn) as _) + let stdin = child.stdin.take().unwrap(); + let stdout = child.stdout.take().unwrap(); + + let foreground_executor = cx.foreground_executor().clone(); + + let thread_rc = Rc::new(RefCell::new(WeakEntity::new_invalid())); + + let (connection, io_fut) = acp_old::AgentConnection::connect_to_agent( + OldAcpClientDelegate::new(thread_rc.clone(), cx.clone()), + stdin, + stdout, + move |fut| foreground_executor.spawn(fut).detach(), + ); + + let io_task = cx.background_spawn(async move { + io_fut.await.log_err(); + }); + + let child_status = cx.background_spawn(async move { + let result = match child.status().await { + Err(e) => Err(anyhow!(e)), + Ok(result) if result.success() => Ok(()), + Ok(result) => { + if let Some(AgentServerVersion::Unsupported { + error_message, + upgrade_message, + upgrade_command, + }) = this.version(&command).await.log_err() + { + Err(anyhow!(LoadError::Unsupported { + error_message, + upgrade_message, + upgrade_command + })) + } else { + Err(anyhow!(LoadError::Exited(result.code().unwrap_or(-127)))) + } + } + }; + drop(io_task); + result + }); + + let connection: Rc = Rc::new(OldAcpAgentConnection { + name, + connection, + child_status, + }); + + Ok(connection) }) } } +impl Gemini { + async fn command( + &self, + project: &Entity, + cx: &mut AsyncApp, + ) -> Result { + let settings = cx.read_global(|settings: &SettingsStore, _| { + settings.get::(None).gemini.clone() + })?; + + if let Some(command) = + AgentServerCommand::resolve("gemini", &[ACP_ARG], settings, &project, cx).await + { + return Ok(command); + }; + + let (fs, node_runtime) = project.update(cx, |project, _| { + (project.fs().clone(), project.node_runtime().cloned()) + })?; + let node_runtime = node_runtime.context("gemini not found on path")?; + + let directory = ::paths::agent_servers_dir().join("gemini"); + fs.create_dir(&directory).await?; + node_runtime + .npm_install_packages(&directory, &[("@google/gemini-cli", "latest")]) + .await?; + let path = directory.join("node_modules/.bin/gemini"); + + Ok(AgentServerCommand { + path, + args: vec![ACP_ARG.into()], + env: None, + }) + } + + async fn version(&self, command: &AgentServerCommand) -> Result { + let version_fut = util::command::new_smol_command(&command.path) + .args(command.args.iter()) + .arg("--version") + .kill_on_drop(true) + .output(); + + let help_fut = util::command::new_smol_command(&command.path) + .args(command.args.iter()) + .arg("--help") + .kill_on_drop(true) + .output(); + + let (version_output, help_output) = futures::future::join(version_fut, help_fut).await; + + let current_version = String::from_utf8(version_output?.stdout)?; + let supported = String::from_utf8(help_output?.stdout)?.contains(ACP_ARG); + + if supported { + Ok(AgentServerVersion::Supported) + } else { + Ok(AgentServerVersion::Unsupported { + error_message: format!( + "Your installed version of Gemini {} doesn't support the Agentic Coding Protocol (ACP).", + current_version + ).into(), + upgrade_message: "Upgrade Gemini to Latest".into(), + upgrade_command: "npm install -g @google/gemini-cli@latest".into(), + }) + } + } +} + #[cfg(test)] pub(crate) mod tests { use super::*; use crate::AgentServerCommand; use std::path::Path; - crate::common_e2e_tests!(Gemini, allow_option_id = "allow"); + crate::common_e2e_tests!(Gemini, allow_option_id = "0"); pub fn local_command() -> AgentServerCommand { - let cli_path = Path::new(env!("CARGO_MANIFEST_DIR")).join("../../../gemini/packages/cli"); + let cli_path = Path::new(env!("CARGO_MANIFEST_DIR")) + .join("../../../gemini-cli/packages/cli") + .to_string_lossy() + .to_string(); AgentServerCommand { path: "node".into(), - args: vec![cli_path.to_string_lossy().to_string()], + args: vec![cli_path, ACP_ARG.into()], env: None, } } From 912ab505b2677a90de74c73d6258cd4f709aee2d Mon Sep 17 00:00:00 2001 From: Agus Zubiaga Date: Mon, 28 Jul 2025 20:04:32 -0300 Subject: [PATCH 04/10] Connect to gemini over MCP --- crates/agent_servers/src/codex.rs | 1 + crates/agent_servers/src/gemini.rs | 159 ++++------------------------- 2 files changed, 19 insertions(+), 141 deletions(-) diff --git a/crates/agent_servers/src/codex.rs b/crates/agent_servers/src/codex.rs index 1909781efa..06d8d10a91 100644 --- a/crates/agent_servers/src/codex.rs +++ b/crates/agent_servers/src/codex.rs @@ -48,6 +48,7 @@ impl AgentServer for Codex { else { anyhow::bail!("Failed to find codex binary"); }; + // todo! check supported version let conn = AcpConnection::stdio(server_name, command, cx).await?; Ok(Rc::new(conn) as _) diff --git a/crates/agent_servers/src/gemini.rs b/crates/agent_servers/src/gemini.rs index 8b9fed5777..07c4e1b539 100644 --- a/crates/agent_servers/src/gemini.rs +++ b/crates/agent_servers/src/gemini.rs @@ -1,14 +1,10 @@ -use anyhow::anyhow; -use std::cell::RefCell; use std::path::Path; use std::rc::Rc; -use util::ResultExt as _; -use crate::{AgentServer, AgentServerCommand, AgentServerVersion}; -use acp_thread::{AgentConnection, LoadError, OldAcpAgentConnection, OldAcpClientDelegate}; -use agentic_coding_protocol as acp_old; -use anyhow::{Context as _, Result}; -use gpui::{AppContext as _, AsyncApp, Entity, Task, WeakEntity}; +use crate::{AgentServer, AgentServerCommand, acp_connection::AcpConnection}; +use acp_thread::AgentConnection; +use anyhow::Result; +use gpui::{Entity, Task}; use project::Project; use settings::SettingsStore; use ui::App; @@ -39,149 +35,30 @@ impl AgentServer for Gemini { fn connect( &self, - root_dir: &Path, + _root_dir: &Path, project: &Entity, cx: &mut App, ) -> Task>> { - let root_dir = root_dir.to_path_buf(); let project = project.clone(); - let this = self.clone(); - let name = self.name(); - + let server_name = self.name(); cx.spawn(async move |cx| { - let command = this.command(&project, cx).await?; + let settings = cx.read_global(|settings: &SettingsStore, _| { + settings.get::(None).gemini.clone() + })?; - let mut child = util::command::new_smol_command(&command.path) - .args(command.args.iter()) - .current_dir(root_dir) - .stdin(std::process::Stdio::piped()) - .stdout(std::process::Stdio::piped()) - .stderr(std::process::Stdio::inherit()) - .kill_on_drop(true) - .spawn()?; + let Some(command) = + AgentServerCommand::resolve("gemini", &[ACP_ARG], settings, &project, cx).await + else { + anyhow::bail!("Failed to find gemini binary"); + }; + // todo! check supported version - let stdin = child.stdin.take().unwrap(); - let stdout = child.stdout.take().unwrap(); - - let foreground_executor = cx.foreground_executor().clone(); - - let thread_rc = Rc::new(RefCell::new(WeakEntity::new_invalid())); - - let (connection, io_fut) = acp_old::AgentConnection::connect_to_agent( - OldAcpClientDelegate::new(thread_rc.clone(), cx.clone()), - stdin, - stdout, - move |fut| foreground_executor.spawn(fut).detach(), - ); - - let io_task = cx.background_spawn(async move { - io_fut.await.log_err(); - }); - - let child_status = cx.background_spawn(async move { - let result = match child.status().await { - Err(e) => Err(anyhow!(e)), - Ok(result) if result.success() => Ok(()), - Ok(result) => { - if let Some(AgentServerVersion::Unsupported { - error_message, - upgrade_message, - upgrade_command, - }) = this.version(&command).await.log_err() - { - Err(anyhow!(LoadError::Unsupported { - error_message, - upgrade_message, - upgrade_command - })) - } else { - Err(anyhow!(LoadError::Exited(result.code().unwrap_or(-127)))) - } - } - }; - drop(io_task); - result - }); - - let connection: Rc = Rc::new(OldAcpAgentConnection { - name, - connection, - child_status, - }); - - Ok(connection) + let conn = AcpConnection::stdio(server_name, command, cx).await?; + Ok(Rc::new(conn) as _) }) } } -impl Gemini { - async fn command( - &self, - project: &Entity, - cx: &mut AsyncApp, - ) -> Result { - let settings = cx.read_global(|settings: &SettingsStore, _| { - settings.get::(None).gemini.clone() - })?; - - if let Some(command) = - AgentServerCommand::resolve("gemini", &[ACP_ARG], settings, &project, cx).await - { - return Ok(command); - }; - - let (fs, node_runtime) = project.update(cx, |project, _| { - (project.fs().clone(), project.node_runtime().cloned()) - })?; - let node_runtime = node_runtime.context("gemini not found on path")?; - - let directory = ::paths::agent_servers_dir().join("gemini"); - fs.create_dir(&directory).await?; - node_runtime - .npm_install_packages(&directory, &[("@google/gemini-cli", "latest")]) - .await?; - let path = directory.join("node_modules/.bin/gemini"); - - Ok(AgentServerCommand { - path, - args: vec![ACP_ARG.into()], - env: None, - }) - } - - async fn version(&self, command: &AgentServerCommand) -> Result { - let version_fut = util::command::new_smol_command(&command.path) - .args(command.args.iter()) - .arg("--version") - .kill_on_drop(true) - .output(); - - let help_fut = util::command::new_smol_command(&command.path) - .args(command.args.iter()) - .arg("--help") - .kill_on_drop(true) - .output(); - - let (version_output, help_output) = futures::future::join(version_fut, help_fut).await; - - let current_version = String::from_utf8(version_output?.stdout)?; - let supported = String::from_utf8(help_output?.stdout)?.contains(ACP_ARG); - - if supported { - Ok(AgentServerVersion::Supported) - } else { - Ok(AgentServerVersion::Unsupported { - error_message: format!( - "Your installed version of Gemini {} doesn't support the Agentic Coding Protocol (ACP).", - current_version - ).into(), - upgrade_message: "Upgrade Gemini to Latest".into(), - upgrade_command: "npm install -g @google/gemini-cli@latest".into(), - }) - } - } -} - #[cfg(test)] pub(crate) mod tests { use super::*; @@ -198,7 +75,7 @@ pub(crate) mod tests { AgentServerCommand { path: "node".into(), - args: vec![cli_path, ACP_ARG.into()], + args: vec![cli_path], env: None, } } From 254c6be42b69048bc5e1689e5438d1880fa8034c Mon Sep 17 00:00:00 2001 From: Ben Brandt Date: Tue, 29 Jul 2025 10:12:57 +0200 Subject: [PATCH 05/10] Fix broken test --- crates/acp_thread/src/acp_thread.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/acp_thread/src/acp_thread.rs b/crates/acp_thread/src/acp_thread.rs index 1c4b0ec06f..841d320796 100644 --- a/crates/acp_thread/src/acp_thread.rs +++ b/crates/acp_thread/src/acp_thread.rs @@ -1601,6 +1601,7 @@ mod tests { }; AcpThread::new( + "test", Rc::new(connection), project, acp::SessionId("test".into()), From 6656403ce85602e159262765654269297c95e630 Mon Sep 17 00:00:00 2001 From: Agus Zubiaga Date: Tue, 29 Jul 2025 21:15:00 -0300 Subject: [PATCH 06/10] Auth WIP --- Cargo.lock | 2 - Cargo.toml | 2 +- crates/acp_thread/src/acp_thread.rs | 4 - crates/acp_thread/src/connection.rs | 8 +- crates/acp_thread/src/old_acp_support.rs | 15 +++- crates/agent_servers/src/acp_connection.rs | 100 ++++++++++++++++----- crates/agent_servers/src/claude.rs | 10 ++- crates/agent_ui/src/acp/thread_view.rs | 55 ++++++++---- 8 files changed, 140 insertions(+), 56 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1682b80a8c..f68136d978 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -139,8 +139,6 @@ dependencies = [ [[package]] name = "agent-client-protocol" version = "0.0.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4255a06cc2414033d1fe4baf1968bcc8f16d7e5814f272b97779b5806d129142" dependencies = [ "schemars", "serde", diff --git a/Cargo.toml b/Cargo.toml index 81da82cbb7..d733f2242e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -413,7 +413,7 @@ zlog_settings = { path = "crates/zlog_settings" } # agentic-coding-protocol = "0.0.10" -agent-client-protocol = "0.0.13" +agent-client-protocol = {path="../agent-client-protocol"} aho-corasick = "1.1" alacritty_terminal = { git = "https://github.com/zed-industries/alacritty.git", branch = "add-hush-login-flag" } any_vec = "0.14" diff --git a/crates/acp_thread/src/acp_thread.rs b/crates/acp_thread/src/acp_thread.rs index 841d320796..3b9f0842bd 100644 --- a/crates/acp_thread/src/acp_thread.rs +++ b/crates/acp_thread/src/acp_thread.rs @@ -958,10 +958,6 @@ impl AcpThread { cx.notify(); } - pub fn authenticate(&self, cx: &mut App) -> impl use<> + Future> { - self.connection.authenticate(cx) - } - #[cfg(any(test, feature = "test-support"))] pub fn send_raw( &mut self, diff --git a/crates/acp_thread/src/connection.rs b/crates/acp_thread/src/connection.rs index 97161a19c0..2e7deaf7df 100644 --- a/crates/acp_thread/src/connection.rs +++ b/crates/acp_thread/src/connection.rs @@ -1,6 +1,6 @@ -use std::{path::Path, rc::Rc}; +use std::{cell::Ref, path::Path, rc::Rc}; -use agent_client_protocol as acp; +use agent_client_protocol::{self as acp}; use anyhow::Result; use gpui::{AsyncApp, Entity, Task}; use project::Project; @@ -16,7 +16,9 @@ pub trait AgentConnection { cx: &mut AsyncApp, ) -> Task>>; - fn authenticate(&self, cx: &mut App) -> Task>; + fn state(&self) -> Ref<'_, acp::AgentState>; + + fn authenticate(&self, method: acp::AuthMethodId, cx: &mut App) -> Task>; fn prompt(&self, params: acp::PromptArguments, cx: &mut App) -> Task>; diff --git a/crates/acp_thread/src/old_acp_support.rs b/crates/acp_thread/src/old_acp_support.rs index d7ef1b73da..4d06f81d06 100644 --- a/crates/acp_thread/src/old_acp_support.rs +++ b/crates/acp_thread/src/old_acp_support.rs @@ -5,7 +5,13 @@ use anyhow::{Context as _, Result}; use futures::channel::oneshot; use gpui::{AppContext as _, AsyncApp, Entity, Task, WeakEntity}; use project::Project; -use std::{cell::RefCell, error::Error, fmt, path::Path, rc::Rc}; +use std::{ + cell::{Ref, RefCell}, + error::Error, + fmt, + path::Path, + rc::Rc, +}; use ui::App; use crate::{AcpThread, AgentConnection}; @@ -364,6 +370,7 @@ pub struct OldAcpAgentConnection { pub name: &'static str, pub connection: acp_old::AgentConnection, pub child_status: Task>, + pub agent_state: Rc>, } impl AgentConnection for OldAcpAgentConnection { @@ -397,7 +404,11 @@ impl AgentConnection for OldAcpAgentConnection { }) } - fn authenticate(&self, cx: &mut App) -> Task> { + fn state(&self) -> Ref<'_, acp::AgentState> { + self.agent_state.borrow() + } + + fn authenticate(&self, _method_id: acp::AuthMethodId, cx: &mut App) -> Task> { let task = self .connection .request_any(acp_old::AuthenticateParams.into_any()); diff --git a/crates/agent_servers/src/acp_connection.rs b/crates/agent_servers/src/acp_connection.rs index 9139d62c38..96067fe520 100644 --- a/crates/agent_servers/src/acp_connection.rs +++ b/crates/agent_servers/src/acp_connection.rs @@ -7,10 +7,10 @@ use context_server::{ContextServer, ContextServerCommand, ContextServerId}; use futures::channel::{mpsc, oneshot}; use project::Project; use smol::stream::StreamExt as _; -use std::cell::RefCell; +use std::cell::{Ref, RefCell}; use std::rc::Rc; use std::{path::Path, sync::Arc}; -use util::ResultExt; +use util::{ResultExt, TryFutureExt}; use anyhow::{Context, Result}; use gpui::{App, AppContext as _, AsyncApp, Entity, Task, WeakEntity}; @@ -20,10 +20,12 @@ use crate::{AgentServerCommand, mcp_server}; use acp_thread::{AcpThread, AgentConnection}; pub struct AcpConnection { + agent_state: Rc>, server_name: &'static str, client: Arc, sessions: Rc>>, - _notification_handler_task: Task<()>, + _agent_state_task: Task<()>, + _session_update_task: Task<()>, } impl AcpConnection { @@ -43,29 +45,55 @@ impl AcpConnection { .into(); ContextServer::start(client.clone(), cx).await?; - let (notification_tx, mut notification_rx) = mpsc::unbounded(); - client - .client() - .context("Failed to subscribe")? - .on_notification(acp::AGENT_METHODS.session_update, { - move |notification, _cx| { - let notification_tx = notification_tx.clone(); - log::trace!( - "ACP Notification: {}", - serde_json::to_string_pretty(¬ification).unwrap() - ); + let (mut state_tx, mut state_rx) = watch::channel(acp::AgentState::default()); + let mcp_client = client.client().context("Failed to subscribe")?; - if let Some(notification) = - serde_json::from_value::(notification).log_err() - { - notification_tx.unbounded_send(notification).ok(); - } + mcp_client.on_notification(acp::AGENT_METHODS.agent_state, { + move |notification, _cx| { + log::trace!( + "ACP Notification: {}", + serde_json::to_string_pretty(¬ification).unwrap() + ); + + if let Some(state) = + serde_json::from_value::(notification).log_err() + { + state_tx.send(state).log_err(); } - }); + } + }); + + let (notification_tx, mut notification_rx) = mpsc::unbounded(); + mcp_client.on_notification(acp::AGENT_METHODS.session_update, { + move |notification, _cx| { + let notification_tx = notification_tx.clone(); + log::trace!( + "ACP Notification: {}", + serde_json::to_string_pretty(¬ification).unwrap() + ); + + if let Some(notification) = + serde_json::from_value::(notification).log_err() + { + notification_tx.unbounded_send(notification).ok(); + } + } + }); let sessions = Rc::new(RefCell::new(HashMap::default())); + let initial_state = state_rx.recv().await?; + let agent_state = Rc::new(RefCell::new(initial_state)); - let notification_handler_task = cx.spawn({ + let agent_state_task = cx.foreground_executor().spawn({ + let agent_state = agent_state.clone(); + async move { + while let Some(state) = state_rx.recv().log_err().await { + agent_state.replace(state); + } + } + }); + + let session_update_handler_task = cx.spawn({ let sessions = sessions.clone(); async move |cx| { while let Some(notification) = notification_rx.next().await { @@ -78,7 +106,9 @@ impl AcpConnection { server_name, client, sessions, - _notification_handler_task: notification_handler_task, + agent_state, + _agent_state_task: agent_state_task, + _session_update_task: session_update_handler_task, }) } @@ -185,8 +215,30 @@ impl AgentConnection for AcpConnection { }) } - fn authenticate(&self, _cx: &mut App) -> Task> { - Task::ready(Err(anyhow!("Authentication not supported"))) + fn state(&self) -> Ref<'_, acp::AgentState> { + self.agent_state.borrow() + } + + fn authenticate(&self, method_id: acp::AuthMethodId, cx: &mut App) -> Task> { + let client = self.client.client(); + cx.foreground_executor().spawn(async move { + let params = acp::AuthenticateArguments { method_id }; + + let response = client + .context("MCP server is not initialized yet")? + .request::(context_server::types::CallToolParams { + name: acp::AGENT_METHODS.authenticate.into(), + arguments: Some(serde_json::to_value(params)?), + meta: None, + }) + .await?; + + if response.is_error.unwrap_or_default() { + Err(anyhow!(response.text_contents())) + } else { + Ok(()) + } + }) } fn prompt( diff --git a/crates/agent_servers/src/claude.rs b/crates/agent_servers/src/claude.rs index 590da69cd8..0f49403a0b 100644 --- a/crates/agent_servers/src/claude.rs +++ b/crates/agent_servers/src/claude.rs @@ -6,7 +6,7 @@ use context_server::listener::McpServerTool; use project::Project; use settings::SettingsStore; use smol::process::Child; -use std::cell::RefCell; +use std::cell::{Ref, RefCell}; use std::fmt::Display; use std::path::Path; use std::rc::Rc; @@ -58,6 +58,7 @@ impl AgentServer for ClaudeCode { _cx: &mut App, ) -> Task>> { let connection = ClaudeAgentConnection { + agent_state: Default::default(), sessions: Default::default(), }; @@ -66,6 +67,7 @@ impl AgentServer for ClaudeCode { } struct ClaudeAgentConnection { + agent_state: Rc>, sessions: Rc>>, } @@ -183,7 +185,11 @@ impl AgentConnection for ClaudeAgentConnection { }) } - fn authenticate(&self, _cx: &mut App) -> Task> { + fn state(&self) -> Ref<'_, acp::AgentState> { + self.agent_state.borrow() + } + + fn authenticate(&self, _: acp::AuthMethodId, _cx: &mut App) -> Task> { Task::ready(Err(anyhow!("Authentication not supported"))) } diff --git a/crates/agent_ui/src/acp/thread_view.rs b/crates/agent_ui/src/acp/thread_view.rs index e46e1ae3ab..824748a0aa 100644 --- a/crates/agent_ui/src/acp/thread_view.rs +++ b/crates/agent_ui/src/acp/thread_view.rs @@ -216,6 +216,15 @@ impl AcpThreadView { } }; + if connection.state().needs_authentication { + this.update(cx, |this, cx| { + this.thread_state = ThreadState::Unauthenticated { connection }; + cx.notify(); + }) + .ok(); + return; + } + let result = match connection .clone() .new_thread(project.clone(), &root_dir, cx) @@ -223,6 +232,7 @@ impl AcpThreadView { { Err(e) => { let mut cx = cx.clone(); + // todo! remove duplication if e.downcast_ref::().is_some() { this.update(&mut cx, |this, cx| { this.thread_state = ThreadState::Unauthenticated { connection }; @@ -640,13 +650,18 @@ impl AcpThreadView { Some(entry.diffs().map(|diff| diff.multibuffer.clone())) } - fn authenticate(&mut self, window: &mut Window, cx: &mut Context) { + fn authenticate( + &mut self, + method: acp::AuthMethodId, + window: &mut Window, + cx: &mut Context, + ) { let ThreadState::Unauthenticated { ref connection } = self.thread_state else { return; }; self.last_error.take(); - let authenticate = connection.authenticate(cx); + let authenticate = connection.authenticate(method, cx); self.auth_task = Some(cx.spawn_in(window, { let project = self.project.clone(); let agent = self.agent.clone(); @@ -2197,22 +2212,26 @@ impl Render for AcpThreadView { .on_action(cx.listener(Self::next_history_message)) .on_action(cx.listener(Self::open_agent_diff)) .child(match &self.thread_state { - ThreadState::Unauthenticated { .. } => { - v_flex() - .p_2() - .flex_1() - .items_center() - .justify_center() - .child(self.render_pending_auth_state()) - .child( - h_flex().mt_1p5().justify_center().child( - Button::new("sign-in", format!("Sign in to {}", self.agent.name())) - .on_click(cx.listener(|this, _, window, cx| { - this.authenticate(window, cx) - })), - ), - ) - } + ThreadState::Unauthenticated { connection } => v_flex() + .p_2() + .flex_1() + .items_center() + .justify_center() + .child(self.render_pending_auth_state()) + .child(h_flex().mt_1p5().justify_center().children( + connection.state().auth_methods.iter().map(|method| { + Button::new( + SharedString::from(method.id.0.clone()), + method.label.clone(), + ) + .on_click({ + let method_id = method.id.clone(); + cx.listener(move |this, _, window, cx| { + this.authenticate(method_id.clone(), window, cx) + }) + }) + }), + )), ThreadState::Loading { .. } => v_flex().flex_1().child(self.render_empty_state(cx)), ThreadState::LoadError(e) => v_flex() .p_2() From 81c111510f43241cef933567fc2905051dfc5fa3 Mon Sep 17 00:00:00 2001 From: Ben Brandt Date: Wed, 30 Jul 2025 15:48:40 +0200 Subject: [PATCH 07/10] Refactor handling of ContextServer notifications The notification handler registration is now more explicit, with handlers set up before server initialization to avoid potential race conditions. --- crates/agent_servers/src/acp_connection.rs | 85 +++++++++++---------- crates/context_server/src/client.rs | 14 ++-- crates/context_server/src/context_server.rs | 27 ++++++- crates/context_server/src/protocol.rs | 9 ++- 4 files changed, 79 insertions(+), 56 deletions(-) diff --git a/crates/agent_servers/src/acp_connection.rs b/crates/agent_servers/src/acp_connection.rs index 95c09e2c52..5883f6ac45 100644 --- a/crates/agent_servers/src/acp_connection.rs +++ b/crates/agent_servers/src/acp_connection.rs @@ -22,7 +22,7 @@ use acp_thread::{AcpThread, AgentConnection}; pub struct AcpConnection { agent_state: Rc>, server_name: &'static str, - client: Arc, + context_server: Arc, sessions: Rc>>, _agent_state_task: Task<()>, _session_update_task: Task<()>, @@ -35,7 +35,7 @@ impl AcpConnection { working_directory: Option>, cx: &mut AsyncApp, ) -> Result { - let client: Arc = ContextServer::stdio( + let context_server: Arc = ContextServer::stdio( ContextServerId(format!("{}-mcp-server", server_name).into()), ContextServerCommand { path: command.path, @@ -45,42 +45,9 @@ impl AcpConnection { working_directory, ) .into(); - ContextServer::start(client.clone(), cx).await?; let (mut state_tx, mut state_rx) = watch::channel(acp::AgentState::default()); - let mcp_client = client.client().context("Failed to subscribe")?; - - mcp_client.on_notification(acp::AGENT_METHODS.agent_state, { - move |notification, _cx| { - log::trace!( - "ACP Notification: {}", - serde_json::to_string_pretty(¬ification).unwrap() - ); - - if let Some(state) = - serde_json::from_value::(notification).log_err() - { - state_tx.send(state).log_err(); - } - } - }); - let (notification_tx, mut notification_rx) = mpsc::unbounded(); - mcp_client.on_notification(acp::AGENT_METHODS.session_update, { - move |notification, _cx| { - let notification_tx = notification_tx.clone(); - log::trace!( - "ACP Notification: {}", - serde_json::to_string_pretty(¬ification).unwrap() - ); - - if let Some(notification) = - serde_json::from_value::(notification).log_err() - { - notification_tx.unbounded_send(notification).ok(); - } - } - }); let sessions = Rc::new(RefCell::new(HashMap::default())); let initial_state = state_rx.recv().await?; @@ -104,9 +71,47 @@ impl AcpConnection { } }); + context_server + .start_with_handlers( + vec![ + (acp::AGENT_METHODS.agent_state, { + Box::new(move |notification, _cx| { + log::trace!( + "ACP Notification: {}", + serde_json::to_string_pretty(¬ification).unwrap() + ); + + if let Some(state) = + serde_json::from_value::(notification).log_err() + { + state_tx.send(state).log_err(); + } + }) + }), + (acp::AGENT_METHODS.session_update, { + Box::new(move |notification, _cx| { + let notification_tx = notification_tx.clone(); + log::trace!( + "ACP Notification: {}", + serde_json::to_string_pretty(¬ification).unwrap() + ); + + if let Some(notification) = + serde_json::from_value::(notification) + .log_err() + { + notification_tx.unbounded_send(notification).ok(); + } + }) + }), + ], + cx, + ) + .await?; + Ok(Self { server_name, - client, + context_server, sessions, agent_state, _agent_state_task: agent_state_task, @@ -152,7 +157,7 @@ impl AgentConnection for AcpConnection { cwd: &Path, cx: &mut AsyncApp, ) -> Task>> { - let client = self.client.client(); + let client = self.context_server.client(); let sessions = self.sessions.clone(); let cwd = cwd.to_path_buf(); cx.spawn(async move |cx| { @@ -222,7 +227,7 @@ impl AgentConnection for AcpConnection { } fn authenticate(&self, method_id: acp::AuthMethodId, cx: &mut App) -> Task> { - let client = self.client.client(); + let client = self.context_server.client(); cx.foreground_executor().spawn(async move { let params = acp::AuthenticateArguments { method_id }; @@ -248,7 +253,7 @@ impl AgentConnection for AcpConnection { params: agent_client_protocol::PromptArguments, cx: &mut App, ) -> Task> { - let client = self.client.client(); + let client = self.context_server.client(); let sessions = self.sessions.clone(); cx.foreground_executor().spawn(async move { @@ -305,6 +310,6 @@ impl AgentConnection for AcpConnection { impl Drop for AcpConnection { fn drop(&mut self) { - self.client.stop().log_err(); + self.context_server.stop().log_err(); } } diff --git a/crates/context_server/src/client.rs b/crates/context_server/src/client.rs index 1eb29bbbf9..65283afa87 100644 --- a/crates/context_server/src/client.rs +++ b/crates/context_server/src/client.rs @@ -441,14 +441,12 @@ impl Client { Ok(()) } - #[allow(unused)] - pub fn on_notification(&self, method: &'static str, f: F) - where - F: 'static + Send + FnMut(Value, AsyncApp), - { - self.notification_handlers - .lock() - .insert(method, Box::new(f)); + pub fn on_notification( + &self, + method: &'static str, + f: Box, + ) { + self.notification_handlers.lock().insert(method, f); } } diff --git a/crates/context_server/src/context_server.rs b/crates/context_server/src/context_server.rs index e76e7972f7..34fa29678d 100644 --- a/crates/context_server/src/context_server.rs +++ b/crates/context_server/src/context_server.rs @@ -95,8 +95,28 @@ impl ContextServer { self.client.read().clone() } - pub async fn start(self: Arc, cx: &AsyncApp) -> Result<()> { - let client = match &self.configuration { + pub async fn start(&self, cx: &AsyncApp) -> Result<()> { + self.initialize(self.new_client(cx)?).await + } + + /// Starts the context server, making sure handlers are registered before initialization happens + pub async fn start_with_handlers( + &self, + notification_handlers: Vec<( + &'static str, + Box, + )>, + cx: &AsyncApp, + ) -> Result<()> { + let client = self.new_client(cx)?; + for (method, handler) in notification_handlers { + client.on_notification(method, handler); + } + self.initialize(client).await + } + + fn new_client(&self, cx: &AsyncApp) -> Result { + Ok(match &self.configuration { ContextServerTransport::Stdio(command, working_directory) => Client::stdio( client::ContextServerId(self.id.0.clone()), client::ModelContextServerBinary { @@ -113,8 +133,7 @@ impl ContextServer { transport.clone(), cx.clone(), )?, - }; - self.initialize(client).await + }) } async fn initialize(&self, client: Client) -> Result<()> { diff --git a/crates/context_server/src/protocol.rs b/crates/context_server/src/protocol.rs index 9ccbc8a553..5355f20f62 100644 --- a/crates/context_server/src/protocol.rs +++ b/crates/context_server/src/protocol.rs @@ -115,10 +115,11 @@ impl InitializedContextServerProtocol { self.inner.notify(T::METHOD, params) } - pub fn on_notification(&self, method: &'static str, f: F) - where - F: 'static + Send + FnMut(Value, AsyncApp), - { + pub fn on_notification( + &self, + method: &'static str, + f: Box, + ) { self.inner.on_notification(method, f); } } From 738296345eaa880ebda8adf247e9974c7a20c880 Mon Sep 17 00:00:00 2001 From: Agus Zubiaga Date: Wed, 30 Jul 2025 11:46:11 -0300 Subject: [PATCH 08/10] Inline tool schemas --- crates/context_server/src/listener.rs | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/crates/context_server/src/listener.rs b/crates/context_server/src/listener.rs index 34e3a9a78c..0e85fb2129 100644 --- a/crates/context_server/src/listener.rs +++ b/crates/context_server/src/listener.rs @@ -83,14 +83,18 @@ impl McpServer { } pub fn add_tool(&mut self, tool: T) { - let output_schema = schemars::schema_for!(T::Output); - let unit_schema = schemars::schema_for!(()); + let mut settings = schemars::generate::SchemaSettings::draft07(); + settings.inline_subschemas = true; + let mut generator = settings.into_generator(); + + let output_schema = generator.root_schema_for::(); + let unit_schema = generator.root_schema_for::(); let registered_tool = RegisteredTool { tool: Tool { name: T::NAME.into(), description: Some(tool.description().into()), - input_schema: schemars::schema_for!(T::Input).into(), + input_schema: generator.root_schema_for::().into(), output_schema: if output_schema == unit_schema { None } else { From 27708143ecac70b963bbe70153d426fe5f061277 Mon Sep 17 00:00:00 2001 From: Agus Zubiaga Date: Wed, 30 Jul 2025 13:30:50 -0300 Subject: [PATCH 09/10] Fix auth --- crates/acp_thread/src/acp_thread.rs | 1 - crates/acp_thread/src/connection.rs | 14 +++++- crates/acp_thread/src/old_acp_support.rs | 31 ++++--------- crates/agent_servers/src/acp_connection.rs | 54 +++++++--------------- crates/agent_servers/src/claude.rs | 8 ++-- crates/agent_ui/src/acp/thread_view.rs | 28 ++++------- 6 files changed, 48 insertions(+), 88 deletions(-) diff --git a/crates/acp_thread/src/acp_thread.rs b/crates/acp_thread/src/acp_thread.rs index 8aa07da330..bc2f8c756a 100644 --- a/crates/acp_thread/src/acp_thread.rs +++ b/crates/acp_thread/src/acp_thread.rs @@ -1595,7 +1595,6 @@ mod tests { connection, child_status: io_task, current_thread: thread_rc, - agent_state: Default::default(), }; AcpThread::new( diff --git a/crates/acp_thread/src/connection.rs b/crates/acp_thread/src/connection.rs index 2e7deaf7df..11f1fcc94c 100644 --- a/crates/acp_thread/src/connection.rs +++ b/crates/acp_thread/src/connection.rs @@ -1,4 +1,4 @@ -use std::{cell::Ref, path::Path, rc::Rc}; +use std::{error::Error, fmt, path::Path, rc::Rc}; use agent_client_protocol::{self as acp}; use anyhow::Result; @@ -16,7 +16,7 @@ pub trait AgentConnection { cx: &mut AsyncApp, ) -> Task>>; - fn state(&self) -> Ref<'_, acp::AgentState>; + fn auth_methods(&self) -> Vec; fn authenticate(&self, method: acp::AuthMethodId, cx: &mut App) -> Task>; @@ -24,3 +24,13 @@ pub trait AgentConnection { fn cancel(&self, session_id: &acp::SessionId, cx: &mut App); } + +#[derive(Debug)] +pub struct AuthRequired; + +impl Error for AuthRequired {} +impl fmt::Display for AuthRequired { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "AuthRequired") + } +} diff --git a/crates/acp_thread/src/old_acp_support.rs b/crates/acp_thread/src/old_acp_support.rs index 718ad0da03..88313e0fd5 100644 --- a/crates/acp_thread/src/old_acp_support.rs +++ b/crates/acp_thread/src/old_acp_support.rs @@ -5,17 +5,11 @@ use anyhow::{Context as _, Result}; use futures::channel::oneshot; use gpui::{AppContext as _, AsyncApp, Entity, Task, WeakEntity}; use project::Project; -use std::{ - cell::{Ref, RefCell}, - error::Error, - fmt, - path::Path, - rc::Rc, -}; +use std::{cell::RefCell, path::Path, rc::Rc}; use ui::App; use util::ResultExt as _; -use crate::{AcpThread, AgentConnection}; +use crate::{AcpThread, AgentConnection, AuthRequired}; #[derive(Clone)] pub struct OldAcpClientDelegate { @@ -357,21 +351,10 @@ fn into_new_plan_status(status: acp_old::PlanEntryStatus) -> acp::PlanEntryStatu } } -#[derive(Debug)] -pub struct Unauthenticated; - -impl Error for Unauthenticated {} -impl fmt::Display for Unauthenticated { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "Unauthenticated") - } -} - pub struct OldAcpAgentConnection { pub name: &'static str, pub connection: acp_old::AgentConnection, pub child_status: Task>, - pub agent_state: Rc>, pub current_thread: Rc>>, } @@ -394,7 +377,7 @@ impl AgentConnection for OldAcpAgentConnection { let result = acp_old::InitializeParams::response_from_any(result)?; if !result.is_authenticated { - anyhow::bail!(Unauthenticated) + anyhow::bail!(AuthRequired) } cx.update(|cx| { @@ -408,8 +391,12 @@ impl AgentConnection for OldAcpAgentConnection { }) } - fn state(&self) -> Ref<'_, acp::AgentState> { - self.agent_state.borrow() + fn auth_methods(&self) -> Vec { + vec![acp::AuthMethod { + id: acp::AuthMethodId("acp-old-no-id".into()), + label: "Log in".into(), + description: None, + }] } fn authenticate(&self, _method_id: acp::AuthMethodId, cx: &mut App) -> Task> { diff --git a/crates/agent_servers/src/acp_connection.rs b/crates/agent_servers/src/acp_connection.rs index 95c09e2c52..c19a145196 100644 --- a/crates/agent_servers/src/acp_connection.rs +++ b/crates/agent_servers/src/acp_connection.rs @@ -7,24 +7,23 @@ use context_server::{ContextServer, ContextServerCommand, ContextServerId}; use futures::channel::{mpsc, oneshot}; use project::Project; use smol::stream::StreamExt as _; -use std::cell::{Ref, RefCell}; +use std::cell::RefCell; use std::rc::Rc; use std::{path::Path, sync::Arc}; -use util::{ResultExt, TryFutureExt}; +use util::ResultExt; use anyhow::{Context, Result}; use gpui::{App, AppContext as _, AsyncApp, Entity, Task, WeakEntity}; use crate::mcp_server::ZedMcpServer; use crate::{AgentServerCommand, mcp_server}; -use acp_thread::{AcpThread, AgentConnection}; +use acp_thread::{AcpThread, AgentConnection, AuthRequired}; pub struct AcpConnection { - agent_state: Rc>, + auth_methods: Rc>>, server_name: &'static str, client: Arc, sessions: Rc>>, - _agent_state_task: Task<()>, _session_update_task: Task<()>, } @@ -47,24 +46,8 @@ impl AcpConnection { .into(); ContextServer::start(client.clone(), cx).await?; - let (mut state_tx, mut state_rx) = watch::channel(acp::AgentState::default()); let mcp_client = client.client().context("Failed to subscribe")?; - mcp_client.on_notification(acp::AGENT_METHODS.agent_state, { - move |notification, _cx| { - log::trace!( - "ACP Notification: {}", - serde_json::to_string_pretty(¬ification).unwrap() - ); - - if let Some(state) = - serde_json::from_value::(notification).log_err() - { - state_tx.send(state).log_err(); - } - } - }); - let (notification_tx, mut notification_rx) = mpsc::unbounded(); mcp_client.on_notification(acp::AGENT_METHODS.session_update, { move |notification, _cx| { @@ -83,17 +66,6 @@ impl AcpConnection { }); let sessions = Rc::new(RefCell::new(HashMap::default())); - let initial_state = state_rx.recv().await?; - let agent_state = Rc::new(RefCell::new(initial_state)); - - let agent_state_task = cx.foreground_executor().spawn({ - let agent_state = agent_state.clone(); - async move { - while let Some(state) = state_rx.recv().log_err().await { - agent_state.replace(state); - } - } - }); let session_update_handler_task = cx.spawn({ let sessions = sessions.clone(); @@ -105,11 +77,10 @@ impl AcpConnection { }); Ok(Self { + auth_methods: Default::default(), server_name, client, sessions, - agent_state, - _agent_state_task: agent_state_task, _session_update_task: session_update_handler_task, }) } @@ -154,6 +125,7 @@ impl AgentConnection for AcpConnection { ) -> Task>> { let client = self.client.client(); let sessions = self.sessions.clone(); + let auth_methods = self.auth_methods.clone(); let cwd = cwd.to_path_buf(); cx.spawn(async move |cx| { let client = client.context("MCP server is not initialized yet")?; @@ -194,12 +166,18 @@ impl AgentConnection for AcpConnection { response.structured_content.context("Empty response")?, )?; + auth_methods.replace(result.auth_methods); + + let Some(session_id) = result.session_id else { + anyhow::bail!(AuthRequired); + }; + let thread = cx.new(|cx| { AcpThread::new( self.server_name, self.clone(), project, - result.session_id.clone(), + session_id.clone(), cx, ) })?; @@ -211,14 +189,14 @@ impl AgentConnection for AcpConnection { cancel_tx: None, _mcp_server: mcp_server, }; - sessions.borrow_mut().insert(result.session_id, session); + sessions.borrow_mut().insert(session_id, session); Ok(thread) }) } - fn state(&self) -> Ref<'_, acp::AgentState> { - self.agent_state.borrow() + fn auth_methods(&self) -> Vec { + self.auth_methods.borrow().clone() } fn authenticate(&self, method_id: acp::AuthMethodId, cx: &mut App) -> Task> { diff --git a/crates/agent_servers/src/claude.rs b/crates/agent_servers/src/claude.rs index 0f49403a0b..736fdd2726 100644 --- a/crates/agent_servers/src/claude.rs +++ b/crates/agent_servers/src/claude.rs @@ -6,7 +6,7 @@ use context_server::listener::McpServerTool; use project::Project; use settings::SettingsStore; use smol::process::Child; -use std::cell::{Ref, RefCell}; +use std::cell::RefCell; use std::fmt::Display; use std::path::Path; use std::rc::Rc; @@ -58,7 +58,6 @@ impl AgentServer for ClaudeCode { _cx: &mut App, ) -> Task>> { let connection = ClaudeAgentConnection { - agent_state: Default::default(), sessions: Default::default(), }; @@ -67,7 +66,6 @@ impl AgentServer for ClaudeCode { } struct ClaudeAgentConnection { - agent_state: Rc>, sessions: Rc>>, } @@ -185,8 +183,8 @@ impl AgentConnection for ClaudeAgentConnection { }) } - fn state(&self) -> Ref<'_, acp::AgentState> { - self.agent_state.borrow() + fn auth_methods(&self) -> Vec { + vec![] } fn authenticate(&self, _: acp::AuthMethodId, _cx: &mut App) -> Task> { diff --git a/crates/agent_ui/src/acp/thread_view.rs b/crates/agent_ui/src/acp/thread_view.rs index 824748a0aa..6d7684bbfc 100644 --- a/crates/agent_ui/src/acp/thread_view.rs +++ b/crates/agent_ui/src/acp/thread_view.rs @@ -216,15 +216,6 @@ impl AcpThreadView { } }; - if connection.state().needs_authentication { - this.update(cx, |this, cx| { - this.thread_state = ThreadState::Unauthenticated { connection }; - cx.notify(); - }) - .ok(); - return; - } - let result = match connection .clone() .new_thread(project.clone(), &root_dir, cx) @@ -233,7 +224,7 @@ impl AcpThreadView { Err(e) => { let mut cx = cx.clone(); // todo! remove duplication - if e.downcast_ref::().is_some() { + if e.downcast_ref::().is_some() { this.update(&mut cx, |this, cx| { this.thread_state = ThreadState::Unauthenticated { connection }; cx.notify(); @@ -2219,17 +2210,14 @@ impl Render for AcpThreadView { .justify_center() .child(self.render_pending_auth_state()) .child(h_flex().mt_1p5().justify_center().children( - connection.state().auth_methods.iter().map(|method| { - Button::new( - SharedString::from(method.id.0.clone()), - method.label.clone(), - ) - .on_click({ - let method_id = method.id.clone(); - cx.listener(move |this, _, window, cx| { - this.authenticate(method_id.clone(), window, cx) + connection.auth_methods().into_iter().map(|method| { + Button::new(SharedString::from(method.id.0.clone()), method.label) + .on_click({ + let method_id = method.id.clone(); + cx.listener(move |this, _, window, cx| { + this.authenticate(method_id.clone(), window, cx) + }) }) - }) }), )), ThreadState::Loading { .. } => v_flex().flex_1().child(self.render_empty_state(cx)), From 02d3043ec560a7e7bdc1df47efc70c4a2e4fb378 Mon Sep 17 00:00:00 2001 From: Agus Zubiaga Date: Wed, 30 Jul 2025 14:28:01 -0300 Subject: [PATCH 10/10] Rename arg to experimental-mcp --- crates/agent_servers/src/gemini.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/agent_servers/src/gemini.rs b/crates/agent_servers/src/gemini.rs index 9b7fde42bf..77e7d1063f 100644 --- a/crates/agent_servers/src/gemini.rs +++ b/crates/agent_servers/src/gemini.rs @@ -14,7 +14,7 @@ use crate::AllAgentServersSettings; #[derive(Clone)] pub struct Gemini; -const ACP_ARG: &str = "--experimental-acp"; +const MCP_ARG: &str = "--experimental-mcp"; impl AgentServer for Gemini { fn name(&self) -> &'static str { @@ -48,7 +48,7 @@ impl AgentServer for Gemini { })?; let Some(command) = - AgentServerCommand::resolve("gemini", &[ACP_ARG], settings, &project, cx).await + AgentServerCommand::resolve("gemini", &[MCP_ARG], settings, &project, cx).await else { anyhow::bail!("Failed to find gemini binary"); };