diff --git a/Cargo.lock b/Cargo.lock index 64470b5abe..767d9a4c9a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -137,10 +137,12 @@ dependencies = [ [[package]] name = "agent-client-protocol" -version = "0.0.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "72ec54650c1fc2d63498bab47eeeaa9eddc7d239d53f615b797a0e84f7ccc87b" +version = "0.0.13" dependencies = [ + "anyhow", + "futures 0.3.31", + "log", + "parking_lot", "schemars", "serde", "serde_json", @@ -9570,9 +9572,9 @@ dependencies = [ [[package]] name = "lock_api" -version = "0.4.12" +version = "0.4.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "07af8b9cdd281b7915f413fa73f29ebd5d55d0d3f0155584dade1ff18cea1b17" +checksum = "96936507f153605bddfcda068dd804796c84324ed2510809e5b2a624c81da765" dependencies = [ "autocfg", "scopeguard", @@ -11286,9 +11288,9 @@ checksum = "f38d5652c16fde515bb1ecef450ab0f6a219d619a7274976324d5e377f7dceba" [[package]] name = "parking_lot" -version = "0.12.3" +version = "0.12.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f1bf18183cf54e8d6059647fc3063646a1801cf30896933ec2311622cc4b9a27" +checksum = "70d58bf43669b5795d1576d0641cfb6fbb2057bf629506267a92807158584a13" dependencies = [ "lock_api", "parking_lot_core", @@ -11296,9 +11298,9 @@ dependencies = [ [[package]] name = "parking_lot_core" -version = "0.9.10" +version = "0.9.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e401f977ab385c9e4e3ab30627d6f26d00e2c73eef317493c4ec6d468726cf8" +checksum = "bc838d2a56b5b1a6c25f55575dfc605fabb63bb2365f6c2353ef9159aa69e4a5" dependencies = [ "cfg-if", "libc", diff --git a/Cargo.toml b/Cargo.toml index 5b97596d0c..902c7e8d19 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -421,7 +421,7 @@ zlog_settings = { path = "crates/zlog_settings" } # agentic-coding-protocol = "0.0.10" -agent-client-protocol = "0.0.11" +agent-client-protocol = {path="../agent-client-protocol"} aho-corasick = "1.1" alacritty_terminal = { git = "https://github.com/zed-industries/alacritty.git", branch = "add-hush-login-flag" } any_vec = "0.14" diff --git a/crates/acp_thread/src/acp_thread.rs b/crates/acp_thread/src/acp_thread.rs index 7a10f3bd72..d10fecdb28 100644 --- a/crates/acp_thread/src/acp_thread.rs +++ b/crates/acp_thread/src/acp_thread.rs @@ -391,7 +391,7 @@ impl ToolCallContent { cx: &mut App, ) -> Self { match content { - acp::ToolCallContent::ContentBlock(content) => Self::ContentBlock { + acp::ToolCallContent::Content { content } => Self::ContentBlock { content: ContentBlock::new(content, &language_registry, cx), }, acp::ToolCallContent::Diff { diff } => Self::Diff { @@ -619,6 +619,7 @@ impl Error for LoadError {} impl AcpThread { pub fn new( + title: impl Into, connection: Rc, project: Entity, session_id: acp::SessionId, @@ -631,7 +632,7 @@ impl AcpThread { shared_buffers: Default::default(), entries: Default::default(), plan: Default::default(), - title: connection.name().into(), + title: title.into(), project, send_task: None, connection, @@ -697,14 +698,14 @@ impl AcpThread { cx: &mut Context, ) -> Result<()> { match update { - acp::SessionUpdate::UserMessage(content_block) => { - self.push_user_content_block(content_block, cx); + acp::SessionUpdate::UserMessageChunk { content } => { + self.push_user_content_block(content, cx); } - acp::SessionUpdate::AgentMessageChunk(content_block) => { - self.push_assistant_content_block(content_block, false, cx); + acp::SessionUpdate::AgentMessageChunk { content } => { + self.push_assistant_content_block(content, false, cx); } - acp::SessionUpdate::AgentThoughtChunk(content_block) => { - self.push_assistant_content_block(content_block, true, cx); + acp::SessionUpdate::AgentThoughtChunk { content } => { + self.push_assistant_content_block(content, true, cx); } acp::SessionUpdate::ToolCall(tool_call) => { self.upsert_tool_call(tool_call, cx); @@ -973,10 +974,6 @@ impl AcpThread { cx.notify(); } - pub fn authenticate(&self, cx: &mut App) -> impl use<> + Future> { - self.connection.authenticate(cx) - } - #[cfg(any(test, feature = "test-support"))] pub fn send_raw( &mut self, @@ -1018,7 +1015,7 @@ impl AcpThread { let result = this .update(cx, |this, cx| { this.connection.prompt( - acp::PromptArguments { + acp::PromptRequest { prompt: message, session_id: this.session_id.clone(), }, @@ -1620,9 +1617,15 @@ mod tests { connection, child_status: io_task, current_thread: thread_rc, + auth_methods: [acp::AuthMethod { + id: acp::AuthMethodId("acp-old-no-id".into()), + label: "Log in".into(), + description: None, + }], }; AcpThread::new( + "test", Rc::new(connection), project, acp::SessionId("test".into()), diff --git a/crates/acp_thread/src/connection.rs b/crates/acp_thread/src/connection.rs index 5b25b71863..929500a67b 100644 --- a/crates/acp_thread/src/connection.rs +++ b/crates/acp_thread/src/connection.rs @@ -1,6 +1,6 @@ -use std::{path::Path, rc::Rc}; +use std::{error::Error, fmt, path::Path, rc::Rc}; -use agent_client_protocol as acp; +use agent_client_protocol::{self as acp}; use anyhow::Result; use gpui::{AsyncApp, Entity, Task}; use project::Project; @@ -9,8 +9,6 @@ use ui::App; use crate::AcpThread; pub trait AgentConnection { - fn name(&self) -> &'static str; - fn new_thread( self: Rc, project: Entity, @@ -18,9 +16,21 @@ pub trait AgentConnection { cx: &mut AsyncApp, ) -> Task>>; - fn authenticate(&self, cx: &mut App) -> Task>; + fn auth_methods(&self) -> &[acp::AuthMethod]; - fn prompt(&self, params: acp::PromptArguments, cx: &mut App) -> Task>; + fn authenticate(&self, method: acp::AuthMethodId, cx: &mut App) -> Task>; + + fn prompt(&self, params: acp::PromptRequest, cx: &mut App) -> Task>; fn cancel(&self, session_id: &acp::SessionId, cx: &mut App); } + +#[derive(Debug)] +pub struct AuthRequired; + +impl Error for AuthRequired {} +impl fmt::Display for AuthRequired { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "AuthRequired") + } +} diff --git a/crates/acp_thread/src/old_acp_support.rs b/crates/acp_thread/src/old_acp_support.rs index 571023239f..adb27e21c4 100644 --- a/crates/acp_thread/src/old_acp_support.rs +++ b/crates/acp_thread/src/old_acp_support.rs @@ -5,11 +5,11 @@ use anyhow::{Context as _, Result}; use futures::channel::oneshot; use gpui::{AppContext as _, AsyncApp, Entity, Task, WeakEntity}; use project::Project; -use std::{cell::RefCell, error::Error, fmt, path::Path, rc::Rc}; +use std::{cell::RefCell, path::Path, rc::Rc}; use ui::App; use util::ResultExt as _; -use crate::{AcpThread, AgentConnection}; +use crate::{AcpThread, AgentConnection, AuthRequired}; #[derive(Clone)] pub struct OldAcpClientDelegate { @@ -351,28 +351,15 @@ fn into_new_plan_status(status: acp_old::PlanEntryStatus) -> acp::PlanEntryStatu } } -#[derive(Debug)] -pub struct Unauthenticated; - -impl Error for Unauthenticated {} -impl fmt::Display for Unauthenticated { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "Unauthenticated") - } -} - pub struct OldAcpAgentConnection { pub name: &'static str, pub connection: acp_old::AgentConnection, pub child_status: Task>, pub current_thread: Rc>>, + pub auth_methods: [acp::AuthMethod; 1], } impl AgentConnection for OldAcpAgentConnection { - fn name(&self) -> &'static str { - self.name - } - fn new_thread( self: Rc, project: Entity, @@ -391,13 +378,13 @@ impl AgentConnection for OldAcpAgentConnection { let result = acp_old::InitializeParams::response_from_any(result)?; if !result.is_authenticated { - anyhow::bail!(Unauthenticated) + anyhow::bail!(AuthRequired) } cx.update(|cx| { let thread = cx.new(|cx| { let session_id = acp::SessionId("acp-old-no-id".into()); - AcpThread::new(self.clone(), project, session_id, cx) + AcpThread::new("Gemini", self.clone(), project, session_id, cx) }); current_thread.replace(thread.downgrade()); thread @@ -405,7 +392,11 @@ impl AgentConnection for OldAcpAgentConnection { }) } - fn authenticate(&self, cx: &mut App) -> Task> { + fn auth_methods(&self) -> &[acp::AuthMethod] { + &self.auth_methods + } + + fn authenticate(&self, _method_id: acp::AuthMethodId, cx: &mut App) -> Task> { let task = self .connection .request_any(acp_old::AuthenticateParams.into_any()); @@ -415,7 +406,7 @@ impl AgentConnection for OldAcpAgentConnection { }) } - fn prompt(&self, params: acp::PromptArguments, cx: &mut App) -> Task> { + fn prompt(&self, params: acp::PromptRequest, cx: &mut App) -> Task> { let chunks = params .prompt .into_iter() diff --git a/crates/agent_servers/acp b/crates/agent_servers/acp new file mode 100644 index 0000000000..e69de29bb2 diff --git a/crates/agent_servers/src/acp_connection.rs b/crates/agent_servers/src/acp_connection.rs new file mode 100644 index 0000000000..0ced22fc65 --- /dev/null +++ b/crates/agent_servers/src/acp_connection.rs @@ -0,0 +1,245 @@ +use agent_client_protocol::{self as acp, Agent as _}; +use collections::HashMap; +use futures::channel::oneshot; +use project::Project; +use std::cell::RefCell; +use std::path::Path; +use std::rc::Rc; + +use anyhow::{Context as _, Result}; +use gpui::{App, AppContext as _, AsyncApp, Entity, Task, WeakEntity}; + +use crate::AgentServerCommand; +use acp_thread::{AcpThread, AgentConnection, AuthRequired}; + +pub struct AcpConnection { + server_name: &'static str, + connection: Rc, + sessions: Rc>>, + auth_methods: Vec, + _io_task: Task>, +} + +pub struct AcpSession { + thread: WeakEntity, +} + +impl AcpConnection { + pub async fn stdio( + server_name: &'static str, + command: AgentServerCommand, + root_dir: &Path, + cx: &mut AsyncApp, + ) -> Result { + let mut child = util::command::new_smol_command(&command.path) + .args(command.args.iter().map(|arg| arg.as_str())) + .envs(command.env.iter().flatten()) + .current_dir(root_dir) + .stdin(std::process::Stdio::piped()) + .stdout(std::process::Stdio::piped()) + .stderr(std::process::Stdio::inherit()) + .kill_on_drop(true) + .spawn()?; + + let stdout = child.stdout.take().expect("Failed to take stdout"); + let stdin = child.stdin.take().expect("Failed to take stdin"); + + let sessions = Rc::new(RefCell::new(HashMap::default())); + + let client = ClientDelegate { + sessions: sessions.clone(), + cx: cx.clone(), + }; + let (connection, io_task) = acp::ClientSideConnection::new(client, stdin, stdout, { + let foreground_executor = cx.foreground_executor().clone(); + move |fut| { + foreground_executor.spawn(fut).detach(); + } + }); + + let io_task = cx.background_spawn(io_task); + + let response = connection + .initialize(acp::InitializeRequest { + protocol_version: acp::VERSION, + client_capabilities: acp::ClientCapabilities { + fs: acp::FileSystemCapability { + read_text_file: true, + write_text_file: true, + }, + }, + }) + .await?; + + // todo! check version + + Ok(Self { + auth_methods: response.auth_methods, + connection: connection.into(), + server_name, + sessions, + _io_task: io_task, + }) + } +} + +impl AgentConnection for AcpConnection { + fn new_thread( + self: Rc, + project: Entity, + cwd: &Path, + cx: &mut AsyncApp, + ) -> Task>> { + let conn = self.connection.clone(); + let sessions = self.sessions.clone(); + let cwd = cwd.to_path_buf(); + cx.spawn(async move |cx| { + let response = conn + .new_session(acp::NewSessionRequest { + // todo! Zed MCP server? + mcp_servers: vec![], + cwd, + }) + .await?; + + let Some(session_id) = response.session_id else { + anyhow::bail!(AuthRequired); + }; + + let thread = cx.new(|cx| { + AcpThread::new( + self.server_name, + self.clone(), + project, + session_id.clone(), + cx, + ) + })?; + + let session = AcpSession { + thread: thread.downgrade(), + }; + sessions.borrow_mut().insert(session_id, session); + + Ok(thread) + }) + } + + fn auth_methods(&self) -> &[acp::AuthMethod] { + &self.auth_methods + } + + fn authenticate(&self, method_id: acp::AuthMethodId, cx: &mut App) -> Task> { + let conn = self.connection.clone(); + cx.foreground_executor().spawn(async move { + let result = conn + .authenticate(acp::AuthenticateRequest { + method_id: method_id.clone(), + }) + .await?; + + Ok(result) + }) + } + + fn prompt(&self, params: acp::PromptRequest, cx: &mut App) -> Task> { + let conn = self.connection.clone(); + cx.foreground_executor() + .spawn(async move { Ok(conn.prompt(params).await?) }) + } + + fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) { + let conn = self.connection.clone(); + let params = acp::CancelledNotification { + session_id: session_id.clone(), + }; + cx.foreground_executor() + .spawn(async move { conn.cancelled(params).await }) + .detach(); + } +} + +struct ClientDelegate { + sessions: Rc>>, + cx: AsyncApp, +} + +impl acp::Client for ClientDelegate { + async fn request_permission( + &self, + arguments: acp::RequestPermissionRequest, + ) -> Result { + let cx = &mut self.cx.clone(); + let result = self + .sessions + .borrow() + .get(&arguments.session_id) + .context("Failed to get session")? + .thread + .update(cx, |thread, cx| { + thread.request_tool_call_permission(arguments.tool_call, arguments.options, cx) + })? + .await; + + let outcome = match result { + Ok(option) => acp::RequestPermissionOutcome::Selected { option_id: option }, + Err(oneshot::Canceled) => acp::RequestPermissionOutcome::Cancelled, + }; + + Ok(acp::RequestPermissionResponse { outcome }) + } + + async fn write_text_file( + &self, + arguments: acp::WriteTextFileRequest, + ) -> Result<(), acp::Error> { + let cx = &mut self.cx.clone(); + self.sessions + .borrow() + .get(&arguments.session_id) + .context("Failed to get session")? + .thread + .update(cx, |thread, cx| { + thread.write_text_file(arguments.path, arguments.content, cx) + })? + .await?; + + Ok(()) + } + + async fn read_text_file( + &self, + arguments: acp::ReadTextFileRequest, + ) -> Result { + let cx = &mut self.cx.clone(); + let content = self + .sessions + .borrow() + .get(&arguments.session_id) + .context("Failed to get session")? + .thread + .update(cx, |thread, cx| { + thread.read_text_file(arguments.path, arguments.line, arguments.limit, false, cx) + })? + .await?; + + Ok(acp::ReadTextFileResponse { content }) + } + + async fn session_notification( + &self, + notification: acp::SessionNotification, + ) -> Result<(), acp::Error> { + let cx = &mut self.cx.clone(); + let sessions = self.sessions.borrow(); + let session = sessions + .get(¬ification.session_id) + .context("Failed to get session")?; + + session.thread.update(cx, |thread, cx| { + thread.handle_session_update(notification.update, cx) + })??; + + Ok(()) + } +} diff --git a/crates/agent_servers/src/agent_servers.rs b/crates/agent_servers/src/agent_servers.rs index 212bb74d8a..13bad53cd9 100644 --- a/crates/agent_servers/src/agent_servers.rs +++ b/crates/agent_servers/src/agent_servers.rs @@ -1,14 +1,12 @@ +mod acp_connection; mod claude; -mod codex; mod gemini; -mod mcp_server; mod settings; #[cfg(test)] mod e2e_tests; pub use claude::*; -pub use codex::*; pub use gemini::*; pub use settings::*; @@ -38,7 +36,6 @@ pub trait AgentServer: Send { fn connect( &self, - // these will go away when old_acp is fully removed root_dir: &Path, project: &Entity, cx: &mut App, diff --git a/crates/agent_servers/src/claude.rs b/crates/agent_servers/src/claude.rs index 6565786204..9040b83085 100644 --- a/crates/agent_servers/src/claude.rs +++ b/crates/agent_servers/src/claude.rs @@ -70,10 +70,6 @@ struct ClaudeAgentConnection { } impl AgentConnection for ClaudeAgentConnection { - fn name(&self) -> &'static str { - ClaudeCode.name() - } - fn new_thread( self: Rc, project: Entity, @@ -168,8 +164,9 @@ impl AgentConnection for ClaudeAgentConnection { } }); - let thread = - cx.new(|cx| AcpThread::new(self.clone(), project, session_id.clone(), cx))?; + let thread = cx.new(|cx| { + AcpThread::new("Claude Code", self.clone(), project, session_id.clone(), cx) + })?; thread_tx.send(thread.downgrade())?; @@ -186,11 +183,15 @@ impl AgentConnection for ClaudeAgentConnection { }) } - fn authenticate(&self, _cx: &mut App) -> Task> { + fn auth_methods(&self) -> &[acp::AuthMethod] { + &[] + } + + fn authenticate(&self, _: acp::AuthMethodId, _cx: &mut App) -> Task> { Task::ready(Err(anyhow!("Authentication not supported"))) } - fn prompt(&self, params: acp::PromptArguments, cx: &mut App) -> Task> { + fn prompt(&self, params: acp::PromptRequest, cx: &mut App) -> Task> { let sessions = self.sessions.borrow(); let Some(session) = sessions.get(¶ms.session_id) else { return Task::ready(Err(anyhow!( diff --git a/crates/agent_servers/src/codex.rs b/crates/agent_servers/src/codex.rs deleted file mode 100644 index 712c333221..0000000000 --- a/crates/agent_servers/src/codex.rs +++ /dev/null @@ -1,319 +0,0 @@ -use agent_client_protocol as acp; -use anyhow::anyhow; -use collections::HashMap; -use context_server::listener::McpServerTool; -use context_server::types::requests; -use context_server::{ContextServer, ContextServerCommand, ContextServerId}; -use futures::channel::{mpsc, oneshot}; -use project::Project; -use settings::SettingsStore; -use smol::stream::StreamExt as _; -use std::cell::RefCell; -use std::rc::Rc; -use std::{path::Path, sync::Arc}; -use util::ResultExt; - -use anyhow::{Context, Result}; -use gpui::{App, AppContext as _, AsyncApp, Entity, Task, WeakEntity}; - -use crate::mcp_server::ZedMcpServer; -use crate::{AgentServer, AgentServerCommand, AllAgentServersSettings, mcp_server}; -use acp_thread::{AcpThread, AgentConnection}; - -#[derive(Clone)] -pub struct Codex; - -impl AgentServer for Codex { - fn name(&self) -> &'static str { - "Codex" - } - - fn empty_state_headline(&self) -> &'static str { - "Welcome to Codex" - } - - fn empty_state_message(&self) -> &'static str { - "What can I help with?" - } - - fn logo(&self) -> ui::IconName { - ui::IconName::AiOpenAi - } - - fn connect( - &self, - _root_dir: &Path, - project: &Entity, - cx: &mut App, - ) -> Task>> { - let project = project.clone(); - let working_directory = project.read(cx).active_project_directory(cx); - cx.spawn(async move |cx| { - let settings = cx.read_global(|settings: &SettingsStore, _| { - settings.get::(None).codex.clone() - })?; - - let Some(command) = - AgentServerCommand::resolve("codex", &["mcp"], settings, &project, cx).await - else { - anyhow::bail!("Failed to find codex binary"); - }; - - let client: Arc = ContextServer::stdio( - ContextServerId("codex-mcp-server".into()), - ContextServerCommand { - path: command.path, - args: command.args, - env: command.env, - }, - working_directory, - ) - .into(); - ContextServer::start(client.clone(), cx).await?; - - let (notification_tx, mut notification_rx) = mpsc::unbounded(); - client - .client() - .context("Failed to subscribe")? - .on_notification(acp::SESSION_UPDATE_METHOD_NAME, { - move |notification, _cx| { - let notification_tx = notification_tx.clone(); - log::trace!( - "ACP Notification: {}", - serde_json::to_string_pretty(¬ification).unwrap() - ); - - if let Some(notification) = - serde_json::from_value::(notification) - .log_err() - { - notification_tx.unbounded_send(notification).ok(); - } - } - }); - - let sessions = Rc::new(RefCell::new(HashMap::default())); - - let notification_handler_task = cx.spawn({ - let sessions = sessions.clone(); - async move |cx| { - while let Some(notification) = notification_rx.next().await { - CodexConnection::handle_session_notification( - notification, - sessions.clone(), - cx, - ) - } - } - }); - - let connection = CodexConnection { - client, - sessions, - _notification_handler_task: notification_handler_task, - }; - Ok(Rc::new(connection) as _) - }) - } -} - -struct CodexConnection { - client: Arc, - sessions: Rc>>, - _notification_handler_task: Task<()>, -} - -struct CodexSession { - thread: WeakEntity, - cancel_tx: Option>, - _mcp_server: ZedMcpServer, -} - -impl AgentConnection for CodexConnection { - fn name(&self) -> &'static str { - "Codex" - } - - fn new_thread( - self: Rc, - project: Entity, - cwd: &Path, - cx: &mut AsyncApp, - ) -> Task>> { - let client = self.client.client(); - let sessions = self.sessions.clone(); - let cwd = cwd.to_path_buf(); - cx.spawn(async move |cx| { - let client = client.context("MCP server is not initialized yet")?; - let (mut thread_tx, thread_rx) = watch::channel(WeakEntity::new_invalid()); - - let mcp_server = ZedMcpServer::new(thread_rx, cx).await?; - - let response = client - .request::(context_server::types::CallToolParams { - name: acp::NEW_SESSION_TOOL_NAME.into(), - arguments: Some(serde_json::to_value(acp::NewSessionArguments { - mcp_servers: [( - mcp_server::SERVER_NAME.to_string(), - mcp_server.server_config()?, - )] - .into(), - client_tools: acp::ClientTools { - request_permission: Some(acp::McpToolId { - mcp_server: mcp_server::SERVER_NAME.into(), - tool_name: mcp_server::RequestPermissionTool::NAME.into(), - }), - read_text_file: Some(acp::McpToolId { - mcp_server: mcp_server::SERVER_NAME.into(), - tool_name: mcp_server::ReadTextFileTool::NAME.into(), - }), - write_text_file: Some(acp::McpToolId { - mcp_server: mcp_server::SERVER_NAME.into(), - tool_name: mcp_server::WriteTextFileTool::NAME.into(), - }), - }, - cwd, - })?), - meta: None, - }) - .await?; - - if response.is_error.unwrap_or_default() { - return Err(anyhow!(response.text_contents())); - } - - let result = serde_json::from_value::( - response.structured_content.context("Empty response")?, - )?; - - let thread = - cx.new(|cx| AcpThread::new(self.clone(), project, result.session_id.clone(), cx))?; - - thread_tx.send(thread.downgrade())?; - - let session = CodexSession { - thread: thread.downgrade(), - cancel_tx: None, - _mcp_server: mcp_server, - }; - sessions.borrow_mut().insert(result.session_id, session); - - Ok(thread) - }) - } - - fn authenticate(&self, _cx: &mut App) -> Task> { - Task::ready(Err(anyhow!("Authentication not supported"))) - } - - fn prompt( - &self, - params: agent_client_protocol::PromptArguments, - cx: &mut App, - ) -> Task> { - let client = self.client.client(); - let sessions = self.sessions.clone(); - - cx.foreground_executor().spawn(async move { - let client = client.context("MCP server is not initialized yet")?; - - let (new_cancel_tx, cancel_rx) = oneshot::channel(); - { - let mut sessions = sessions.borrow_mut(); - let session = sessions - .get_mut(¶ms.session_id) - .context("Session not found")?; - session.cancel_tx.replace(new_cancel_tx); - } - - let result = client - .request_with::( - context_server::types::CallToolParams { - name: acp::PROMPT_TOOL_NAME.into(), - arguments: Some(serde_json::to_value(params)?), - meta: None, - }, - Some(cancel_rx), - None, - ) - .await; - - if let Err(err) = &result - && err.is::() - { - return Ok(()); - } - - let response = result?; - - if response.is_error.unwrap_or_default() { - return Err(anyhow!(response.text_contents())); - } - - Ok(()) - }) - } - - fn cancel(&self, session_id: &agent_client_protocol::SessionId, _cx: &mut App) { - let mut sessions = self.sessions.borrow_mut(); - - if let Some(cancel_tx) = sessions - .get_mut(session_id) - .and_then(|session| session.cancel_tx.take()) - { - cancel_tx.send(()).ok(); - } - } -} - -impl CodexConnection { - pub fn handle_session_notification( - notification: acp::SessionNotification, - threads: Rc>>, - cx: &mut AsyncApp, - ) { - let threads = threads.borrow(); - let Some(thread) = threads - .get(¬ification.session_id) - .and_then(|session| session.thread.upgrade()) - else { - log::error!( - "Thread not found for session ID: {}", - notification.session_id - ); - return; - }; - - thread - .update(cx, |thread, cx| { - thread.handle_session_update(notification.update, cx) - }) - .log_err(); - } -} - -impl Drop for CodexConnection { - fn drop(&mut self) { - self.client.stop().log_err(); - } -} - -#[cfg(test)] -pub(crate) mod tests { - use super::*; - use crate::AgentServerCommand; - use std::path::Path; - - crate::common_e2e_tests!(Codex, allow_option_id = "approve"); - - pub fn local_command() -> AgentServerCommand { - let cli_path = Path::new(env!("CARGO_MANIFEST_DIR")) - .join("../../../codex/codex-rs/target/debug/codex"); - - AgentServerCommand { - path: cli_path, - args: vec![], - env: None, - } - } -} diff --git a/crates/agent_servers/src/e2e_tests.rs b/crates/agent_servers/src/e2e_tests.rs index e9c72eabc9..16bf1e6b47 100644 --- a/crates/agent_servers/src/e2e_tests.rs +++ b/crates/agent_servers/src/e2e_tests.rs @@ -375,9 +375,6 @@ pub async fn init_test(cx: &mut TestAppContext) -> Arc { gemini: Some(AgentServerSettings { command: crate::gemini::tests::local_command(), }), - codex: Some(AgentServerSettings { - command: crate::codex::tests::local_command(), - }), }, cx, ); diff --git a/crates/agent_servers/src/gemini.rs b/crates/agent_servers/src/gemini.rs index a97ff3f462..372ce76aa9 100644 --- a/crates/agent_servers/src/gemini.rs +++ b/crates/agent_servers/src/gemini.rs @@ -1,14 +1,10 @@ -use anyhow::anyhow; -use std::cell::RefCell; use std::path::Path; use std::rc::Rc; -use util::ResultExt as _; -use crate::{AgentServer, AgentServerCommand, AgentServerVersion}; -use acp_thread::{AgentConnection, LoadError, OldAcpAgentConnection, OldAcpClientDelegate}; -use agentic_coding_protocol as acp_old; -use anyhow::{Context as _, Result}; -use gpui::{AppContext as _, AsyncApp, Entity, Task, WeakEntity}; +use crate::{AgentServer, AgentServerCommand, acp_connection::AcpConnection}; +use acp_thread::AgentConnection; +use anyhow::Result; +use gpui::{Entity, Task}; use project::Project; use settings::SettingsStore; use ui::App; @@ -43,146 +39,27 @@ impl AgentServer for Gemini { project: &Entity, cx: &mut App, ) -> Task>> { - let root_dir = root_dir.to_path_buf(); let project = project.clone(); - let this = self.clone(); - let name = self.name(); - + let server_name = self.name(); + let root_dir = root_dir.to_path_buf(); cx.spawn(async move |cx| { - let command = this.command(&project, cx).await?; + let settings = cx.read_global(|settings: &SettingsStore, _| { + settings.get::(None).gemini.clone() + })?; - let mut child = util::command::new_smol_command(&command.path) - .args(command.args.iter()) - .current_dir(root_dir) - .stdin(std::process::Stdio::piped()) - .stdout(std::process::Stdio::piped()) - .stderr(std::process::Stdio::inherit()) - .kill_on_drop(true) - .spawn()?; + let Some(command) = + AgentServerCommand::resolve("gemini", &[ACP_ARG], settings, &project, cx).await + else { + anyhow::bail!("Failed to find gemini binary"); + }; + // todo! check supported version - let stdin = child.stdin.take().unwrap(); - let stdout = child.stdout.take().unwrap(); - - let foreground_executor = cx.foreground_executor().clone(); - - let thread_rc = Rc::new(RefCell::new(WeakEntity::new_invalid())); - - let (connection, io_fut) = acp_old::AgentConnection::connect_to_agent( - OldAcpClientDelegate::new(thread_rc.clone(), cx.clone()), - stdin, - stdout, - move |fut| foreground_executor.spawn(fut).detach(), - ); - - let io_task = cx.background_spawn(async move { - io_fut.await.log_err(); - }); - - let child_status = cx.background_spawn(async move { - let result = match child.status().await { - Err(e) => Err(anyhow!(e)), - Ok(result) if result.success() => Ok(()), - Ok(result) => { - if let Some(AgentServerVersion::Unsupported { - error_message, - upgrade_message, - upgrade_command, - }) = this.version(&command).await.log_err() - { - Err(anyhow!(LoadError::Unsupported { - error_message, - upgrade_message, - upgrade_command - })) - } else { - Err(anyhow!(LoadError::Exited(result.code().unwrap_or(-127)))) - } - } - }; - drop(io_task); - result - }); - - let connection: Rc = Rc::new(OldAcpAgentConnection { - name, - connection, - child_status, - current_thread: thread_rc, - }); - - Ok(connection) + let conn = AcpConnection::stdio(server_name, command, &root_dir, cx).await?; + Ok(Rc::new(conn) as _) }) } } -impl Gemini { - async fn command( - &self, - project: &Entity, - cx: &mut AsyncApp, - ) -> Result { - let settings = cx.read_global(|settings: &SettingsStore, _| { - settings.get::(None).gemini.clone() - })?; - - if let Some(command) = - AgentServerCommand::resolve("gemini", &[ACP_ARG], settings, &project, cx).await - { - return Ok(command); - }; - - let (fs, node_runtime) = project.update(cx, |project, _| { - (project.fs().clone(), project.node_runtime().cloned()) - })?; - let node_runtime = node_runtime.context("gemini not found on path")?; - - let directory = ::paths::agent_servers_dir().join("gemini"); - fs.create_dir(&directory).await?; - node_runtime - .npm_install_packages(&directory, &[("@google/gemini-cli", "latest")]) - .await?; - let path = directory.join("node_modules/.bin/gemini"); - - Ok(AgentServerCommand { - path, - args: vec![ACP_ARG.into()], - env: None, - }) - } - - async fn version(&self, command: &AgentServerCommand) -> Result { - let version_fut = util::command::new_smol_command(&command.path) - .args(command.args.iter()) - .arg("--version") - .kill_on_drop(true) - .output(); - - let help_fut = util::command::new_smol_command(&command.path) - .args(command.args.iter()) - .arg("--help") - .kill_on_drop(true) - .output(); - - let (version_output, help_output) = futures::future::join(version_fut, help_fut).await; - - let current_version = String::from_utf8(version_output?.stdout)?; - let supported = String::from_utf8(help_output?.stdout)?.contains(ACP_ARG); - - if supported { - Ok(AgentServerVersion::Supported) - } else { - Ok(AgentServerVersion::Unsupported { - error_message: format!( - "Your installed version of Gemini {} doesn't support the Agentic Coding Protocol (ACP).", - current_version - ).into(), - upgrade_message: "Upgrade Gemini to Latest".into(), - upgrade_command: "npm install -g @google/gemini-cli@latest".into(), - }) - } - } -} - #[cfg(test)] pub(crate) mod tests { use super::*; @@ -199,7 +76,7 @@ pub(crate) mod tests { AgentServerCommand { path: "node".into(), - args: vec![cli_path, ACP_ARG.into()], + args: vec![cli_path], env: None, } } diff --git a/crates/agent_servers/src/mcp_server.rs b/crates/agent_servers/src/mcp_server.rs deleted file mode 100644 index 055b89dfe2..0000000000 --- a/crates/agent_servers/src/mcp_server.rs +++ /dev/null @@ -1,207 +0,0 @@ -use acp_thread::AcpThread; -use agent_client_protocol as acp; -use anyhow::Result; -use context_server::listener::{McpServerTool, ToolResponse}; -use context_server::types::{ - Implementation, InitializeParams, InitializeResponse, ProtocolVersion, ServerCapabilities, - ToolsCapabilities, requests, -}; -use futures::channel::oneshot; -use gpui::{App, AsyncApp, Task, WeakEntity}; -use indoc::indoc; - -pub struct ZedMcpServer { - server: context_server::listener::McpServer, -} - -pub const SERVER_NAME: &str = "zed"; - -impl ZedMcpServer { - pub async fn new( - thread_rx: watch::Receiver>, - cx: &AsyncApp, - ) -> Result { - let mut mcp_server = context_server::listener::McpServer::new(cx).await?; - mcp_server.handle_request::(Self::handle_initialize); - - mcp_server.add_tool(RequestPermissionTool { - thread_rx: thread_rx.clone(), - }); - mcp_server.add_tool(ReadTextFileTool { - thread_rx: thread_rx.clone(), - }); - mcp_server.add_tool(WriteTextFileTool { - thread_rx: thread_rx.clone(), - }); - - Ok(Self { server: mcp_server }) - } - - pub fn server_config(&self) -> Result { - #[cfg(not(test))] - let zed_path = anyhow::Context::context( - std::env::current_exe(), - "finding current executable path for use in mcp_server", - )?; - - #[cfg(test)] - let zed_path = crate::e2e_tests::get_zed_path(); - - Ok(acp::McpServerConfig { - command: zed_path, - args: vec![ - "--nc".into(), - self.server.socket_path().display().to_string(), - ], - env: None, - }) - } - - fn handle_initialize(_: InitializeParams, cx: &App) -> Task> { - cx.foreground_executor().spawn(async move { - Ok(InitializeResponse { - protocol_version: ProtocolVersion("2025-06-18".into()), - capabilities: ServerCapabilities { - experimental: None, - logging: None, - completions: None, - prompts: None, - resources: None, - tools: Some(ToolsCapabilities { - list_changed: Some(false), - }), - }, - server_info: Implementation { - name: SERVER_NAME.into(), - version: "0.1.0".into(), - }, - meta: None, - }) - }) - } -} - -// Tools - -#[derive(Clone)] -pub struct RequestPermissionTool { - thread_rx: watch::Receiver>, -} - -impl McpServerTool for RequestPermissionTool { - type Input = acp::RequestPermissionArguments; - type Output = acp::RequestPermissionOutput; - - const NAME: &'static str = "Confirmation"; - - fn description(&self) -> &'static str { - indoc! {" - Request permission for tool calls. - - This tool is meant to be called programmatically by the agent loop, not the LLM. - "} - } - - async fn run( - &self, - input: Self::Input, - cx: &mut AsyncApp, - ) -> Result> { - let mut thread_rx = self.thread_rx.clone(); - let Some(thread) = thread_rx.recv().await?.upgrade() else { - anyhow::bail!("Thread closed"); - }; - - let result = thread - .update(cx, |thread, cx| { - thread.request_tool_call_permission(input.tool_call, input.options, cx) - })? - .await; - - let outcome = match result { - Ok(option_id) => acp::RequestPermissionOutcome::Selected { option_id }, - Err(oneshot::Canceled) => acp::RequestPermissionOutcome::Canceled, - }; - - Ok(ToolResponse { - content: vec![], - structured_content: acp::RequestPermissionOutput { outcome }, - }) - } -} - -#[derive(Clone)] -pub struct ReadTextFileTool { - thread_rx: watch::Receiver>, -} - -impl McpServerTool for ReadTextFileTool { - type Input = acp::ReadTextFileArguments; - type Output = acp::ReadTextFileOutput; - - const NAME: &'static str = "Read"; - - fn description(&self) -> &'static str { - "Reads the content of the given file in the project including unsaved changes." - } - - async fn run( - &self, - input: Self::Input, - cx: &mut AsyncApp, - ) -> Result> { - let mut thread_rx = self.thread_rx.clone(); - let Some(thread) = thread_rx.recv().await?.upgrade() else { - anyhow::bail!("Thread closed"); - }; - - let content = thread - .update(cx, |thread, cx| { - thread.read_text_file(input.path, input.line, input.limit, false, cx) - })? - .await?; - - Ok(ToolResponse { - content: vec![], - structured_content: acp::ReadTextFileOutput { content }, - }) - } -} - -#[derive(Clone)] -pub struct WriteTextFileTool { - thread_rx: watch::Receiver>, -} - -impl McpServerTool for WriteTextFileTool { - type Input = acp::WriteTextFileArguments; - type Output = (); - - const NAME: &'static str = "Write"; - - fn description(&self) -> &'static str { - "Write to a file replacing its contents" - } - - async fn run( - &self, - input: Self::Input, - cx: &mut AsyncApp, - ) -> Result> { - let mut thread_rx = self.thread_rx.clone(); - let Some(thread) = thread_rx.recv().await?.upgrade() else { - anyhow::bail!("Thread closed"); - }; - - thread - .update(cx, |thread, cx| { - thread.write_text_file(input.path, input.content, cx) - })? - .await?; - - Ok(ToolResponse { - content: vec![], - structured_content: (), - }) - } -} diff --git a/crates/agent_servers/src/settings.rs b/crates/agent_servers/src/settings.rs index aeb34a5e61..645674b5f1 100644 --- a/crates/agent_servers/src/settings.rs +++ b/crates/agent_servers/src/settings.rs @@ -13,7 +13,6 @@ pub fn init(cx: &mut App) { pub struct AllAgentServersSettings { pub gemini: Option, pub claude: Option, - pub codex: Option, } #[derive(Deserialize, Serialize, Clone, JsonSchema, Debug)] @@ -30,21 +29,13 @@ impl settings::Settings for AllAgentServersSettings { fn load(sources: SettingsSources, _: &mut App) -> Result { let mut settings = AllAgentServersSettings::default(); - for AllAgentServersSettings { - gemini, - claude, - codex, - } in sources.defaults_and_customizations() - { + for AllAgentServersSettings { gemini, claude } in sources.defaults_and_customizations() { if gemini.is_some() { settings.gemini = gemini.clone(); } if claude.is_some() { settings.claude = claude.clone(); } - if codex.is_some() { - settings.codex = codex.clone(); - } } Ok(settings) diff --git a/crates/agent_ui/src/acp/thread_view.rs b/crates/agent_ui/src/acp/thread_view.rs index 8820e4a73d..26166b6960 100644 --- a/crates/agent_ui/src/acp/thread_view.rs +++ b/crates/agent_ui/src/acp/thread_view.rs @@ -232,7 +232,8 @@ impl AcpThreadView { { Err(e) => { let mut cx = cx.clone(); - if e.downcast_ref::().is_some() { + // todo! remove duplication + if e.downcast_ref::().is_some() { this.update(&mut cx, |this, cx| { this.thread_state = ThreadState::Unauthenticated { connection }; cx.notify(); @@ -675,13 +676,18 @@ impl AcpThreadView { Some(entry.diffs().map(|diff| diff.multibuffer.clone())) } - fn authenticate(&mut self, window: &mut Window, cx: &mut Context) { + fn authenticate( + &mut self, + method: acp::AuthMethodId, + window: &mut Window, + cx: &mut Context, + ) { let ThreadState::Unauthenticated { ref connection } = self.thread_state else { return; }; self.last_error.take(); - let authenticate = connection.authenticate(cx); + let authenticate = connection.authenticate(method, cx); self.auth_task = Some(cx.spawn_in(window, { let project = self.project.clone(); let agent = self.agent.clone(); @@ -2380,22 +2386,26 @@ impl Render for AcpThreadView { .on_action(cx.listener(Self::next_history_message)) .on_action(cx.listener(Self::open_agent_diff)) .child(match &self.thread_state { - ThreadState::Unauthenticated { .. } => { - v_flex() - .p_2() - .flex_1() - .items_center() - .justify_center() - .child(self.render_pending_auth_state()) - .child( - h_flex().mt_1p5().justify_center().child( - Button::new("sign-in", format!("Sign in to {}", self.agent.name())) - .on_click(cx.listener(|this, _, window, cx| { - this.authenticate(window, cx) - })), - ), - ) - } + ThreadState::Unauthenticated { connection } => v_flex() + .p_2() + .flex_1() + .items_center() + .justify_center() + .child(self.render_pending_auth_state()) + .child(h_flex().mt_1p5().justify_center().children( + connection.auth_methods().into_iter().map(|method| { + Button::new( + SharedString::from(method.id.0.clone()), + method.label.clone(), + ) + .on_click({ + let method_id = method.id.clone(); + cx.listener(move |this, _, window, cx| { + this.authenticate(method_id.clone(), window, cx) + }) + }) + }), + )), ThreadState::Loading { .. } => v_flex().flex_1().child(self.render_empty_state(cx)), ThreadState::LoadError(e) => v_flex() .p_2() @@ -2834,10 +2844,6 @@ mod tests { } impl AgentConnection for StubAgentConnection { - fn name(&self) -> &'static str { - "StubAgentConnection" - } - fn new_thread( self: Rc, project: Entity, @@ -2853,17 +2859,27 @@ mod tests { .into(), ); let thread = cx - .new(|cx| AcpThread::new(self.clone(), project, session_id.clone(), cx)) + .new(|cx| { + AcpThread::new("New Thread", self.clone(), project, session_id.clone(), cx) + }) .unwrap(); self.sessions.lock().insert(session_id, thread.downgrade()); Task::ready(Ok(thread)) } - fn authenticate(&self, _cx: &mut App) -> Task> { + fn auth_methods(&self) -> &[agent_client_protocol::AuthMethod] { + todo!() + } + + fn authenticate( + &self, + _method: acp::AuthMethodId, + _cx: &mut App, + ) -> Task> { unimplemented!() } - fn prompt(&self, params: acp::PromptArguments, cx: &mut App) -> Task> { + fn prompt(&self, params: acp::PromptRequest, cx: &mut App) -> Task> { let sessions = self.sessions.lock(); let thread = sessions.get(¶ms.session_id).unwrap(); let mut tasks = vec![]; @@ -2910,10 +2926,6 @@ mod tests { struct SaboteurAgentConnection; impl AgentConnection for SaboteurAgentConnection { - fn name(&self) -> &'static str { - "SaboteurAgentConnection" - } - fn new_thread( self: Rc, project: Entity, @@ -2921,15 +2933,23 @@ mod tests { cx: &mut gpui::AsyncApp, ) -> Task>> { Task::ready(Ok(cx - .new(|cx| AcpThread::new(self, project, SessionId("test".into()), cx)) + .new(|cx| AcpThread::new("New Thread", self, project, SessionId("test".into()), cx)) .unwrap())) } - fn authenticate(&self, _cx: &mut App) -> Task> { + fn auth_methods(&self) -> &[agent_client_protocol::AuthMethod] { + todo!() + } + + fn authenticate( + &self, + _method: acp::AuthMethodId, + _cx: &mut App, + ) -> Task> { unimplemented!() } - fn prompt(&self, _params: acp::PromptArguments, _cx: &mut App) -> Task> { + fn prompt(&self, _params: acp::PromptRequest, _cx: &mut App) -> Task> { Task::ready(Err(anyhow::anyhow!("Error prompting"))) } diff --git a/crates/agent_ui/src/agent_panel.rs b/crates/agent_ui/src/agent_panel.rs index fcb8dfbac2..a09c669769 100644 --- a/crates/agent_ui/src/agent_panel.rs +++ b/crates/agent_ui/src/agent_panel.rs @@ -1987,20 +1987,6 @@ impl AgentPanel { ); }), ) - .item( - ContextMenuEntry::new("New Codex Thread") - .icon(IconName::AiOpenAi) - .icon_color(Color::Muted) - .handler(move |window, cx| { - window.dispatch_action( - NewExternalAgentThread { - agent: Some(crate::ExternalAgent::Codex), - } - .boxed_clone(), - cx, - ); - }), - ) }); menu })) @@ -2662,25 +2648,6 @@ impl AgentPanel { ) }, ), - ) - .child( - NewThreadButton::new( - "new-codex-thread-btn", - "New Codex Thread", - IconName::AiOpenAi, - ) - .on_click( - |window, cx| { - window.dispatch_action( - Box::new(NewExternalAgentThread { - agent: Some( - crate::ExternalAgent::Codex, - ), - }), - cx, - ) - }, - ), ), ) }), diff --git a/crates/agent_ui/src/agent_ui.rs b/crates/agent_ui/src/agent_ui.rs index 0800031abe..c5574c2371 100644 --- a/crates/agent_ui/src/agent_ui.rs +++ b/crates/agent_ui/src/agent_ui.rs @@ -150,7 +150,6 @@ enum ExternalAgent { #[default] Gemini, ClaudeCode, - Codex, } impl ExternalAgent { @@ -158,7 +157,6 @@ impl ExternalAgent { match self { ExternalAgent::Gemini => Rc::new(agent_servers::Gemini), ExternalAgent::ClaudeCode => Rc::new(agent_servers::ClaudeCode), - ExternalAgent::Codex => Rc::new(agent_servers::Codex), } } } diff --git a/crates/context_server/src/client.rs b/crates/context_server/src/client.rs index 1eb29bbbf9..65283afa87 100644 --- a/crates/context_server/src/client.rs +++ b/crates/context_server/src/client.rs @@ -441,14 +441,12 @@ impl Client { Ok(()) } - #[allow(unused)] - pub fn on_notification(&self, method: &'static str, f: F) - where - F: 'static + Send + FnMut(Value, AsyncApp), - { - self.notification_handlers - .lock() - .insert(method, Box::new(f)); + pub fn on_notification( + &self, + method: &'static str, + f: Box, + ) { + self.notification_handlers.lock().insert(method, f); } } diff --git a/crates/context_server/src/context_server.rs b/crates/context_server/src/context_server.rs index e76e7972f7..34fa29678d 100644 --- a/crates/context_server/src/context_server.rs +++ b/crates/context_server/src/context_server.rs @@ -95,8 +95,28 @@ impl ContextServer { self.client.read().clone() } - pub async fn start(self: Arc, cx: &AsyncApp) -> Result<()> { - let client = match &self.configuration { + pub async fn start(&self, cx: &AsyncApp) -> Result<()> { + self.initialize(self.new_client(cx)?).await + } + + /// Starts the context server, making sure handlers are registered before initialization happens + pub async fn start_with_handlers( + &self, + notification_handlers: Vec<( + &'static str, + Box, + )>, + cx: &AsyncApp, + ) -> Result<()> { + let client = self.new_client(cx)?; + for (method, handler) in notification_handlers { + client.on_notification(method, handler); + } + self.initialize(client).await + } + + fn new_client(&self, cx: &AsyncApp) -> Result { + Ok(match &self.configuration { ContextServerTransport::Stdio(command, working_directory) => Client::stdio( client::ContextServerId(self.id.0.clone()), client::ModelContextServerBinary { @@ -113,8 +133,7 @@ impl ContextServer { transport.clone(), cx.clone(), )?, - }; - self.initialize(client).await + }) } async fn initialize(&self, client: Client) -> Result<()> { diff --git a/crates/context_server/src/listener.rs b/crates/context_server/src/listener.rs index 34e3a9a78c..0e85fb2129 100644 --- a/crates/context_server/src/listener.rs +++ b/crates/context_server/src/listener.rs @@ -83,14 +83,18 @@ impl McpServer { } pub fn add_tool(&mut self, tool: T) { - let output_schema = schemars::schema_for!(T::Output); - let unit_schema = schemars::schema_for!(()); + let mut settings = schemars::generate::SchemaSettings::draft07(); + settings.inline_subschemas = true; + let mut generator = settings.into_generator(); + + let output_schema = generator.root_schema_for::(); + let unit_schema = generator.root_schema_for::(); let registered_tool = RegisteredTool { tool: Tool { name: T::NAME.into(), description: Some(tool.description().into()), - input_schema: schemars::schema_for!(T::Input).into(), + input_schema: generator.root_schema_for::().into(), output_schema: if output_schema == unit_schema { None } else { diff --git a/crates/context_server/src/protocol.rs b/crates/context_server/src/protocol.rs index 9ccbc8a553..5355f20f62 100644 --- a/crates/context_server/src/protocol.rs +++ b/crates/context_server/src/protocol.rs @@ -115,10 +115,11 @@ impl InitializedContextServerProtocol { self.inner.notify(T::METHOD, params) } - pub fn on_notification(&self, method: &'static str, f: F) - where - F: 'static + Send + FnMut(Value, AsyncApp), - { + pub fn on_notification( + &self, + method: &'static str, + f: Box, + ) { self.inner.on_notification(method, f); } }