Refactor to use new ACP crate (#35043)

This will prepare us for running the protocol over MCP

Release Notes:

- N/A

---------

Co-authored-by: Ben Brandt <benjamin.j.brandt@gmail.com>
Co-authored-by: Conrad Irwin <conrad.irwin@gmail.com>
Co-authored-by: Richard Feldman <oss@rtfeldman.com>
This commit is contained in:
Agus Zubiaga 2025-07-24 14:39:29 -03:00 committed by GitHub
parent 45ddf32a1d
commit 2d0f10c48a
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
21 changed files with 1830 additions and 1748 deletions

View file

@ -1,5 +1,5 @@
mod mcp_server;
mod tools;
pub mod tools;
use collections::HashMap;
use project::Project;
@ -12,28 +12,24 @@ use std::pin::pin;
use std::rc::Rc;
use uuid::Uuid;
use agentic_coding_protocol::{
self as acp, AnyAgentRequest, AnyAgentResult, Client, ProtocolVersion,
StreamAssistantMessageChunkParams, ToolCallContent, UpdateToolCallParams,
};
use agent_client_protocol as acp;
use anyhow::{Result, anyhow};
use futures::channel::oneshot;
use futures::future::LocalBoxFuture;
use futures::{AsyncBufReadExt, AsyncWriteExt, SinkExt};
use futures::{AsyncBufReadExt, AsyncWriteExt};
use futures::{
AsyncRead, AsyncWrite, FutureExt, StreamExt,
channel::mpsc::{self, UnboundedReceiver, UnboundedSender},
io::BufReader,
select_biased,
};
use gpui::{App, AppContext, Entity, Task};
use gpui::{App, AppContext, AsyncApp, Entity, Task, WeakEntity};
use serde::{Deserialize, Serialize};
use util::ResultExt;
use crate::claude::mcp_server::ClaudeMcpServer;
use crate::claude::mcp_server::{ClaudeZedMcpServer, McpConfig};
use crate::claude::tools::ClaudeTool;
use crate::{AgentServer, AgentServerCommand, AllAgentServersSettings};
use acp_thread::{AcpClientDelegate, AcpThread, AgentConnection};
use acp_thread::{AcpThread, AgentConnection};
#[derive(Clone)]
pub struct ClaudeCode;
@ -55,29 +51,57 @@ impl AgentServer for ClaudeCode {
ui::IconName::AiClaude
}
fn supports_always_allow(&self) -> bool {
false
fn connect(
&self,
_root_dir: &Path,
_project: &Entity<Project>,
_cx: &mut App,
) -> Task<Result<Rc<dyn AgentConnection>>> {
let connection = ClaudeAgentConnection {
sessions: Default::default(),
};
Task::ready(Ok(Rc::new(connection) as _))
}
}
#[cfg(unix)]
fn send_interrupt(pid: libc::pid_t) -> anyhow::Result<()> {
let pid = nix::unistd::Pid::from_raw(pid);
nix::sys::signal::kill(pid, nix::sys::signal::SIGINT)
.map_err(|e| anyhow!("Failed to interrupt process: {}", e))
}
#[cfg(windows)]
fn send_interrupt(_pid: i32) -> anyhow::Result<()> {
panic!("Cancel not implemented on Windows")
}
struct ClaudeAgentConnection {
sessions: Rc<RefCell<HashMap<acp::SessionId, ClaudeAgentSession>>>,
}
impl AgentConnection for ClaudeAgentConnection {
fn name(&self) -> &'static str {
ClaudeCode.name()
}
fn new_thread(
&self,
root_dir: &Path,
project: &Entity<Project>,
cx: &mut App,
self: Rc<Self>,
project: Entity<Project>,
cwd: &Path,
cx: &mut AsyncApp,
) -> Task<Result<Entity<AcpThread>>> {
let project = project.clone();
let root_dir = root_dir.to_path_buf();
let title = self.name().into();
let cwd = cwd.to_owned();
cx.spawn(async move |cx| {
let (mut delegate_tx, delegate_rx) = watch::channel(None);
let tool_id_map = Rc::new(RefCell::new(HashMap::default()));
let mcp_server = ClaudeMcpServer::new(delegate_rx, tool_id_map.clone(), cx).await?;
let (mut thread_tx, thread_rx) = watch::channel(WeakEntity::new_invalid());
let permission_mcp_server = ClaudeZedMcpServer::new(thread_rx.clone(), cx).await?;
let mut mcp_servers = HashMap::default();
mcp_servers.insert(
mcp_server::SERVER_NAME.to_string(),
mcp_server.server_config()?,
permission_mcp_server.server_config()?,
);
let mcp_config = McpConfig { mcp_servers };
@ -104,177 +128,180 @@ impl AgentServer for ClaudeCode {
let (outgoing_tx, outgoing_rx) = mpsc::unbounded();
let (cancel_tx, mut cancel_rx) = mpsc::unbounded::<oneshot::Sender<Result<()>>>();
let session_id = Uuid::new_v4();
let session_id = acp::SessionId(Uuid::new_v4().to_string().into());
log::trace!("Starting session with id: {}", session_id);
cx.background_spawn(async move {
let mut outgoing_rx = Some(outgoing_rx);
let mut mode = ClaudeSessionMode::Start;
cx.background_spawn({
let session_id = session_id.clone();
async move {
let mut outgoing_rx = Some(outgoing_rx);
let mut mode = ClaudeSessionMode::Start;
loop {
let mut child =
spawn_claude(&command, mode, session_id, &mcp_config_path, &root_dir)
.await?;
mode = ClaudeSessionMode::Resume;
let pid = child.id();
log::trace!("Spawned (pid: {})", pid);
let mut io_fut = pin!(
ClaudeAgentConnection::handle_io(
outgoing_rx.take().unwrap(),
incoming_message_tx.clone(),
child.stdin.take().unwrap(),
child.stdout.take().unwrap(),
loop {
let mut child = spawn_claude(
&command,
mode,
session_id.clone(),
&mcp_config_path,
&cwd,
)
.fuse()
);
.await?;
mode = ClaudeSessionMode::Resume;
select_biased! {
done_tx = cancel_rx.next() => {
if let Some(done_tx) = done_tx {
log::trace!("Interrupted (pid: {})", pid);
let result = send_interrupt(pid as i32);
outgoing_rx.replace(io_fut.await?);
done_tx.send(result).log_err();
continue;
let pid = child.id();
log::trace!("Spawned (pid: {})", pid);
let mut io_fut = pin!(
ClaudeAgentSession::handle_io(
outgoing_rx.take().unwrap(),
incoming_message_tx.clone(),
child.stdin.take().unwrap(),
child.stdout.take().unwrap(),
)
.fuse()
);
select_biased! {
done_tx = cancel_rx.next() => {
if let Some(done_tx) = done_tx {
log::trace!("Interrupted (pid: {})", pid);
let result = send_interrupt(pid as i32);
outgoing_rx.replace(io_fut.await?);
done_tx.send(result).log_err();
continue;
}
}
result = io_fut => {
result?;
}
}
result = io_fut => {
result?;
}
log::trace!("Stopped (pid: {})", pid);
break;
}
log::trace!("Stopped (pid: {})", pid);
break;
drop(mcp_config_path);
anyhow::Ok(())
}
drop(mcp_config_path);
anyhow::Ok(())
})
.detach();
cx.new(|cx| {
let end_turn_tx = Rc::new(RefCell::new(None));
let delegate = AcpClientDelegate::new(cx.entity().downgrade(), cx.to_async());
delegate_tx.send(Some(delegate.clone())).log_err();
let handler_task = cx.foreground_executor().spawn({
let end_turn_tx = end_turn_tx.clone();
let tool_id_map = tool_id_map.clone();
let delegate = delegate.clone();
async move {
while let Some(message) = incoming_message_rx.next().await {
ClaudeAgentConnection::handle_message(
delegate.clone(),
message,
end_turn_tx.clone(),
tool_id_map.clone(),
)
.await
}
let end_turn_tx = Rc::new(RefCell::new(None));
let handler_task = cx.spawn({
let end_turn_tx = end_turn_tx.clone();
let thread_rx = thread_rx.clone();
async move |cx| {
while let Some(message) = incoming_message_rx.next().await {
ClaudeAgentSession::handle_message(
thread_rx.clone(),
message,
end_turn_tx.clone(),
cx,
)
.await
}
});
}
});
let mut connection = ClaudeAgentConnection {
delegate,
outgoing_tx,
end_turn_tx,
cancel_tx,
session_id,
_handler_task: handler_task,
_mcp_server: None,
};
let thread =
cx.new(|cx| AcpThread::new(self.clone(), project, session_id.clone(), cx))?;
connection._mcp_server = Some(mcp_server);
acp_thread::AcpThread::new(connection, title, None, project.clone(), cx)
})
thread_tx.send(thread.downgrade())?;
let session = ClaudeAgentSession {
outgoing_tx,
end_turn_tx,
cancel_tx,
_handler_task: handler_task,
_mcp_server: Some(permission_mcp_server),
};
self.sessions.borrow_mut().insert(session_id, session);
Ok(thread)
})
}
}
#[cfg(unix)]
fn send_interrupt(pid: libc::pid_t) -> anyhow::Result<()> {
let pid = nix::unistd::Pid::from_raw(pid);
fn authenticate(&self, _cx: &mut App) -> Task<Result<()>> {
Task::ready(Err(anyhow!("Authentication not supported")))
}
nix::sys::signal::kill(pid, nix::sys::signal::SIGINT)
.map_err(|e| anyhow!("Failed to interrupt process: {}", e))
}
fn prompt(&self, params: acp::PromptToolArguments, cx: &mut App) -> Task<Result<()>> {
let sessions = self.sessions.borrow();
let Some(session) = sessions.get(&params.session_id) else {
return Task::ready(Err(anyhow!(
"Attempted to send message to nonexistent session {}",
params.session_id
)));
};
#[cfg(windows)]
fn send_interrupt(_pid: i32) -> anyhow::Result<()> {
panic!("Cancel not implemented on Windows")
}
let (tx, rx) = oneshot::channel();
session.end_turn_tx.borrow_mut().replace(tx);
impl AgentConnection for ClaudeAgentConnection {
/// Send a request to the agent and wait for a response.
fn request_any(
&self,
params: AnyAgentRequest,
) -> LocalBoxFuture<'static, Result<acp::AnyAgentResult>> {
let delegate = self.delegate.clone();
let end_turn_tx = self.end_turn_tx.clone();
let outgoing_tx = self.outgoing_tx.clone();
let mut cancel_tx = self.cancel_tx.clone();
let session_id = self.session_id;
async move {
match params {
// todo: consider sending an empty request so we get the init response?
AnyAgentRequest::InitializeParams(_) => Ok(AnyAgentResult::InitializeResponse(
acp::InitializeResponse {
is_authenticated: true,
protocol_version: ProtocolVersion::latest(),
},
)),
AnyAgentRequest::AuthenticateParams(_) => {
Err(anyhow!("Authentication not supported"))
let mut content = String::new();
for chunk in params.prompt {
match chunk {
acp::ContentBlock::Text(text_content) => {
content.push_str(&text_content.text);
}
AnyAgentRequest::SendUserMessageParams(message) => {
delegate.clear_completed_plan_entries().await?;
let (tx, rx) = oneshot::channel();
end_turn_tx.borrow_mut().replace(tx);
let mut content = String::new();
for chunk in message.chunks {
match chunk {
agentic_coding_protocol::UserMessageChunk::Text { text } => {
content.push_str(&text)
}
agentic_coding_protocol::UserMessageChunk::Path { path } => {
content.push_str(&format!("@{path:?}"))
}
}
}
outgoing_tx.unbounded_send(SdkMessage::User {
message: Message {
role: Role::User,
content: Content::UntaggedText(content),
id: None,
model: None,
stop_reason: None,
stop_sequence: None,
usage: None,
},
session_id: Some(session_id),
})?;
rx.await??;
Ok(AnyAgentResult::SendUserMessageResponse(
acp::SendUserMessageResponse,
))
acp::ContentBlock::ResourceLink(resource_link) => {
content.push_str(&format!("@{}", resource_link.uri));
}
AnyAgentRequest::CancelSendMessageParams(_) => {
let (done_tx, done_rx) = oneshot::channel();
cancel_tx.send(done_tx).await?;
done_rx.await??;
Ok(AnyAgentResult::CancelSendMessageResponse(
acp::CancelSendMessageResponse,
))
acp::ContentBlock::Audio(_)
| acp::ContentBlock::Image(_)
| acp::ContentBlock::Resource(_) => {
// TODO
}
}
}
.boxed_local()
if let Err(err) = session.outgoing_tx.unbounded_send(SdkMessage::User {
message: Message {
role: Role::User,
content: Content::UntaggedText(content),
id: None,
model: None,
stop_reason: None,
stop_sequence: None,
usage: None,
},
session_id: Some(params.session_id.to_string()),
}) {
return Task::ready(Err(anyhow!(err)));
}
cx.foreground_executor().spawn(async move {
rx.await??;
Ok(())
})
}
fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) {
let sessions = self.sessions.borrow();
let Some(session) = sessions.get(&session_id) else {
log::warn!("Attempted to cancel nonexistent session {}", session_id);
return;
};
let (done_tx, done_rx) = oneshot::channel();
if session
.cancel_tx
.unbounded_send(done_tx)
.log_err()
.is_some()
{
let end_turn_tx = session.end_turn_tx.clone();
cx.foreground_executor()
.spawn(async move {
done_rx.await??;
if let Some(end_turn_tx) = end_turn_tx.take() {
end_turn_tx.send(Ok(())).ok();
}
anyhow::Ok(())
})
.detach_and_log_err(cx);
}
}
}
@ -287,7 +314,7 @@ enum ClaudeSessionMode {
async fn spawn_claude(
command: &AgentServerCommand,
mode: ClaudeSessionMode,
session_id: Uuid,
session_id: acp::SessionId,
mcp_config_path: &Path,
root_dir: &Path,
) -> Result<Child> {
@ -327,88 +354,103 @@ async fn spawn_claude(
Ok(child)
}
struct ClaudeAgentConnection {
delegate: AcpClientDelegate,
session_id: Uuid,
struct ClaudeAgentSession {
outgoing_tx: UnboundedSender<SdkMessage>,
end_turn_tx: Rc<RefCell<Option<oneshot::Sender<Result<()>>>>>,
cancel_tx: UnboundedSender<oneshot::Sender<Result<()>>>,
_mcp_server: Option<ClaudeMcpServer>,
_mcp_server: Option<ClaudeZedMcpServer>,
_handler_task: Task<()>,
}
impl ClaudeAgentConnection {
impl ClaudeAgentSession {
async fn handle_message(
delegate: AcpClientDelegate,
mut thread_rx: watch::Receiver<WeakEntity<AcpThread>>,
message: SdkMessage,
end_turn_tx: Rc<RefCell<Option<oneshot::Sender<Result<()>>>>>,
tool_id_map: Rc<RefCell<HashMap<String, acp::ToolCallId>>>,
cx: &mut AsyncApp,
) {
match message {
SdkMessage::Assistant { message, .. } | SdkMessage::User { message, .. } => {
SdkMessage::Assistant {
message,
session_id: _,
}
| SdkMessage::User {
message,
session_id: _,
} => {
let Some(thread) = thread_rx
.recv()
.await
.log_err()
.and_then(|entity| entity.upgrade())
else {
log::error!("Received an SDK message but thread is gone");
return;
};
for chunk in message.content.chunks() {
match chunk {
ContentChunk::Text { text } | ContentChunk::UntaggedText(text) => {
delegate
.stream_assistant_message_chunk(StreamAssistantMessageChunkParams {
chunk: acp::AssistantMessageChunk::Text { text },
thread
.update(cx, |thread, cx| {
thread.push_assistant_chunk(text.into(), false, cx)
})
.await
.log_err();
}
ContentChunk::ToolUse { id, name, input } => {
let claude_tool = ClaudeTool::infer(&name, input);
if let ClaudeTool::TodoWrite(Some(params)) = claude_tool {
delegate
.update_plan(acp::UpdatePlanParams {
entries: params.todos.into_iter().map(Into::into).collect(),
})
.await
.log_err();
} else if let Some(resp) = delegate
.push_tool_call(claude_tool.as_acp())
.await
.log_err()
{
tool_id_map.borrow_mut().insert(id, resp.id);
}
thread
.update(cx, |thread, cx| {
if let ClaudeTool::TodoWrite(Some(params)) = claude_tool {
thread.update_plan(
acp::Plan {
entries: params
.todos
.into_iter()
.map(Into::into)
.collect(),
},
cx,
)
} else {
thread.upsert_tool_call(
claude_tool.as_acp(acp::ToolCallId(id.into())),
cx,
);
}
})
.log_err();
}
ContentChunk::ToolResult {
content,
tool_use_id,
} => {
let id = tool_id_map.borrow_mut().remove(&tool_use_id);
if let Some(id) = id {
let content = content.to_string();
delegate
.update_tool_call(UpdateToolCallParams {
tool_call_id: id,
status: acp::ToolCallStatus::Finished,
// Don't unset existing content
content: (!content.is_empty()).then_some(
ToolCallContent::Markdown {
// For now we only include text content
markdown: content,
},
),
})
.await
.log_err();
}
let content = content.to_string();
thread
.update(cx, |thread, cx| {
thread.update_tool_call(
acp::ToolCallId(tool_use_id.into()),
acp::ToolCallStatus::Completed,
(!content.is_empty()).then(|| vec![content.into()]),
cx,
)
})
.log_err();
}
ContentChunk::Image
| ContentChunk::Document
| ContentChunk::Thinking
| ContentChunk::RedactedThinking
| ContentChunk::WebSearchToolResult => {
delegate
.stream_assistant_message_chunk(StreamAssistantMessageChunkParams {
chunk: acp::AssistantMessageChunk::Text {
text: format!("Unsupported content: {:?}", chunk),
},
thread
.update(cx, |thread, cx| {
thread.push_assistant_chunk(
format!("Unsupported content: {:?}", chunk).into(),
false,
cx,
)
})
.await
.log_err();
}
}
@ -592,14 +634,14 @@ enum SdkMessage {
Assistant {
message: Message, // from Anthropic SDK
#[serde(skip_serializing_if = "Option::is_none")]
session_id: Option<Uuid>,
session_id: Option<String>,
},
// A user message
User {
message: Message, // from Anthropic SDK
#[serde(skip_serializing_if = "Option::is_none")]
session_id: Option<Uuid>,
session_id: Option<String>,
},
// Emitted as the last message in a conversation
@ -661,21 +703,6 @@ enum PermissionMode {
Plan,
}
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
struct McpConfig {
mcp_servers: HashMap<String, McpServerConfig>,
}
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
struct McpServerConfig {
command: String,
args: Vec<String>,
#[serde(skip_serializing_if = "Option::is_none")]
env: Option<HashMap<String, String>>,
}
#[cfg(test)]
pub(crate) mod tests {
use super::*;