Compare commits

...
Sign in to create a new pull request.

11 commits

Author SHA1 Message Date
Agus Zubiaga
5de2c28f75 Build Zed 2025-08-07 14:59:45 -03:00
Agus Zubiaga
2ecb5b2ff6 Use tempfile tempdir instead of hardcoding /private/tmp 2025-08-07 13:35:21 -03:00
Agus Zubiaga
f1af9d5fbd Use same machine as eval 2025-08-07 12:27:58 -03:00
Agus Zubiaga
d97e15dcaf Raise timeout 2025-08-07 12:24:03 -03:00
Agus Zubiaga
5db22c9440 Install nextest 2025-08-07 12:02:51 -03:00
Agus Zubiaga
49ef4b5024 Run claude e2e tests in CI - attempt 1 2025-08-07 12:00:46 -03:00
Agus Zubiaga
cace7de723 Fix double take 2025-08-07 11:43:45 -03:00
Agus Zubiaga
3925aa9b29 Remove CI workflow for now 2025-08-07 11:37:24 -03:00
Agus Zubiaga
7f9adae3a3 Combine end_turn_tx and cancellation_state into one enum 2025-08-07 11:36:29 -03:00
Agus Zubiaga
4b94e90899 Use replace 2025-08-07 01:21:03 -03:00
Agus Zubiaga
63cc3291e3 Fix CC tool state on cancel 2025-08-07 01:11:13 -03:00
3 changed files with 246 additions and 74 deletions

132
.github/workflows/agent_servers_e2e.yml vendored Normal file
View file

@ -0,0 +1,132 @@
name: Agent Servers E2E Tests
on:
schedule:
- cron: "0 12 * * *"
push:
branches:
- as-e2e-ci
pull_request:
branches:
- "**"
paths:
- "crates/agent_servers/**"
- "crates/acp_thread/**"
- ".github/workflows/agent_servers_e2e.yml"
workflow_dispatch:
concurrency:
group: ${{ github.workflow }}-${{ github.ref_name }}-${{ github.ref_name == 'main' && github.sha || 'anysha' }}
cancel-in-progress: true
env:
CARGO_TERM_COLOR: always
CARGO_INCREMENTAL: 0
RUST_BACKTRACE: 1
ANTHROPIC_API_KEY: ${{ secrets.ANTHROPIC_API_KEY }}
# GEMINI_API_KEY: ${{ secrets.GEMINI_API_KEY }}
jobs:
e2e-tests:
name: Run Agent Servers E2E Tests
if: github.repository_owner == 'zed-industries'
timeout-minutes: 20
runs-on:
- buildjet-16vcpu-ubuntu-2204
steps:
- name: Add Rust to the PATH
run: echo "$HOME/.cargo/bin" >> "$GITHUB_PATH"
- name: Checkout repo
uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4
with:
clean: false
# - name: Checkout gemini-cli repo
# uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4
# with:
# repository: zed-industries/gemini-cli
# ref: migrate-acp
# path: gemini-cli
# clean: false
- name: Cache dependencies
uses: swatinem/rust-cache@9d47c6ad4b02e050fd481d890b2ea34778fd09d6 # v2
with:
save-if: ${{ github.ref == 'refs/heads/main' }}
cache-provider: "buildjet"
- name: Install Linux dependencies
run: ./script/linux
- name: Configure CI
run: |
mkdir -p ./../.cargo
cp ./.cargo/ci-config.toml ./../.cargo/config.toml
- name: Install Node
uses: actions/setup-node@49933ea5288caeca8642d1e84afbd3f7d6820020 # v4
with:
node-version: "18"
- name: Install Claude Code CLI
shell: bash -euxo pipefail {0}
run: |
npm install -g @anthropic-ai/claude-code
# Verify installation
which claude || echo "Claude CLI not found in PATH"
# Skip authentication if API key is not set (tests may use mock)
if [ -n "$ANTHROPIC_API_KEY" ]; then
echo "Anthropic API key is configured"
fi
# - name: Install and setup Gemini CLI
# shell: bash -euxo pipefail {0}
# run: |
# # Also install dependencies for local gemini-cli repo
# pushd gemini-cli
# npm install
# npm run build
# popd
# # Verify installations
# which gemini || echo "Gemini CLI not found in PATH"
# # Skip authentication if API key is not set (tests may use mock)
# if [ -n "$GEMINI_API_KEY" ]; then
# echo "Gemini API key is configured"
# fi
- name: Limit target directory size
shell: bash -euxo pipefail {0}
run: script/clear-target-dir-if-larger-than 100
- name: Install nextest
shell: bash -euxo pipefail {0}
run: |
cargo install cargo-nextest --locked
- name: Build Zed
shell: bash -euxo pipefail {0}
run: |
cargo build
- name: Run E2E tests
shell: bash -euxo pipefail {0}
run: |
cargo nextest run \
--package agent_servers \
--features e2e \
--no-fail-fast \
claude
# Since the Linux runner is not stateful, in theory there is no need to do this cleanup.
# But, to avoid potential issues in the future if we choose to use a stateful Linux runner and forget to add code
# to clean up the config file, I've included the cleanup code here as a precaution.
# While it's not strictly necessary at this moment, I believe it's better to err on the side of caution.
- name: Clean CI config file
if: always()
run: rm -rf ./../.cargo

