diff --git a/.github/workflows/agent_servers_e2e.yml b/.github/workflows/agent_servers_e2e.yml new file mode 100644 index 0000000000..b2c518409e --- /dev/null +++ b/.github/workflows/agent_servers_e2e.yml @@ -0,0 +1,132 @@ +name: Agent Servers E2E Tests + +on: + schedule: + - cron: "0 12 * * *" + + push: + branches: + - as-e2e-ci + + pull_request: + branches: + - "**" + paths: + - "crates/agent_servers/**" + - "crates/acp_thread/**" + - ".github/workflows/agent_servers_e2e.yml" + + workflow_dispatch: + +concurrency: + group: ${{ github.workflow }}-${{ github.ref_name }}-${{ github.ref_name == 'main' && github.sha || 'anysha' }} + cancel-in-progress: true + +env: + CARGO_TERM_COLOR: always + CARGO_INCREMENTAL: 0 + RUST_BACKTRACE: 1 + ANTHROPIC_API_KEY: ${{ secrets.ANTHROPIC_API_KEY }} + # GEMINI_API_KEY: ${{ secrets.GEMINI_API_KEY }} + +jobs: + e2e-tests: + name: Run Agent Servers E2E Tests + if: github.repository_owner == 'zed-industries' + timeout-minutes: 20 + runs-on: + - buildjet-16vcpu-ubuntu-2204 + + steps: + - name: Add Rust to the PATH + run: echo "$HOME/.cargo/bin" >> "$GITHUB_PATH" + + - name: Checkout repo + uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4 + with: + clean: false + + # - name: Checkout gemini-cli repo + # uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4 + # with: + # repository: zed-industries/gemini-cli + # ref: migrate-acp + # path: gemini-cli + # clean: false + + - name: Cache dependencies + uses: swatinem/rust-cache@9d47c6ad4b02e050fd481d890b2ea34778fd09d6 # v2 + with: + save-if: ${{ github.ref == 'refs/heads/main' }} + cache-provider: "buildjet" + + - name: Install Linux dependencies + run: ./script/linux + + - name: Configure CI + run: | + mkdir -p ./../.cargo + cp ./.cargo/ci-config.toml ./../.cargo/config.toml + + - name: Install Node + uses: actions/setup-node@49933ea5288caeca8642d1e84afbd3f7d6820020 # v4 + with: + node-version: "18" + + - name: Install Claude Code CLI + shell: bash -euxo pipefail {0} + run: | + npm install -g @anthropic-ai/claude-code + # Verify installation + which claude || echo "Claude CLI not found in PATH" + # Skip authentication if API key is not set (tests may use mock) + if [ -n "$ANTHROPIC_API_KEY" ]; then + echo "Anthropic API key is configured" + fi + + # - name: Install and setup Gemini CLI + # shell: bash -euxo pipefail {0} + # run: | + # # Also install dependencies for local gemini-cli repo + # pushd gemini-cli + # npm install + # npm run build + # popd + + # # Verify installations + # which gemini || echo "Gemini CLI not found in PATH" + # # Skip authentication if API key is not set (tests may use mock) + # if [ -n "$GEMINI_API_KEY" ]; then + # echo "Gemini API key is configured" + # fi + + - name: Limit target directory size + shell: bash -euxo pipefail {0} + run: script/clear-target-dir-if-larger-than 100 + + - name: Install nextest + shell: bash -euxo pipefail {0} + run: | + cargo install cargo-nextest --locked + + - name: Build Zed + shell: bash -euxo pipefail {0} + run: | + cargo build + + - name: Run E2E tests + shell: bash -euxo pipefail {0} + run: | + cargo nextest run \ + --package agent_servers \ + --features e2e \ + --no-fail-fast \ + claude + + # Even the Linux runner is not stateful, in theory there is no need to do this cleanup. + # But, to avoid potential issues in the future if we choose to use a stateful Linux runner and forget to add code + # to clean up the config file, I’ve included the cleanup code here as a precaution. + # While it’s not strictly necessary at this moment, I believe it’s better to err on the side of caution. + - name: Clean CI config file + if: always() + run: rm -rf ./../.cargo diff --git a/crates/agent_servers/src/claude.rs b/crates/agent_servers/src/claude.rs index 09d08fdcf8..031ad7727d 100644 --- a/crates/agent_servers/src/claude.rs +++ b/crates/agent_servers/src/claude.rs @@ -6,7 +6,7 @@ use context_server::listener::McpServerTool; use project::Project; use settings::SettingsStore; use smol::process::Child; -use std::cell::{Cell, RefCell}; +use std::cell::RefCell; use std::fmt::Display; use std::path::Path; use std::rc::Rc; @@ -153,20 +153,17 @@ impl AgentConnection for ClaudeAgentConnection { }) .detach(); - let pending_cancellation = Rc::new(Cell::new(PendingCancellation::None)); + let turn_state = Rc::new(RefCell::new(TurnState::None)); - let end_turn_tx = Rc::new(RefCell::new(None)); let handler_task = cx.spawn({ - let end_turn_tx = end_turn_tx.clone(); + let turn_state = turn_state.clone(); let mut thread_rx = thread_rx.clone(); - let cancellation_state = pending_cancellation.clone(); async move |cx| { while let Some(message) = incoming_message_rx.next().await { ClaudeAgentSession::handle_message( thread_rx.clone(), message, - end_turn_tx.clone(), - cancellation_state.clone(), + turn_state.clone(), cx, ) .await @@ -192,8 +189,7 @@ impl AgentConnection for ClaudeAgentConnection { let session = ClaudeAgentSession { outgoing_tx, - end_turn_tx, - pending_cancellation, + turn_state, _handler_task: handler_task, _mcp_server: Some(permission_mcp_server), }; @@ -225,8 +221,8 @@ impl AgentConnection for ClaudeAgentConnection { ))); }; - let (tx, rx) = oneshot::channel(); - session.end_turn_tx.borrow_mut().replace(tx); + let (end_tx, end_rx) = oneshot::channel(); + session.turn_state.replace(TurnState::InProgress { end_tx }); let mut content = String::new(); for chunk in params.prompt { @@ -260,12 +256,7 @@ impl AgentConnection for ClaudeAgentConnection { return Task::ready(Err(anyhow!(err))); } - let cancellation_state = session.pending_cancellation.clone(); - cx.foreground_executor().spawn(async move { - let result = rx.await??; - cancellation_state.set(PendingCancellation::None); - Ok(result) - }) + cx.foreground_executor().spawn(async move { end_rx.await? }) } fn cancel(&self, session_id: &acp::SessionId, _cx: &mut App) { @@ -277,7 +268,15 @@ impl AgentConnection for ClaudeAgentConnection { let request_id = new_request_id(); - session.pending_cancellation.set(PendingCancellation::Sent { + let turn_state = session.turn_state.take(); + let TurnState::InProgress { end_tx } = turn_state else { + // Already cancelled or idle, put it back + session.turn_state.replace(turn_state); + return; + }; + + session.turn_state.replace(TurnState::CancelRequested { + end_tx, request_id: request_id.clone(), }); @@ -349,28 +348,56 @@ fn spawn_claude( struct ClaudeAgentSession { outgoing_tx: UnboundedSender, - end_turn_tx: Rc>>>>, - pending_cancellation: Rc>, + turn_state: Rc>, _mcp_server: Option, _handler_task: Task<()>, } -#[derive(Debug, Default, PartialEq)] -enum PendingCancellation { +#[derive(Debug, Default)] +enum TurnState { #[default] None, - Sent { + InProgress { + end_tx: oneshot::Sender>, + }, + CancelRequested { + end_tx: oneshot::Sender>, request_id: String, }, - Confirmed, + CancelConfirmed { + end_tx: oneshot::Sender>, + }, +} + +impl TurnState { + fn is_cancelled(&self) -> bool { + matches!(self, TurnState::CancelConfirmed { .. }) + } + + fn end_tx(self) -> Option>> { + match self { + TurnState::None => None, + TurnState::InProgress { end_tx, .. } => Some(end_tx), + TurnState::CancelRequested { end_tx, .. } => Some(end_tx), + TurnState::CancelConfirmed { end_tx } => Some(end_tx), + } + } + + fn confirm_cancellation(self, id: &str) -> Self { + match self { + TurnState::CancelRequested { request_id, end_tx } if request_id == id => { + TurnState::CancelConfirmed { end_tx } + } + _ => self, + } + } } impl ClaudeAgentSession { async fn handle_message( mut thread_rx: watch::Receiver>, message: SdkMessage, - end_turn_tx: Rc>>>>, - pending_cancellation: Rc>, + turn_state: Rc>, cx: &mut AsyncApp, ) { match message { @@ -393,15 +420,13 @@ impl ClaudeAgentSession { for chunk in message.content.chunks() { match chunk { ContentChunk::Text { text } | ContentChunk::UntaggedText(text) => { - let state = pending_cancellation.take(); - if state != PendingCancellation::Confirmed { + if !turn_state.borrow().is_cancelled() { thread .update(cx, |thread, cx| { thread.push_user_content_block(text.into(), cx) }) .log_err(); } - pending_cancellation.set(state); } ContentChunk::ToolResult { content, @@ -414,7 +439,12 @@ impl ClaudeAgentSession { acp::ToolCallUpdate { id: acp::ToolCallId(tool_use_id.into()), fields: acp::ToolCallUpdateFields { - status: Some(acp::ToolCallStatus::Completed), + status: if turn_state.borrow().is_cancelled() { + // Do not set to completed if turn was cancelled + None + } else { + Some(acp::ToolCallStatus::Completed) + }, content: (!content.is_empty()) .then(|| vec![content.into()]), ..Default::default() @@ -541,40 +571,38 @@ impl ClaudeAgentSession { result, .. } => { - if let Some(end_turn_tx) = end_turn_tx.borrow_mut().take() { - if is_error - || (subtype == ResultErrorType::ErrorDuringExecution - && pending_cancellation.take() != PendingCancellation::Confirmed) - { - end_turn_tx - .send(Err(anyhow!( - "Error: {}", - result.unwrap_or_else(|| subtype.to_string()) - ))) - .ok(); - } else { - let stop_reason = match subtype { - ResultErrorType::Success => acp::StopReason::EndTurn, - ResultErrorType::ErrorMaxTurns => acp::StopReason::MaxTurnRequests, - ResultErrorType::ErrorDuringExecution => acp::StopReason::Cancelled, - }; - end_turn_tx - .send(Ok(acp::PromptResponse { stop_reason })) - .ok(); - } + let turn_state = turn_state.take(); + let was_cancelled = turn_state.is_cancelled(); + let Some(end_turn_tx) = turn_state.end_tx() else { + debug_panic!("Received `SdkMessage::Result` but there wasn't an active turn"); + return; + }; + + if is_error || (!was_cancelled && subtype == ResultErrorType::ErrorDuringExecution) + { + end_turn_tx + .send(Err(anyhow!( + "Error: {}", + result.unwrap_or_else(|| subtype.to_string()) + ))) + .ok(); + } else { + let stop_reason = match subtype { + ResultErrorType::Success => acp::StopReason::EndTurn, + ResultErrorType::ErrorMaxTurns => acp::StopReason::MaxTurnRequests, + ResultErrorType::ErrorDuringExecution => acp::StopReason::Cancelled, + }; + end_turn_tx + .send(Ok(acp::PromptResponse { stop_reason })) + .ok(); } } SdkMessage::ControlResponse { response } => { if matches!(response.subtype, ResultErrorType::Success) { - let pending_cancellation_value = pending_cancellation.take(); - - if let PendingCancellation::Sent { request_id } = &pending_cancellation_value - && request_id == &response.request_id - { - pending_cancellation.set(PendingCancellation::Confirmed); - } else { - pending_cancellation.set(pending_cancellation_value); - } + let new_state = turn_state.take().confirm_cancellation(&response.request_id); + turn_state.replace(new_state); + } else { + log::error!("Control response error: {:?}", response); } } SdkMessage::System { .. } => {} @@ -865,9 +893,10 @@ pub(crate) mod tests { #[cfg_attr(not(feature = "e2e"), ignore)] async fn test_todo_plan(cx: &mut TestAppContext) { let fs = e2e_tests::init_test(cx).await; + let tempdir = tempfile::tempdir().unwrap(); let project = Project::test(fs, [], cx).await; let thread = - e2e_tests::new_test_thread(ClaudeCode, project.clone(), "/private/tmp", cx).await; + e2e_tests::new_test_thread(ClaudeCode, project.clone(), tempdir.path(), cx).await; thread .update(cx, |thread, cx| { @@ -921,6 +950,8 @@ pub(crate) mod tests { )); assert_eq!(thread.plan().entries.len(), entries_len); }); + + drop(tempdir); } #[test] diff --git a/crates/agent_servers/src/e2e_tests.rs b/crates/agent_servers/src/e2e_tests.rs index 05f874bd30..bab1d22a9a 100644 --- a/crates/agent_servers/src/e2e_tests.rs +++ b/crates/agent_servers/src/e2e_tests.rs @@ -13,12 +13,12 @@ use gpui::{Entity, TestAppContext}; use indoc::indoc; use project::{FakeFs, Project}; use settings::{Settings, SettingsStore}; -use util::path; pub async fn test_basic(server: impl AgentServer + 'static, cx: &mut TestAppContext) { let fs = init_test(cx).await; + let tempdir = tempfile::tempdir().unwrap(); let project = Project::test(fs, [], cx).await; - let thread = new_test_thread(server, project.clone(), "/private/tmp", cx).await; + let thread = new_test_thread(server, project.clone(), tempdir.path(), cx).await; thread .update(cx, |thread, cx| thread.send_raw("Hello from Zed!", cx)) @@ -40,6 +40,8 @@ pub async fn test_basic(server: impl AgentServer + 'static, cx: &mut TestAppCont AgentThreadEntry::AssistantMessage(_) )); }); + + drop(tempdir); } pub async fn test_path_mentions(server: impl AgentServer + 'static, cx: &mut TestAppContext) { @@ -118,7 +120,7 @@ pub async fn test_tool_call(server: impl AgentServer + 'static, cx: &mut TestApp std::fs::write(&foo_path, "Lorem ipsum dolor").expect("failed to write file"); let project = Project::example([tempdir.path()], &mut cx.to_async()).await; - let thread = new_test_thread(server, project.clone(), "/private/tmp", cx).await; + let thread = new_test_thread(server, project.clone(), tempdir.path(), cx).await; thread .update(cx, |thread, cx| { @@ -156,8 +158,9 @@ pub async fn test_tool_call_with_permission( cx: &mut TestAppContext, ) { let fs = init_test(cx).await; - let project = Project::test(fs, [path!("/private/tmp").as_ref()], cx).await; - let thread = new_test_thread(server, project.clone(), "/private/tmp", cx).await; + let tempdir = tempfile::tempdir().unwrap(); + let project = Project::test(fs, [tempdir.path()], cx).await; + let thread = new_test_thread(server, project.clone(), tempdir.path(), cx).await; let full_turn = thread.update(cx, |thread, cx| { thread.send_raw( r#"Run exactly `touch hello.txt && echo "Hello, world!" | tee hello.txt` in the terminal."#, @@ -239,14 +242,16 @@ pub async fn test_tool_call_with_permission( "Expected content to contain 'Hello'" ); }); + + drop(tempdir); } pub async fn test_cancel(server: impl AgentServer + 'static, cx: &mut TestAppContext) { let fs = init_test(cx).await; - - let project = Project::test(fs, [path!("/private/tmp").as_ref()], cx).await; - let thread = new_test_thread(server, project.clone(), "/private/tmp", cx).await; - let full_turn = thread.update(cx, |thread, cx| { + let tempdir = tempfile::tempdir().unwrap(); + let project = Project::test(fs, [tempdir.path()], cx).await; + let thread = new_test_thread(server, project.clone(), tempdir.path(), cx).await; + let _ = thread.update(cx, |thread, cx| { thread.send_raw( r#"Run exactly `touch hello.txt && echo "Hello, world!" | tee hello.txt` in the terminal."#, cx, @@ -285,9 +290,8 @@ pub async fn test_cancel(server: impl AgentServer + 'static, cx: &mut TestAppCon id.clone() }); - let _ = thread.update(cx, |thread, cx| thread.cancel(cx)); - full_turn.await.unwrap(); - thread.read_with(cx, |thread, _| { + thread.update(cx, |thread, cx| thread.cancel(cx)).await; + thread.read_with(cx, |thread, _cx| { let AgentThreadEntry::ToolCall(ToolCall { status: ToolCallStatus::Canceled, .. @@ -309,12 +313,15 @@ pub async fn test_cancel(server: impl AgentServer + 'static, cx: &mut TestAppCon AgentThreadEntry::AssistantMessage(..), )) }); + + drop(tempdir); } pub async fn test_thread_drop(server: impl AgentServer + 'static, cx: &mut TestAppContext) { let fs = init_test(cx).await; + let tempdir = tempfile::tempdir().unwrap(); let project = Project::test(fs, [], cx).await; - let thread = new_test_thread(server, project.clone(), "/private/tmp", cx).await; + let thread = new_test_thread(server, project.clone(), tempdir.path(), cx).await; thread .update(cx, |thread, cx| thread.send_raw("Hello from test!", cx)) @@ -330,6 +337,8 @@ pub async fn test_thread_drop(server: impl AgentServer + 'static, cx: &mut TestA cx.executor().run_until_parked(); assert!(!weak_thread.is_upgradable()); + + drop(tempdir); } #[macro_export]