parent
1460573dd4
commit
f8667a8379
3 changed files with 1 additions and 900 deletions
|
@ -1347,6 +1347,7 @@ async fn test_cancellation(cx: &mut TestAppContext) {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[gpui::test]
|
#[gpui::test]
|
||||||
|
#[cfg_attr(target_os = "windows", ignore)] // TODO: Fix this test on Windows
|
||||||
async fn test_in_progress_send_canceled_by_next_send(cx: &mut TestAppContext) {
|
async fn test_in_progress_send_canceled_by_next_send(cx: &mut TestAppContext) {
|
||||||
let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
|
let ThreadTest { model, thread, .. } = setup(cx, TestModel::Fake).await;
|
||||||
let fake_model = model.as_fake();
|
let fake_model = model.as_fake();
|
||||||
|
|
|
@ -1,524 +0,0 @@
|
||||||
// Translates old acp agents into the new schema
|
|
||||||
use action_log::ActionLog;
|
|
||||||
use agent_client_protocol as acp;
|
|
||||||
use agentic_coding_protocol::{self as acp_old, AgentRequest as _};
|
|
||||||
use anyhow::{Context as _, Result, anyhow};
|
|
||||||
use futures::channel::oneshot;
|
|
||||||
use gpui::{AppContext as _, AsyncApp, Entity, Task, WeakEntity};
|
|
||||||
use project::Project;
|
|
||||||
use std::{any::Any, cell::RefCell, path::Path, rc::Rc};
|
|
||||||
use ui::App;
|
|
||||||
use util::ResultExt as _;
|
|
||||||
|
|
||||||
use crate::AgentServerCommand;
|
|
||||||
use acp_thread::{AcpThread, AgentConnection, AuthRequired};
|
|
||||||
|
|
||||||
#[derive(Clone)]
|
|
||||||
struct OldAcpClientDelegate {
|
|
||||||
thread: Rc<RefCell<WeakEntity<AcpThread>>>,
|
|
||||||
cx: AsyncApp,
|
|
||||||
next_tool_call_id: Rc<RefCell<u64>>,
|
|
||||||
// sent_buffer_versions: HashMap<Entity<Buffer>, HashMap<u64, BufferSnapshot>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl OldAcpClientDelegate {
|
|
||||||
fn new(thread: Rc<RefCell<WeakEntity<AcpThread>>>, cx: AsyncApp) -> Self {
|
|
||||||
Self {
|
|
||||||
thread,
|
|
||||||
cx,
|
|
||||||
next_tool_call_id: Rc::new(RefCell::new(0)),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl acp_old::Client for OldAcpClientDelegate {
|
|
||||||
async fn stream_assistant_message_chunk(
|
|
||||||
&self,
|
|
||||||
params: acp_old::StreamAssistantMessageChunkParams,
|
|
||||||
) -> Result<(), acp_old::Error> {
|
|
||||||
let cx = &mut self.cx.clone();
|
|
||||||
|
|
||||||
cx.update(|cx| {
|
|
||||||
self.thread
|
|
||||||
.borrow()
|
|
||||||
.update(cx, |thread, cx| match params.chunk {
|
|
||||||
acp_old::AssistantMessageChunk::Text { text } => {
|
|
||||||
thread.push_assistant_content_block(text.into(), false, cx)
|
|
||||||
}
|
|
||||||
acp_old::AssistantMessageChunk::Thought { thought } => {
|
|
||||||
thread.push_assistant_content_block(thought.into(), true, cx)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.log_err();
|
|
||||||
})?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn request_tool_call_confirmation(
|
|
||||||
&self,
|
|
||||||
request: acp_old::RequestToolCallConfirmationParams,
|
|
||||||
) -> Result<acp_old::RequestToolCallConfirmationResponse, acp_old::Error> {
|
|
||||||
let cx = &mut self.cx.clone();
|
|
||||||
|
|
||||||
let old_acp_id = *self.next_tool_call_id.borrow() + 1;
|
|
||||||
self.next_tool_call_id.replace(old_acp_id);
|
|
||||||
|
|
||||||
let tool_call = into_new_tool_call(
|
|
||||||
acp::ToolCallId(old_acp_id.to_string().into()),
|
|
||||||
request.tool_call,
|
|
||||||
);
|
|
||||||
|
|
||||||
let mut options = match request.confirmation {
|
|
||||||
acp_old::ToolCallConfirmation::Edit { .. } => vec![(
|
|
||||||
acp_old::ToolCallConfirmationOutcome::AlwaysAllow,
|
|
||||||
acp::PermissionOptionKind::AllowAlways,
|
|
||||||
"Always Allow Edits".to_string(),
|
|
||||||
)],
|
|
||||||
acp_old::ToolCallConfirmation::Execute { root_command, .. } => vec![(
|
|
||||||
acp_old::ToolCallConfirmationOutcome::AlwaysAllow,
|
|
||||||
acp::PermissionOptionKind::AllowAlways,
|
|
||||||
format!("Always Allow {}", root_command),
|
|
||||||
)],
|
|
||||||
acp_old::ToolCallConfirmation::Mcp {
|
|
||||||
server_name,
|
|
||||||
tool_name,
|
|
||||||
..
|
|
||||||
} => vec![
|
|
||||||
(
|
|
||||||
acp_old::ToolCallConfirmationOutcome::AlwaysAllowMcpServer,
|
|
||||||
acp::PermissionOptionKind::AllowAlways,
|
|
||||||
format!("Always Allow {}", server_name),
|
|
||||||
),
|
|
||||||
(
|
|
||||||
acp_old::ToolCallConfirmationOutcome::AlwaysAllowTool,
|
|
||||||
acp::PermissionOptionKind::AllowAlways,
|
|
||||||
format!("Always Allow {}", tool_name),
|
|
||||||
),
|
|
||||||
],
|
|
||||||
acp_old::ToolCallConfirmation::Fetch { .. } => vec![(
|
|
||||||
acp_old::ToolCallConfirmationOutcome::AlwaysAllow,
|
|
||||||
acp::PermissionOptionKind::AllowAlways,
|
|
||||||
"Always Allow".to_string(),
|
|
||||||
)],
|
|
||||||
acp_old::ToolCallConfirmation::Other { .. } => vec![(
|
|
||||||
acp_old::ToolCallConfirmationOutcome::AlwaysAllow,
|
|
||||||
acp::PermissionOptionKind::AllowAlways,
|
|
||||||
"Always Allow".to_string(),
|
|
||||||
)],
|
|
||||||
};
|
|
||||||
|
|
||||||
options.extend([
|
|
||||||
(
|
|
||||||
acp_old::ToolCallConfirmationOutcome::Allow,
|
|
||||||
acp::PermissionOptionKind::AllowOnce,
|
|
||||||
"Allow".to_string(),
|
|
||||||
),
|
|
||||||
(
|
|
||||||
acp_old::ToolCallConfirmationOutcome::Reject,
|
|
||||||
acp::PermissionOptionKind::RejectOnce,
|
|
||||||
"Reject".to_string(),
|
|
||||||
),
|
|
||||||
]);
|
|
||||||
|
|
||||||
let mut outcomes = Vec::with_capacity(options.len());
|
|
||||||
let mut acp_options = Vec::with_capacity(options.len());
|
|
||||||
|
|
||||||
for (index, (outcome, kind, label)) in options.into_iter().enumerate() {
|
|
||||||
outcomes.push(outcome);
|
|
||||||
acp_options.push(acp::PermissionOption {
|
|
||||||
id: acp::PermissionOptionId(index.to_string().into()),
|
|
||||||
name: label,
|
|
||||||
kind,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
let response = cx
|
|
||||||
.update(|cx| {
|
|
||||||
self.thread.borrow().update(cx, |thread, cx| {
|
|
||||||
thread.request_tool_call_authorization(tool_call.into(), acp_options, cx)
|
|
||||||
})
|
|
||||||
})??
|
|
||||||
.context("Failed to update thread")?
|
|
||||||
.await;
|
|
||||||
|
|
||||||
let outcome = match response {
|
|
||||||
Ok(option_id) => outcomes[option_id.0.parse::<usize>().unwrap_or(0)],
|
|
||||||
Err(oneshot::Canceled) => acp_old::ToolCallConfirmationOutcome::Cancel,
|
|
||||||
};
|
|
||||||
|
|
||||||
Ok(acp_old::RequestToolCallConfirmationResponse {
|
|
||||||
id: acp_old::ToolCallId(old_acp_id),
|
|
||||||
outcome,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn push_tool_call(
|
|
||||||
&self,
|
|
||||||
request: acp_old::PushToolCallParams,
|
|
||||||
) -> Result<acp_old::PushToolCallResponse, acp_old::Error> {
|
|
||||||
let cx = &mut self.cx.clone();
|
|
||||||
|
|
||||||
let old_acp_id = *self.next_tool_call_id.borrow() + 1;
|
|
||||||
self.next_tool_call_id.replace(old_acp_id);
|
|
||||||
|
|
||||||
cx.update(|cx| {
|
|
||||||
self.thread.borrow().update(cx, |thread, cx| {
|
|
||||||
thread.upsert_tool_call(
|
|
||||||
into_new_tool_call(acp::ToolCallId(old_acp_id.to_string().into()), request),
|
|
||||||
cx,
|
|
||||||
)
|
|
||||||
})
|
|
||||||
})??
|
|
||||||
.context("Failed to update thread")?;
|
|
||||||
|
|
||||||
Ok(acp_old::PushToolCallResponse {
|
|
||||||
id: acp_old::ToolCallId(old_acp_id),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn update_tool_call(
|
|
||||||
&self,
|
|
||||||
request: acp_old::UpdateToolCallParams,
|
|
||||||
) -> Result<(), acp_old::Error> {
|
|
||||||
let cx = &mut self.cx.clone();
|
|
||||||
|
|
||||||
cx.update(|cx| {
|
|
||||||
self.thread.borrow().update(cx, |thread, cx| {
|
|
||||||
thread.update_tool_call(
|
|
||||||
acp::ToolCallUpdate {
|
|
||||||
id: acp::ToolCallId(request.tool_call_id.0.to_string().into()),
|
|
||||||
fields: acp::ToolCallUpdateFields {
|
|
||||||
status: Some(into_new_tool_call_status(request.status)),
|
|
||||||
content: Some(
|
|
||||||
request
|
|
||||||
.content
|
|
||||||
.into_iter()
|
|
||||||
.map(into_new_tool_call_content)
|
|
||||||
.collect::<Vec<_>>(),
|
|
||||||
),
|
|
||||||
..Default::default()
|
|
||||||
},
|
|
||||||
},
|
|
||||||
cx,
|
|
||||||
)
|
|
||||||
})
|
|
||||||
})?
|
|
||||||
.context("Failed to update thread")??;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn update_plan(&self, request: acp_old::UpdatePlanParams) -> Result<(), acp_old::Error> {
|
|
||||||
let cx = &mut self.cx.clone();
|
|
||||||
|
|
||||||
cx.update(|cx| {
|
|
||||||
self.thread.borrow().update(cx, |thread, cx| {
|
|
||||||
thread.update_plan(
|
|
||||||
acp::Plan {
|
|
||||||
entries: request
|
|
||||||
.entries
|
|
||||||
.into_iter()
|
|
||||||
.map(into_new_plan_entry)
|
|
||||||
.collect(),
|
|
||||||
},
|
|
||||||
cx,
|
|
||||||
)
|
|
||||||
})
|
|
||||||
})?
|
|
||||||
.context("Failed to update thread")?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn read_text_file(
|
|
||||||
&self,
|
|
||||||
acp_old::ReadTextFileParams { path, line, limit }: acp_old::ReadTextFileParams,
|
|
||||||
) -> Result<acp_old::ReadTextFileResponse, acp_old::Error> {
|
|
||||||
let content = self
|
|
||||||
.cx
|
|
||||||
.update(|cx| {
|
|
||||||
self.thread.borrow().update(cx, |thread, cx| {
|
|
||||||
thread.read_text_file(path, line, limit, false, cx)
|
|
||||||
})
|
|
||||||
})?
|
|
||||||
.context("Failed to update thread")?
|
|
||||||
.await?;
|
|
||||||
Ok(acp_old::ReadTextFileResponse { content })
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn write_text_file(
|
|
||||||
&self,
|
|
||||||
acp_old::WriteTextFileParams { path, content }: acp_old::WriteTextFileParams,
|
|
||||||
) -> Result<(), acp_old::Error> {
|
|
||||||
self.cx
|
|
||||||
.update(|cx| {
|
|
||||||
self.thread
|
|
||||||
.borrow()
|
|
||||||
.update(cx, |thread, cx| thread.write_text_file(path, content, cx))
|
|
||||||
})?
|
|
||||||
.context("Failed to update thread")?
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn into_new_tool_call(id: acp::ToolCallId, request: acp_old::PushToolCallParams) -> acp::ToolCall {
|
|
||||||
acp::ToolCall {
|
|
||||||
id,
|
|
||||||
title: request.label,
|
|
||||||
kind: acp_kind_from_old_icon(request.icon),
|
|
||||||
status: acp::ToolCallStatus::InProgress,
|
|
||||||
content: request
|
|
||||||
.content
|
|
||||||
.into_iter()
|
|
||||||
.map(into_new_tool_call_content)
|
|
||||||
.collect(),
|
|
||||||
locations: request
|
|
||||||
.locations
|
|
||||||
.into_iter()
|
|
||||||
.map(into_new_tool_call_location)
|
|
||||||
.collect(),
|
|
||||||
raw_input: None,
|
|
||||||
raw_output: None,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn acp_kind_from_old_icon(icon: acp_old::Icon) -> acp::ToolKind {
|
|
||||||
match icon {
|
|
||||||
acp_old::Icon::FileSearch => acp::ToolKind::Search,
|
|
||||||
acp_old::Icon::Folder => acp::ToolKind::Search,
|
|
||||||
acp_old::Icon::Globe => acp::ToolKind::Search,
|
|
||||||
acp_old::Icon::Hammer => acp::ToolKind::Other,
|
|
||||||
acp_old::Icon::LightBulb => acp::ToolKind::Think,
|
|
||||||
acp_old::Icon::Pencil => acp::ToolKind::Edit,
|
|
||||||
acp_old::Icon::Regex => acp::ToolKind::Search,
|
|
||||||
acp_old::Icon::Terminal => acp::ToolKind::Execute,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn into_new_tool_call_status(status: acp_old::ToolCallStatus) -> acp::ToolCallStatus {
|
|
||||||
match status {
|
|
||||||
acp_old::ToolCallStatus::Running => acp::ToolCallStatus::InProgress,
|
|
||||||
acp_old::ToolCallStatus::Finished => acp::ToolCallStatus::Completed,
|
|
||||||
acp_old::ToolCallStatus::Error => acp::ToolCallStatus::Failed,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn into_new_tool_call_content(content: acp_old::ToolCallContent) -> acp::ToolCallContent {
|
|
||||||
match content {
|
|
||||||
acp_old::ToolCallContent::Markdown { markdown } => markdown.into(),
|
|
||||||
acp_old::ToolCallContent::Diff { diff } => acp::ToolCallContent::Diff {
|
|
||||||
diff: into_new_diff(diff),
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn into_new_diff(diff: acp_old::Diff) -> acp::Diff {
|
|
||||||
acp::Diff {
|
|
||||||
path: diff.path,
|
|
||||||
old_text: diff.old_text,
|
|
||||||
new_text: diff.new_text,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn into_new_tool_call_location(location: acp_old::ToolCallLocation) -> acp::ToolCallLocation {
|
|
||||||
acp::ToolCallLocation {
|
|
||||||
path: location.path,
|
|
||||||
line: location.line,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn into_new_plan_entry(entry: acp_old::PlanEntry) -> acp::PlanEntry {
|
|
||||||
acp::PlanEntry {
|
|
||||||
content: entry.content,
|
|
||||||
priority: into_new_plan_priority(entry.priority),
|
|
||||||
status: into_new_plan_status(entry.status),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn into_new_plan_priority(priority: acp_old::PlanEntryPriority) -> acp::PlanEntryPriority {
|
|
||||||
match priority {
|
|
||||||
acp_old::PlanEntryPriority::Low => acp::PlanEntryPriority::Low,
|
|
||||||
acp_old::PlanEntryPriority::Medium => acp::PlanEntryPriority::Medium,
|
|
||||||
acp_old::PlanEntryPriority::High => acp::PlanEntryPriority::High,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn into_new_plan_status(status: acp_old::PlanEntryStatus) -> acp::PlanEntryStatus {
|
|
||||||
match status {
|
|
||||||
acp_old::PlanEntryStatus::Pending => acp::PlanEntryStatus::Pending,
|
|
||||||
acp_old::PlanEntryStatus::InProgress => acp::PlanEntryStatus::InProgress,
|
|
||||||
acp_old::PlanEntryStatus::Completed => acp::PlanEntryStatus::Completed,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct AcpConnection {
|
|
||||||
pub name: &'static str,
|
|
||||||
pub connection: acp_old::AgentConnection,
|
|
||||||
pub _child_status: Task<Result<()>>,
|
|
||||||
pub current_thread: Rc<RefCell<WeakEntity<AcpThread>>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl AcpConnection {
|
|
||||||
pub fn stdio(
|
|
||||||
name: &'static str,
|
|
||||||
command: AgentServerCommand,
|
|
||||||
root_dir: &Path,
|
|
||||||
cx: &mut AsyncApp,
|
|
||||||
) -> Task<Result<Self>> {
|
|
||||||
let root_dir = root_dir.to_path_buf();
|
|
||||||
|
|
||||||
cx.spawn(async move |cx| {
|
|
||||||
let mut child = util::command::new_smol_command(&command.path)
|
|
||||||
.args(command.args.iter())
|
|
||||||
.current_dir(root_dir)
|
|
||||||
.stdin(std::process::Stdio::piped())
|
|
||||||
.stdout(std::process::Stdio::piped())
|
|
||||||
.stderr(std::process::Stdio::inherit())
|
|
||||||
.kill_on_drop(true)
|
|
||||||
.spawn()?;
|
|
||||||
|
|
||||||
let stdin = child.stdin.take().unwrap();
|
|
||||||
let stdout = child.stdout.take().unwrap();
|
|
||||||
log::trace!("Spawned (pid: {})", child.id());
|
|
||||||
|
|
||||||
let foreground_executor = cx.foreground_executor().clone();
|
|
||||||
|
|
||||||
let thread_rc = Rc::new(RefCell::new(WeakEntity::new_invalid()));
|
|
||||||
|
|
||||||
let (connection, io_fut) = acp_old::AgentConnection::connect_to_agent(
|
|
||||||
OldAcpClientDelegate::new(thread_rc.clone(), cx.clone()),
|
|
||||||
stdin,
|
|
||||||
stdout,
|
|
||||||
move |fut| foreground_executor.spawn(fut).detach(),
|
|
||||||
);
|
|
||||||
|
|
||||||
let io_task = cx.background_spawn(async move {
|
|
||||||
io_fut.await.log_err();
|
|
||||||
});
|
|
||||||
|
|
||||||
let child_status = cx.background_spawn(async move {
|
|
||||||
let result = match child.status().await {
|
|
||||||
Err(e) => Err(anyhow!(e)),
|
|
||||||
Ok(result) if result.success() => Ok(()),
|
|
||||||
Ok(result) => Err(anyhow!(result)),
|
|
||||||
};
|
|
||||||
drop(io_task);
|
|
||||||
result
|
|
||||||
});
|
|
||||||
|
|
||||||
Ok(Self {
|
|
||||||
name,
|
|
||||||
connection,
|
|
||||||
_child_status: child_status,
|
|
||||||
current_thread: thread_rc,
|
|
||||||
})
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl AgentConnection for AcpConnection {
|
|
||||||
fn new_thread(
|
|
||||||
self: Rc<Self>,
|
|
||||||
project: Entity<Project>,
|
|
||||||
_cwd: &Path,
|
|
||||||
cx: &mut App,
|
|
||||||
) -> Task<Result<Entity<AcpThread>>> {
|
|
||||||
let task = self.connection.request_any(
|
|
||||||
acp_old::InitializeParams {
|
|
||||||
protocol_version: acp_old::ProtocolVersion::latest(),
|
|
||||||
}
|
|
||||||
.into_any(),
|
|
||||||
);
|
|
||||||
let current_thread = self.current_thread.clone();
|
|
||||||
cx.spawn(async move |cx| {
|
|
||||||
let result = task.await?;
|
|
||||||
let result = acp_old::InitializeParams::response_from_any(result)?;
|
|
||||||
|
|
||||||
if !result.is_authenticated {
|
|
||||||
anyhow::bail!(AuthRequired::new())
|
|
||||||
}
|
|
||||||
|
|
||||||
cx.update(|cx| {
|
|
||||||
let thread = cx.new(|cx| {
|
|
||||||
let session_id = acp::SessionId("acp-old-no-id".into());
|
|
||||||
let action_log = cx.new(|_| ActionLog::new(project.clone()));
|
|
||||||
AcpThread::new(self.name, self.clone(), project, action_log, session_id)
|
|
||||||
});
|
|
||||||
current_thread.replace(thread.downgrade());
|
|
||||||
thread
|
|
||||||
})
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
fn auth_methods(&self) -> &[acp::AuthMethod] {
|
|
||||||
&[]
|
|
||||||
}
|
|
||||||
|
|
||||||
fn authenticate(&self, _method_id: acp::AuthMethodId, cx: &mut App) -> Task<Result<()>> {
|
|
||||||
let task = self
|
|
||||||
.connection
|
|
||||||
.request_any(acp_old::AuthenticateParams.into_any());
|
|
||||||
cx.foreground_executor().spawn(async move {
|
|
||||||
task.await?;
|
|
||||||
Ok(())
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
fn prompt(
|
|
||||||
&self,
|
|
||||||
_id: Option<acp_thread::UserMessageId>,
|
|
||||||
params: acp::PromptRequest,
|
|
||||||
cx: &mut App,
|
|
||||||
) -> Task<Result<acp::PromptResponse>> {
|
|
||||||
let chunks = params
|
|
||||||
.prompt
|
|
||||||
.into_iter()
|
|
||||||
.filter_map(|block| match block {
|
|
||||||
acp::ContentBlock::Text(text) => {
|
|
||||||
Some(acp_old::UserMessageChunk::Text { text: text.text })
|
|
||||||
}
|
|
||||||
acp::ContentBlock::ResourceLink(link) => Some(acp_old::UserMessageChunk::Path {
|
|
||||||
path: link.uri.into(),
|
|
||||||
}),
|
|
||||||
_ => None,
|
|
||||||
})
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
let task = self
|
|
||||||
.connection
|
|
||||||
.request_any(acp_old::SendUserMessageParams { chunks }.into_any());
|
|
||||||
cx.foreground_executor().spawn(async move {
|
|
||||||
task.await?;
|
|
||||||
anyhow::Ok(acp::PromptResponse {
|
|
||||||
stop_reason: acp::StopReason::EndTurn,
|
|
||||||
})
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
fn prompt_capabilities(&self) -> acp::PromptCapabilities {
|
|
||||||
acp::PromptCapabilities {
|
|
||||||
image: false,
|
|
||||||
audio: false,
|
|
||||||
embedded_context: false,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn cancel(&self, _session_id: &acp::SessionId, cx: &mut App) {
|
|
||||||
let task = self
|
|
||||||
.connection
|
|
||||||
.request_any(acp_old::CancelSendMessageParams.into_any());
|
|
||||||
cx.foreground_executor()
|
|
||||||
.spawn(async move {
|
|
||||||
task.await?;
|
|
||||||
anyhow::Ok(())
|
|
||||||
})
|
|
||||||
.detach_and_log_err(cx)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
|
|
||||||
self
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,376 +0,0 @@
|
||||||
use acp_tools::AcpConnectionRegistry;
|
|
||||||
use action_log::ActionLog;
|
|
||||||
use agent_client_protocol::{self as acp, Agent as _, ErrorCode};
|
|
||||||
use anyhow::anyhow;
|
|
||||||
use collections::HashMap;
|
|
||||||
use futures::AsyncBufReadExt as _;
|
|
||||||
use futures::channel::oneshot;
|
|
||||||
use futures::io::BufReader;
|
|
||||||
use project::Project;
|
|
||||||
use serde::Deserialize;
|
|
||||||
use std::path::Path;
|
|
||||||
use std::rc::Rc;
|
|
||||||
use std::{any::Any, cell::RefCell};
|
|
||||||
|
|
||||||
use anyhow::{Context as _, Result};
|
|
||||||
use gpui::{App, AppContext as _, AsyncApp, Entity, Task, WeakEntity};
|
|
||||||
|
|
||||||
use crate::{AgentServerCommand, acp::UnsupportedVersion};
|
|
||||||
use acp_thread::{AcpThread, AgentConnection, AuthRequired, LoadError};
|
|
||||||
|
|
||||||
pub struct AcpConnection {
|
|
||||||
server_name: &'static str,
|
|
||||||
connection: Rc<acp::ClientSideConnection>,
|
|
||||||
sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
|
|
||||||
auth_methods: Vec<acp::AuthMethod>,
|
|
||||||
prompt_capabilities: acp::PromptCapabilities,
|
|
||||||
_io_task: Task<Result<()>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct AcpSession {
|
|
||||||
thread: WeakEntity<AcpThread>,
|
|
||||||
suppress_abort_err: bool,
|
|
||||||
}
|
|
||||||
|
|
||||||
const MINIMUM_SUPPORTED_VERSION: acp::ProtocolVersion = acp::V1;
|
|
||||||
|
|
||||||
impl AcpConnection {
|
|
||||||
pub async fn stdio(
|
|
||||||
server_name: &'static str,
|
|
||||||
command: AgentServerCommand,
|
|
||||||
root_dir: &Path,
|
|
||||||
cx: &mut AsyncApp,
|
|
||||||
) -> Result<Self> {
|
|
||||||
let mut child = util::command::new_smol_command(&command.path)
|
|
||||||
.args(command.args.iter().map(|arg| arg.as_str()))
|
|
||||||
.envs(command.env.iter().flatten())
|
|
||||||
.current_dir(root_dir)
|
|
||||||
.stdin(std::process::Stdio::piped())
|
|
||||||
.stdout(std::process::Stdio::piped())
|
|
||||||
.stderr(std::process::Stdio::piped())
|
|
||||||
.kill_on_drop(true)
|
|
||||||
.spawn()?;
|
|
||||||
|
|
||||||
let stdout = child.stdout.take().context("Failed to take stdout")?;
|
|
||||||
let stdin = child.stdin.take().context("Failed to take stdin")?;
|
|
||||||
let stderr = child.stderr.take().context("Failed to take stderr")?;
|
|
||||||
log::trace!("Spawned (pid: {})", child.id());
|
|
||||||
|
|
||||||
let sessions = Rc::new(RefCell::new(HashMap::default()));
|
|
||||||
|
|
||||||
let client = ClientDelegate {
|
|
||||||
sessions: sessions.clone(),
|
|
||||||
cx: cx.clone(),
|
|
||||||
};
|
|
||||||
let (connection, io_task) = acp::ClientSideConnection::new(client, stdin, stdout, {
|
|
||||||
let foreground_executor = cx.foreground_executor().clone();
|
|
||||||
move |fut| {
|
|
||||||
foreground_executor.spawn(fut).detach();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
let io_task = cx.background_spawn(io_task);
|
|
||||||
|
|
||||||
cx.background_spawn(async move {
|
|
||||||
let mut stderr = BufReader::new(stderr);
|
|
||||||
let mut line = String::new();
|
|
||||||
while let Ok(n) = stderr.read_line(&mut line).await
|
|
||||||
&& n > 0
|
|
||||||
{
|
|
||||||
log::warn!("agent stderr: {}", &line);
|
|
||||||
line.clear();
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.detach();
|
|
||||||
|
|
||||||
cx.spawn({
|
|
||||||
let sessions = sessions.clone();
|
|
||||||
async move |cx| {
|
|
||||||
let status = child.status().await?;
|
|
||||||
|
|
||||||
for session in sessions.borrow().values() {
|
|
||||||
session
|
|
||||||
.thread
|
|
||||||
.update(cx, |thread, cx| {
|
|
||||||
thread.emit_load_error(LoadError::Exited { status }, cx)
|
|
||||||
})
|
|
||||||
.ok();
|
|
||||||
}
|
|
||||||
|
|
||||||
anyhow::Ok(())
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.detach();
|
|
||||||
|
|
||||||
let connection = Rc::new(connection);
|
|
||||||
|
|
||||||
cx.update(|cx| {
|
|
||||||
AcpConnectionRegistry::default_global(cx).update(cx, |registry, cx| {
|
|
||||||
registry.set_active_connection(server_name, &connection, cx)
|
|
||||||
});
|
|
||||||
})?;
|
|
||||||
|
|
||||||
let response = connection
|
|
||||||
.initialize(acp::InitializeRequest {
|
|
||||||
protocol_version: acp::VERSION,
|
|
||||||
client_capabilities: acp::ClientCapabilities {
|
|
||||||
fs: acp::FileSystemCapability {
|
|
||||||
read_text_file: true,
|
|
||||||
write_text_file: true,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
})
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
if response.protocol_version < MINIMUM_SUPPORTED_VERSION {
|
|
||||||
return Err(UnsupportedVersion.into());
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(Self {
|
|
||||||
auth_methods: response.auth_methods,
|
|
||||||
connection,
|
|
||||||
server_name,
|
|
||||||
sessions,
|
|
||||||
prompt_capabilities: response.agent_capabilities.prompt_capabilities,
|
|
||||||
_io_task: io_task,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl AgentConnection for AcpConnection {
|
|
||||||
fn new_thread(
|
|
||||||
self: Rc<Self>,
|
|
||||||
project: Entity<Project>,
|
|
||||||
cwd: &Path,
|
|
||||||
cx: &mut App,
|
|
||||||
) -> Task<Result<Entity<AcpThread>>> {
|
|
||||||
let conn = self.connection.clone();
|
|
||||||
let sessions = self.sessions.clone();
|
|
||||||
let cwd = cwd.to_path_buf();
|
|
||||||
cx.spawn(async move |cx| {
|
|
||||||
let response = conn
|
|
||||||
.new_session(acp::NewSessionRequest {
|
|
||||||
mcp_servers: vec![],
|
|
||||||
cwd,
|
|
||||||
})
|
|
||||||
.await
|
|
||||||
.map_err(|err| {
|
|
||||||
if err.code == acp::ErrorCode::AUTH_REQUIRED.code {
|
|
||||||
let mut error = AuthRequired::new();
|
|
||||||
|
|
||||||
if err.message != acp::ErrorCode::AUTH_REQUIRED.message {
|
|
||||||
error = error.with_description(err.message);
|
|
||||||
}
|
|
||||||
|
|
||||||
anyhow!(error)
|
|
||||||
} else {
|
|
||||||
anyhow!(err)
|
|
||||||
}
|
|
||||||
})?;
|
|
||||||
|
|
||||||
let session_id = response.session_id;
|
|
||||||
let action_log = cx.new(|_| ActionLog::new(project.clone()))?;
|
|
||||||
let thread = cx.new(|_cx| {
|
|
||||||
AcpThread::new(
|
|
||||||
self.server_name,
|
|
||||||
self.clone(),
|
|
||||||
project,
|
|
||||||
action_log,
|
|
||||||
session_id.clone(),
|
|
||||||
)
|
|
||||||
})?;
|
|
||||||
|
|
||||||
let session = AcpSession {
|
|
||||||
thread: thread.downgrade(),
|
|
||||||
suppress_abort_err: false,
|
|
||||||
};
|
|
||||||
sessions.borrow_mut().insert(session_id, session);
|
|
||||||
|
|
||||||
Ok(thread)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
fn auth_methods(&self) -> &[acp::AuthMethod] {
|
|
||||||
&self.auth_methods
|
|
||||||
}
|
|
||||||
|
|
||||||
fn authenticate(&self, method_id: acp::AuthMethodId, cx: &mut App) -> Task<Result<()>> {
|
|
||||||
let conn = self.connection.clone();
|
|
||||||
cx.foreground_executor().spawn(async move {
|
|
||||||
let result = conn
|
|
||||||
.authenticate(acp::AuthenticateRequest {
|
|
||||||
method_id: method_id.clone(),
|
|
||||||
})
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
Ok(result)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
fn prompt(
|
|
||||||
&self,
|
|
||||||
_id: Option<acp_thread::UserMessageId>,
|
|
||||||
params: acp::PromptRequest,
|
|
||||||
cx: &mut App,
|
|
||||||
) -> Task<Result<acp::PromptResponse>> {
|
|
||||||
let conn = self.connection.clone();
|
|
||||||
let sessions = self.sessions.clone();
|
|
||||||
let session_id = params.session_id.clone();
|
|
||||||
cx.foreground_executor().spawn(async move {
|
|
||||||
let result = conn.prompt(params).await;
|
|
||||||
|
|
||||||
let mut suppress_abort_err = false;
|
|
||||||
|
|
||||||
if let Some(session) = sessions.borrow_mut().get_mut(&session_id) {
|
|
||||||
suppress_abort_err = session.suppress_abort_err;
|
|
||||||
session.suppress_abort_err = false;
|
|
||||||
}
|
|
||||||
|
|
||||||
match result {
|
|
||||||
Ok(response) => Ok(response),
|
|
||||||
Err(err) => {
|
|
||||||
if err.code != ErrorCode::INTERNAL_ERROR.code {
|
|
||||||
anyhow::bail!(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
let Some(data) = &err.data else {
|
|
||||||
anyhow::bail!(err)
|
|
||||||
};
|
|
||||||
|
|
||||||
// Temporary workaround until the following PR is generally available:
|
|
||||||
// https://github.com/google-gemini/gemini-cli/pull/6656
|
|
||||||
|
|
||||||
#[derive(Deserialize)]
|
|
||||||
#[serde(deny_unknown_fields)]
|
|
||||||
struct ErrorDetails {
|
|
||||||
details: Box<str>,
|
|
||||||
}
|
|
||||||
|
|
||||||
match serde_json::from_value(data.clone()) {
|
|
||||||
Ok(ErrorDetails { details }) => {
|
|
||||||
if suppress_abort_err && details.contains("This operation was aborted")
|
|
||||||
{
|
|
||||||
Ok(acp::PromptResponse {
|
|
||||||
stop_reason: acp::StopReason::Cancelled,
|
|
||||||
})
|
|
||||||
} else {
|
|
||||||
Err(anyhow!(details))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Err(_) => Err(anyhow!(err)),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
fn prompt_capabilities(&self) -> acp::PromptCapabilities {
|
|
||||||
self.prompt_capabilities
|
|
||||||
}
|
|
||||||
|
|
||||||
fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) {
|
|
||||||
if let Some(session) = self.sessions.borrow_mut().get_mut(session_id) {
|
|
||||||
session.suppress_abort_err = true;
|
|
||||||
}
|
|
||||||
let conn = self.connection.clone();
|
|
||||||
let params = acp::CancelNotification {
|
|
||||||
session_id: session_id.clone(),
|
|
||||||
};
|
|
||||||
cx.foreground_executor()
|
|
||||||
.spawn(async move { conn.cancel(params).await })
|
|
||||||
.detach();
|
|
||||||
}
|
|
||||||
|
|
||||||
fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
|
|
||||||
self
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
struct ClientDelegate {
|
|
||||||
sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
|
|
||||||
cx: AsyncApp,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl acp::Client for ClientDelegate {
|
|
||||||
async fn request_permission(
|
|
||||||
&self,
|
|
||||||
arguments: acp::RequestPermissionRequest,
|
|
||||||
) -> Result<acp::RequestPermissionResponse, acp::Error> {
|
|
||||||
let cx = &mut self.cx.clone();
|
|
||||||
let rx = self
|
|
||||||
.sessions
|
|
||||||
.borrow()
|
|
||||||
.get(&arguments.session_id)
|
|
||||||
.context("Failed to get session")?
|
|
||||||
.thread
|
|
||||||
.update(cx, |thread, cx| {
|
|
||||||
thread.request_tool_call_authorization(arguments.tool_call, arguments.options, cx)
|
|
||||||
})?;
|
|
||||||
|
|
||||||
let result = rx?.await;
|
|
||||||
|
|
||||||
let outcome = match result {
|
|
||||||
Ok(option) => acp::RequestPermissionOutcome::Selected { option_id: option },
|
|
||||||
Err(oneshot::Canceled) => acp::RequestPermissionOutcome::Cancelled,
|
|
||||||
};
|
|
||||||
|
|
||||||
Ok(acp::RequestPermissionResponse { outcome })
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn write_text_file(
|
|
||||||
&self,
|
|
||||||
arguments: acp::WriteTextFileRequest,
|
|
||||||
) -> Result<(), acp::Error> {
|
|
||||||
let cx = &mut self.cx.clone();
|
|
||||||
let task = self
|
|
||||||
.sessions
|
|
||||||
.borrow()
|
|
||||||
.get(&arguments.session_id)
|
|
||||||
.context("Failed to get session")?
|
|
||||||
.thread
|
|
||||||
.update(cx, |thread, cx| {
|
|
||||||
thread.write_text_file(arguments.path, arguments.content, cx)
|
|
||||||
})?;
|
|
||||||
|
|
||||||
task.await?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn read_text_file(
|
|
||||||
&self,
|
|
||||||
arguments: acp::ReadTextFileRequest,
|
|
||||||
) -> Result<acp::ReadTextFileResponse, acp::Error> {
|
|
||||||
let cx = &mut self.cx.clone();
|
|
||||||
let task = self
|
|
||||||
.sessions
|
|
||||||
.borrow()
|
|
||||||
.get(&arguments.session_id)
|
|
||||||
.context("Failed to get session")?
|
|
||||||
.thread
|
|
||||||
.update(cx, |thread, cx| {
|
|
||||||
thread.read_text_file(arguments.path, arguments.line, arguments.limit, false, cx)
|
|
||||||
})?;
|
|
||||||
|
|
||||||
let content = task.await?;
|
|
||||||
|
|
||||||
Ok(acp::ReadTextFileResponse { content })
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn session_notification(
|
|
||||||
&self,
|
|
||||||
notification: acp::SessionNotification,
|
|
||||||
) -> Result<(), acp::Error> {
|
|
||||||
let cx = &mut self.cx.clone();
|
|
||||||
let sessions = self.sessions.borrow();
|
|
||||||
let session = sessions
|
|
||||||
.get(¬ification.session_id)
|
|
||||||
.context("Failed to get session")?;
|
|
||||||
|
|
||||||
session.thread.update(cx, |thread, cx| {
|
|
||||||
thread.handle_session_update(notification.update, cx)
|
|
||||||
})??;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
Add table
Add a link
Reference in a new issue