View file

@ -6,7 +6,7 @@ use context_server::listener::McpServerTool;
use project::Project;
use settings::SettingsStore;
use smol::process::Child;
use std::cell::{Cell, RefCell};
use std::cell::RefCell;
use std::fmt::Display;
use std::path::Path;
use std::rc::Rc;
@ -153,20 +153,17 @@ impl AgentConnection for ClaudeAgentConnection {
})
.detach();
let pending_cancellation = Rc::new(Cell::new(PendingCancellation::None));
let turn_state = Rc::new(RefCell::new(TurnState::None));
let end_turn_tx = Rc::new(RefCell::new(None));
let handler_task = cx.spawn({
let end_turn_tx = end_turn_tx.clone();
let turn_state = turn_state.clone();
let mut thread_rx = thread_rx.clone();
let cancellation_state = pending_cancellation.clone();
async move |cx| {
while let Some(message) = incoming_message_rx.next().await {
ClaudeAgentSession::handle_message(
thread_rx.clone(),
message,
end_turn_tx.clone(),
cancellation_state.clone(),
turn_state.clone(),
cx,
)
.await
@ -192,8 +189,7 @@ impl AgentConnection for ClaudeAgentConnection {
let session = ClaudeAgentSession {
outgoing_tx,
end_turn_tx,
pending_cancellation,
turn_state,
_handler_task: handler_task,
_mcp_server: Some(permission_mcp_server),
};
@ -225,8 +221,8 @@ impl AgentConnection for ClaudeAgentConnection {
)));
};
let (tx, rx) = oneshot::channel();
session.end_turn_tx.borrow_mut().replace(tx);
let (end_tx, end_rx) = oneshot::channel();
session.turn_state.replace(TurnState::InProgress { end_tx });
let mut content = String::new();
for chunk in params.prompt {
@ -260,12 +256,7 @@ impl AgentConnection for ClaudeAgentConnection {
return Task::ready(Err(anyhow!(err)));
}
let cancellation_state = session.pending_cancellation.clone();
cx.foreground_executor().spawn(async move {
let result = rx.await??;
cancellation_state.set(PendingCancellation::None);
Ok(result)
})
cx.foreground_executor().spawn(async move { end_rx.await? })
}
fn cancel(&self, session_id: &acp::SessionId, _cx: &mut App) {
@ -277,7 +268,15 @@ impl AgentConnection for ClaudeAgentConnection {
let request_id = new_request_id();
session.pending_cancellation.set(PendingCancellation::Sent {
let turn_state = session.turn_state.take();
let TurnState::InProgress { end_tx } = turn_state else {
// Already cancelled or idle, put it back
session.turn_state.replace(turn_state);
return;
};
session.turn_state.replace(TurnState::CancelRequested {
end_tx,
request_id: request_id.clone(),
});
@ -349,28 +348,56 @@ fn spawn_claude(
struct ClaudeAgentSession {
outgoing_tx: UnboundedSender<SdkMessage>,
end_turn_tx: Rc<RefCell<Option<oneshot::Sender<Result<acp::PromptResponse>>>>>,
pending_cancellation: Rc<Cell<PendingCancellation>>,
turn_state: Rc<RefCell<TurnState>>,
_mcp_server: Option<ClaudeZedMcpServer>,
_handler_task: Task<()>,
}
#[derive(Debug, Default, PartialEq)]
enum PendingCancellation {
#[derive(Debug, Default)]
enum TurnState {
#[default]
None,
Sent {
InProgress {
end_tx: oneshot::Sender<Result<acp::PromptResponse>>,
},
CancelRequested {
end_tx: oneshot::Sender<Result<acp::PromptResponse>>,
request_id: String,
},
Confirmed,
CancelConfirmed {
end_tx: oneshot::Sender<Result<acp::PromptResponse>>,
},
}
impl TurnState {
    /// Returns `true` only when the state is `CancelConfirmed`; every other
    /// variant (including `CancelRequested`) reports `false`.
    fn is_cancelled(&self) -> bool {
        match self {
            TurnState::CancelConfirmed { .. } => true,
            TurnState::None
            | TurnState::InProgress { .. }
            | TurnState::CancelRequested { .. } => false,
        }
    }

    /// Consumes the state and hands back the sender it carries, if any.
    /// `None` is the only variant that holds no sender.
    fn end_tx(self) -> Option<oneshot::Sender<Result<acp::PromptResponse>>> {
        match self {
            TurnState::InProgress { end_tx, .. }
            | TurnState::CancelRequested { end_tx, .. }
            | TurnState::CancelConfirmed { end_tx } => Some(end_tx),
            TurnState::None => None,
        }
    }

    /// Consumes the state and returns the follow-up state for a confirmation
    /// carrying `id`: a `CancelRequested` whose `request_id` equals `id`
    /// becomes `CancelConfirmed` (keeping the same sender); anything else is
    /// returned as it was.
    fn confirm_cancellation(self, id: &str) -> Self {
        match self {
            TurnState::CancelRequested { request_id, end_tx } => {
                if request_id == id {
                    TurnState::CancelConfirmed { end_tx }
                } else {
                    // Different request: rebuild the same pending state.
                    TurnState::CancelRequested { request_id, end_tx }
                }
            }
            unchanged => unchanged,
        }
    }
}
impl ClaudeAgentSession {
async fn handle_message(
mut thread_rx: watch::Receiver<WeakEntity<AcpThread>>,
message: SdkMessage,
end_turn_tx: Rc<RefCell<Option<oneshot::Sender<Result<acp::PromptResponse>>>>>,
pending_cancellation: Rc<Cell<PendingCancellation>>,
turn_state: Rc<RefCell<TurnState>>,
cx: &mut AsyncApp,
) {
match message {
@ -393,15 +420,13 @@ impl ClaudeAgentSession {
for chunk in message.content.chunks() {
match chunk {
ContentChunk::Text { text } | ContentChunk::UntaggedText(text) => {
let state = pending_cancellation.take();
if state != PendingCancellation::Confirmed {
if !turn_state.borrow().is_cancelled() {
thread
.update(cx, |thread, cx| {
thread.push_user_content_block(text.into(), cx)
})
.log_err();
}
pending_cancellation.set(state);
}
ContentChunk::ToolResult {
content,
@ -414,7 +439,12 @@ impl ClaudeAgentSession {
acp::ToolCallUpdate {
id: acp::ToolCallId(tool_use_id.into()),
fields: acp::ToolCallUpdateFields {
status: Some(acp::ToolCallStatus::Completed),
status: if turn_state.borrow().is_cancelled() {
// Do not set to completed if turn was cancelled
None
} else {
Some(acp::ToolCallStatus::Completed)
},
content: (!content.is_empty())
.then(|| vec![content.into()]),
..Default::default()
@ -541,40 +571,38 @@ impl ClaudeAgentSession {
result,
..
} => {
if let Some(end_turn_tx) = end_turn_tx.borrow_mut().take() {
if is_error
|| (subtype == ResultErrorType::ErrorDuringExecution
&& pending_cancellation.take() != PendingCancellation::Confirmed)
{
end_turn_tx
.send(Err(anyhow!(
"Error: {}",
result.unwrap_or_else(|| subtype.to_string())
)))
.ok();
} else {
let stop_reason = match subtype {
ResultErrorType::Success => acp::StopReason::EndTurn,
ResultErrorType::ErrorMaxTurns => acp::StopReason::MaxTurnRequests,
ResultErrorType::ErrorDuringExecution => acp::StopReason::Cancelled,
};
end_turn_tx
.send(Ok(acp::PromptResponse { stop_reason }))
.ok();
}
let turn_state = turn_state.take();
let was_cancelled = turn_state.is_cancelled();
let Some(end_turn_tx) = turn_state.end_tx() else {
debug_panic!("Received `SdkMessage::Result` but there wasn't an active turn");
return;
};
if is_error || (!was_cancelled && subtype == ResultErrorType::ErrorDuringExecution)
{
end_turn_tx
.send(Err(anyhow!(
"Error: {}",
result.unwrap_or_else(|| subtype.to_string())
)))
.ok();
} else {
let stop_reason = match subtype {
ResultErrorType::Success => acp::StopReason::EndTurn,
ResultErrorType::ErrorMaxTurns => acp::StopReason::MaxTurnRequests,
ResultErrorType::ErrorDuringExecution => acp::StopReason::Cancelled,
};
end_turn_tx
.send(Ok(acp::PromptResponse { stop_reason }))
.ok();
}
}
SdkMessage::ControlResponse { response } => {
if matches!(response.subtype, ResultErrorType::Success) {
let pending_cancellation_value = pending_cancellation.take();
if let PendingCancellation::Sent { request_id } = &pending_cancellation_value
&& request_id == &response.request_id
{
pending_cancellation.set(PendingCancellation::Confirmed);
} else {
pending_cancellation.set(pending_cancellation_value);
}
let new_state = turn_state.take().confirm_cancellation(&response.request_id);
turn_state.replace(new_state);
} else {
log::error!("Control response error: {:?}", response);
}
}
SdkMessage::System { .. } => {}
@ -865,9 +893,10 @@ pub(crate) mod tests {
#[cfg_attr(not(feature = "e2e"), ignore)]
async fn test_todo_plan(cx: &mut TestAppContext) {
let fs = e2e_tests::init_test(cx).await;
let tempdir = tempfile::tempdir().unwrap();
let project = Project::test(fs, [], cx).await;
let thread =
e2e_tests::new_test_thread(ClaudeCode, project.clone(), "/private/tmp", cx).await;
e2e_tests::new_test_thread(ClaudeCode, project.clone(), tempdir.path(), cx).await;
thread
.update(cx, |thread, cx| {
@ -921,6 +950,8 @@ pub(crate) mod tests {
));
assert_eq!(thread.plan().entries.len(), entries_len);
});
drop(tempdir);
}
#[test]

View file

@ -13,12 +13,12 @@ use gpui::{Entity, TestAppContext};
use indoc::indoc;
use project::{FakeFs, Project};
use settings::{Settings, SettingsStore};
use util::path;
pub async fn test_basic(server: impl AgentServer + 'static, cx: &mut TestAppContext) {
let fs = init_test(cx).await;
let tempdir = tempfile::tempdir().unwrap();
let project = Project::test(fs, [], cx).await;
let thread = new_test_thread(server, project.clone(), "/private/tmp", cx).await;
let thread = new_test_thread(server, project.clone(), tempdir.path(), cx).await;
thread
.update(cx, |thread, cx| thread.send_raw("Hello from Zed!", cx))
@ -40,6 +40,8 @@ pub async fn test_basic(server: impl AgentServer + 'static, cx: &mut TestAppCont
AgentThreadEntry::AssistantMessage(_)
));
});
drop(tempdir);
}
pub async fn test_path_mentions(server: impl AgentServer + 'static, cx: &mut TestAppContext) {
@ -118,7 +120,7 @@ pub async fn test_tool_call(server: impl AgentServer + 'static, cx: &mut TestApp
std::fs::write(&foo_path, "Lorem ipsum dolor").expect("failed to write file");
let project = Project::example([tempdir.path()], &mut cx.to_async()).await;
let thread = new_test_thread(server, project.clone(), "/private/tmp", cx).await;
let thread = new_test_thread(server, project.clone(), tempdir.path(), cx).await;
thread
.update(cx, |thread, cx| {
@ -156,8 +158,9 @@ pub async fn test_tool_call_with_permission(
cx: &mut TestAppContext,
) {
let fs = init_test(cx).await;
let project = Project::test(fs, [path!("/private/tmp").as_ref()], cx).await;
let thread = new_test_thread(server, project.clone(), "/private/tmp", cx).await;
let tempdir = tempfile::tempdir().unwrap();
let project = Project::test(fs, [tempdir.path()], cx).await;
let thread = new_test_thread(server, project.clone(), tempdir.path(), cx).await;
let full_turn = thread.update(cx, |thread, cx| {
thread.send_raw(
r#"Run exactly `touch hello.txt && echo "Hello, world!" | tee hello.txt` in the terminal."#,
@ -239,14 +242,16 @@ pub async fn test_tool_call_with_permission(
"Expected content to contain 'Hello'"
);
});
drop(tempdir);
}
pub async fn test_cancel(server: impl AgentServer + 'static, cx: &mut TestAppContext) {
let fs = init_test(cx).await;
let project = Project::test(fs, [path!("/private/tmp").as_ref()], cx).await;
let thread = new_test_thread(server, project.clone(), "/private/tmp", cx).await;
let full_turn = thread.update(cx, |thread, cx| {
let tempdir = tempfile::tempdir().unwrap();
let project = Project::test(fs, [tempdir.path()], cx).await;
let thread = new_test_thread(server, project.clone(), tempdir.path(), cx).await;
let _ = thread.update(cx, |thread, cx| {
thread.send_raw(
r#"Run exactly `touch hello.txt && echo "Hello, world!" | tee hello.txt` in the terminal."#,
cx,
@ -285,9 +290,8 @@ pub async fn test_cancel(server: impl AgentServer + 'static, cx: &mut TestAppCon
id.clone()
});
let _ = thread.update(cx, |thread, cx| thread.cancel(cx));
full_turn.await.unwrap();
thread.read_with(cx, |thread, _| {
thread.update(cx, |thread, cx| thread.cancel(cx)).await;
thread.read_with(cx, |thread, _cx| {
let AgentThreadEntry::ToolCall(ToolCall {
status: ToolCallStatus::Canceled,
..
@ -309,12 +313,15 @@ pub async fn test_cancel(server: impl AgentServer + 'static, cx: &mut TestAppCon
AgentThreadEntry::AssistantMessage(..),
))
});
drop(tempdir);
}
pub async fn test_thread_drop(server: impl AgentServer + 'static, cx: &mut TestAppContext) {
let fs = init_test(cx).await;
let tempdir = tempfile::tempdir().unwrap();
let project = Project::test(fs, [], cx).await;
let thread = new_test_thread(server, project.clone(), "/private/tmp", cx).await;
let thread = new_test_thread(server, project.clone(), tempdir.path(), cx).await;
thread
.update(cx, |thread, cx| thread.send_raw("Hello from test!", cx))
@ -330,6 +337,8 @@ pub async fn test_thread_drop(server: impl AgentServer + 'static, cx: &mut TestA
cx.executor().run_until_parked();
assert!(!weak_thread.is_upgradable());
drop(tempdir);
}
#[macro_export]