From f81993574ea0b0d96fc85c44d0139a8701c4f817 Mon Sep 17 00:00:00 2001 From: Nathan Sobo Date: Sat, 2 Aug 2025 09:58:33 -0600 Subject: [PATCH] Connect Native Agent responses to UI display User-visible improvements: - Native Agent now shows AI responses in the chat interface - Uses configured default model from settings instead of random selection - Streams responses in real-time as the model generates them Technical changes: - Implemented response stream forwarding from Thread to AcpThread - Created Session struct to manage Thread and AcpThread together - Added proper SessionUpdate handling for text chunks and tool calls - Fixed model selection to use LanguageModelRegistry's default - Added comprehensive logging for debugging model interactions - Removed unused cwd parameter - native agent captures context differently than external agents --- crates/acp_thread/src/connection.rs | 2 +- crates/agent2/src/agent.rs | 242 ++++++++++++++++++++--- crates/agent2/src/native_agent_server.rs | 7 + crates/agent2/src/thread.rs | 111 ++++++++--- 4 files changed, 307 insertions(+), 55 deletions(-) diff --git a/crates/acp_thread/src/connection.rs b/crates/acp_thread/src/connection.rs index b99e4949d8..9659433edb 100644 --- a/crates/acp_thread/src/connection.rs +++ b/crates/acp_thread/src/connection.rs @@ -36,7 +36,7 @@ pub trait ModelSelector: 'static { /// A task resolving to `Ok(())` on success or an error. fn select_model( &self, - session_id: &acp::SessionId, + session_id: acp::SessionId, model: Arc, cx: &mut AsyncApp, ) -> Task>; diff --git a/crates/agent2/src/agent.rs b/crates/agent2/src/agent.rs index 60c154971b..0491f7c81a 100644 --- a/crates/agent2/src/agent.rs +++ b/crates/agent2/src/agent.rs @@ -1,6 +1,6 @@ use acp_thread::ModelSelector; use agent_client_protocol as acp; -use anyhow::Result; +use anyhow::{anyhow, Result}; use gpui::{App, AppContext, AsyncApp, Entity, Task}; use language_model::{LanguageModel, LanguageModelRegistry}; use project::Project; @@ -11,15 +11,25 @@ use std::sync::Arc; use crate::{templates::Templates, Thread}; +/// Holds both the internal Thread and the AcpThread for a session +#[derive(Clone)] +struct Session { + /// The internal thread that processes messages + thread: Entity, + /// The ACP thread that handles protocol communication + acp_thread: Entity, +} + pub struct NativeAgent { - /// Session ID -> Thread entity mapping - sessions: HashMap>, + /// Session ID -> Session mapping + sessions: HashMap, /// Shared templates for all threads templates: Arc, } impl NativeAgent { pub fn new(templates: Arc) -> Self { + log::info!("Creating new NativeAgent"); Self { sessions: HashMap::new(), templates, @@ -33,10 +43,12 @@ pub struct NativeAgentConnection(pub Entity); impl ModelSelector for NativeAgentConnection { fn list_models(&self, cx: &mut AsyncApp) -> Task>>> { + log::debug!("NativeAgentConnection::list_models called"); cx.spawn(async move |cx| { cx.update(|cx| { let registry = LanguageModelRegistry::read_global(cx); let models = registry.available_models(cx).collect::>(); + log::info!("Found {} available models", models.len()); if models.is_empty() { Err(anyhow::anyhow!("No models available")) } else { @@ -48,21 +60,26 @@ impl ModelSelector for NativeAgentConnection { fn select_model( &self, - session_id: &acp::SessionId, + session_id: acp::SessionId, model: Arc, cx: &mut AsyncApp, ) -> Task> { + log::info!( + "Setting model for session {}: {:?}", + session_id, + model.name() + ); let agent = self.0.clone(); - let session_id = session_id.clone(); + cx.spawn(async move |cx| { agent.update(cx, |agent, cx| { - if let Some(thread) = agent.sessions.get(&session_id) { - thread.update(cx, |thread, _| { + if let Some(session) = agent.sessions.get(&session_id) { + session.thread.update(cx, |thread, _cx| { thread.selected_model = model; }); Ok(()) } else { - Err(anyhow::anyhow!("Session not found")) + Err(anyhow!("Session not found")) } })? }) @@ -76,10 +93,12 @@ impl ModelSelector for NativeAgentConnection { let agent = self.0.clone(); let session_id = session_id.clone(); cx.spawn(async move |cx| { - let thread = agent + let session = agent .read_with(cx, |agent, _| agent.sessions.get(&session_id).cloned())? .ok_or_else(|| anyhow::anyhow!("Session not found"))?; - let selected = thread.read_with(cx, |thread, _| thread.selected_model.clone())?; + let selected = session + .thread + .read_with(cx, |thread, _| thread.selected_model.clone())?; Ok(selected) }) } @@ -92,32 +111,64 @@ impl acp_thread::AgentConnection for NativeAgentConnection { cwd: &Path, cx: &mut AsyncApp, ) -> Task>> { - let _cwd = cwd.to_owned(); let agent = self.0.clone(); + log::info!("Creating new thread for project at: {:?}", cwd); cx.spawn(async move |cx| { - // Create Thread and store in Agent - let (session_id, _thread) = - agent.update(cx, |agent, cx: &mut gpui::Context| { - // Fetch default model - let default_model = LanguageModelRegistry::read_global(cx) - .available_models(cx) - .next() - .unwrap_or_else(|| panic!("No default model available")); + log::debug!("Starting thread creation in async context"); + // Create Thread + let (session_id, thread) = agent.update( + cx, + |agent, cx: &mut gpui::Context| -> Result<_> { + // Fetch default model from registry settings + let registry = LanguageModelRegistry::read_global(cx); + + // Log available models for debugging + let available_count = registry.available_models(cx).count(); + log::debug!("Total available models: {}", available_count); + + let default_model = registry + .default_model() + .map(|configured| { + log::info!( + "Using configured default model: {:?} from provider: {:?}", + configured.model.name(), + configured.provider.name() + ); + configured.model + }) + .ok_or_else(|| { + log::warn!("No default model configured in settings"); + anyhow!("No default model configured. Please configure a default model in settings.") + })?; let thread = cx.new(|_| Thread::new(agent.templates.clone(), default_model)); + + // Generate session ID let session_id = acp::SessionId(uuid::Uuid::new_v4().to_string().into()); - agent.sessions.insert(session_id.clone(), thread.clone()); - (session_id, thread) - })?; + log::info!("Created session with ID: {}", session_id); + Ok((session_id, thread)) + }, + )??; // Create AcpThread let acp_thread = cx.update(|cx| { cx.new(|cx| { - acp_thread::AcpThread::new("agent2", self.clone(), project, session_id, cx) + acp_thread::AcpThread::new("agent2", self.clone(), project, session_id.clone(), cx) }) })?; + // Store the session + agent.update(cx, |agent, _cx| { + agent.sessions.insert( + session_id, + Session { + thread, + acp_thread: acp_thread.clone(), + }, + ); + })?; + Ok(acp_thread) }) } @@ -137,28 +188,155 @@ impl acp_thread::AgentConnection for NativeAgentConnection { fn prompt(&self, params: acp::PromptRequest, cx: &mut App) -> Task> { let session_id = params.session_id.clone(); let agent = self.0.clone(); + log::info!("Received prompt request for session: {}", session_id); + log::debug!("Prompt blocks count: {}", params.prompt.len()); cx.spawn(async move |cx| { - // Get thread - let thread: Entity = agent - .read_with(cx, |agent, _| agent.sessions.get(&session_id).cloned())? - .ok_or_else(|| anyhow::anyhow!("Session not found"))?; + // Get session + let session = agent + .read_with(cx, |agent, _| { + agent.sessions.get(&session_id).map(|s| Session { + thread: s.thread.clone(), + acp_thread: s.acp_thread.clone(), + }) + })? + .ok_or_else(|| { + log::error!("Session not found: {}", session_id); + anyhow::anyhow!("Session not found") + })?; + log::debug!("Found session for: {}", session_id); // Convert prompt to message let message = convert_prompt_to_message(params.prompt); + log::info!("Converted prompt to message: {} chars", message.len()); + log::debug!("Message content: {}", message); // Get model using the ModelSelector capability (always available for agent2) // Get the selected model from the thread directly - let model = thread.read_with(cx, |thread, _| thread.selected_model.clone())?; + let model = session + .thread + .read_with(cx, |thread, _| thread.selected_model.clone())?; // Send to thread - thread.update(cx, |thread, cx| thread.send(model, message, cx))?; + log::info!("Sending message to thread with model: {:?}", model.name()); + let response_stream = session + .thread + .update(cx, |thread, cx| thread.send(model, message, cx))?; + // Handle response stream and forward to session.acp_thread + let acp_thread = session.acp_thread.clone(); + cx.spawn(async move |cx| { + use futures::StreamExt; + use language_model::LanguageModelCompletionEvent; + + let mut response_stream = response_stream; + + while let Some(result) = response_stream.next().await { + match result { + Ok(event) => { + log::trace!("Received completion event: {:?}", event); + + match event { + LanguageModelCompletionEvent::Text(text) => { + // Send text chunk as agent message + acp_thread.update(cx, |thread, cx| { + thread.handle_session_update( + acp::SessionUpdate::AgentMessageChunk { + content: acp::ContentBlock::Text( + acp::TextContent { + text: text.into(), + annotations: None, + }, + ), + }, + cx, + ) + })??; + } + LanguageModelCompletionEvent::ToolUse(tool_use) => { + // Convert LanguageModelToolUse to ACP ToolCall + acp_thread.update(cx, |thread, cx| { + thread.handle_session_update( + acp::SessionUpdate::ToolCall(acp::ToolCall { + id: acp::ToolCallId(tool_use.id.to_string().into()), + label: tool_use.name.to_string(), + kind: acp::ToolKind::Other, + status: acp::ToolCallStatus::Pending, + content: vec![], + locations: vec![], + raw_input: Some(tool_use.input), + }), + cx, + ) + })??; + } + LanguageModelCompletionEvent::StartMessage { .. } => { + log::debug!("Started new assistant message"); + } + LanguageModelCompletionEvent::UsageUpdate(usage) => { + log::debug!("Token usage update: {:?}", usage); + } + LanguageModelCompletionEvent::Thinking { text, .. } => { + // Send thinking text as agent thought chunk + acp_thread.update(cx, |thread, cx| { + thread.handle_session_update( + acp::SessionUpdate::AgentThoughtChunk { + content: acp::ContentBlock::Text( + acp::TextContent { + text: text.into(), + annotations: None, + }, + ), + }, + cx, + ) + })??; + } + LanguageModelCompletionEvent::StatusUpdate(status) => { + log::trace!("Status update: {:?}", status); + } + LanguageModelCompletionEvent::Stop(stop_reason) => { + log::debug!("Assistant message complete: {:?}", stop_reason); + } + LanguageModelCompletionEvent::RedactedThinking { .. } => { + log::trace!("Redacted thinking event"); + } + LanguageModelCompletionEvent::ToolUseJsonParseError { + id, + tool_name, + raw_input, + json_parse_error, + } => { + log::error!( + "Tool use JSON parse error for tool '{}' (id: {}): {} - input: {}", + tool_name, + id, + json_parse_error, + raw_input + ); + } + } + } + Err(e) => { + log::error!("Error in model response stream: {:?}", e); + // TODO: Consider sending an error message to the UI + break; + } + } + } + + log::info!("Response stream completed"); + anyhow::Ok(()) + }) + .detach(); + + log::info!("Successfully sent prompt to thread and started response handler"); Ok(()) }) } fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) { + log::info!("Cancelling session: {}", session_id); self.0.update(cx, |agent, _cx| { agent.sessions.remove(session_id); }); @@ -167,23 +345,29 @@ impl acp_thread::AgentConnection for NativeAgentConnection { /// Convert ACP content blocks to a message string fn convert_prompt_to_message(blocks: Vec) -> String { + log::debug!("Converting {} content blocks to message", blocks.len()); let mut message = String::new(); for block in blocks { match block { acp::ContentBlock::Text(text) => { + log::trace!("Processing text block: {} chars", text.text.len()); message.push_str(&text.text); } acp::ContentBlock::ResourceLink(link) => { + log::trace!("Processing resource link: {}", link.uri); message.push_str(&format!(" @{} ", link.uri)); } acp::ContentBlock::Image(_) => { + log::trace!("Processing image block"); message.push_str(" [image] "); } acp::ContentBlock::Audio(_) => { + log::trace!("Processing audio block"); message.push_str(" [audio] "); } acp::ContentBlock::Resource(resource) => { + log::trace!("Processing resource block: {:?}", resource.resource); message.push_str(&format!(" [resource: {:?}] ", resource.resource)); } } diff --git a/crates/agent2/src/native_agent_server.rs b/crates/agent2/src/native_agent_server.rs index b71fa455b5..aafe70a8a2 100644 --- a/crates/agent2/src/native_agent_server.rs +++ b/crates/agent2/src/native_agent_server.rs @@ -35,15 +35,22 @@ impl AgentServer for NativeAgentServer { _project: &Entity, cx: &mut App, ) -> Task>> { + log::info!( + "NativeAgentServer::connect called for path: {:?}", + _root_dir + ); cx.spawn(async move |cx| { + log::debug!("Creating templates for native agent"); // Create templates (you might want to load these from files or resources) let templates = Templates::new(); // Create the native agent + log::debug!("Creating native agent entity"); let agent = cx.update(|cx| cx.new(|_| NativeAgent::new(templates)))?; // Create the connection wrapper let connection = NativeAgentConnection(agent); + log::info!("NativeAgentServer connection established successfully"); Ok(Rc::new(connection) as Rc) }) diff --git a/crates/agent2/src/thread.rs b/crates/agent2/src/thread.rs index 758e940269..4bc2932aeb 100644 --- a/crates/agent2/src/thread.rs +++ b/crates/agent2/src/thread.rs @@ -9,6 +9,7 @@ use language_model::{ LanguageModelToolResult, LanguageModelToolResultContent, LanguageModelToolSchemaFormat, LanguageModelToolUse, MessageContent, Role, StopReason, }; +use log; use schemars::{JsonSchema, Schema}; use serde::Deserialize; use smol::stream::StreamExt; @@ -38,7 +39,6 @@ pub struct Thread { tools: BTreeMap>, templates: Arc, pub selected_model: Arc, - // project: Entity, // action_log: Entity, } @@ -80,22 +80,36 @@ impl Thread { content: impl Into, cx: &mut Context, ) -> mpsc::UnboundedReceiver> { + let content = content.into(); + log::info!("Thread::send called with model: {:?}", model.name()); + log::debug!("Thread::send content: {:?}", content); + cx.notify(); let (events_tx, events_rx) = mpsc::unbounded::>(); let system_message = self.build_system_message(cx); + log::debug!( + "System messages count: {}", + if system_message.is_some() { 1 } else { 0 } + ); self.messages.extend(system_message); self.messages.push(AgentMessage { role: Role::User, - content: vec![content.into()], + content: vec![content], }); + log::info!("Total messages in thread: {}", self.messages.len()); self.running_turn = Some(cx.spawn(async move |thread, cx| { + log::info!("Starting agent turn execution"); let turn_result = async { // Perform one request, then keep looping if the model makes tool calls. let mut completion_intent = CompletionIntent::UserPrompt; loop { + log::debug!( + "Building completion request with intent: {:?}", + completion_intent + ); let request = thread.update(cx, |thread, cx| { thread.build_completion_request(completion_intent, cx) })?; @@ -106,11 +120,14 @@ impl Thread { // ); // Stream events, appending to messages and collecting up tool uses. + log::info!("Calling model.stream_completion"); let mut events = model.stream_completion(request, cx).await?; + log::debug!("Stream completion started successfully"); let mut tool_uses = Vec::new(); while let Some(event) = events.next().await { match event { Ok(event) => { + log::trace!("Received completion event: {:?}", event); thread .update(cx, |thread, cx| { tool_uses.extend(thread.handle_streamed_completion_event( @@ -122,6 +139,7 @@ impl Thread { .ok(); } Err(error) => { + log::error!("Error in completion stream: {:?}", error); events_tx.unbounded_send(Err(error)).ok(); break; } @@ -130,13 +148,16 @@ impl Thread { // If there are no tool uses, the turn is done. if tool_uses.is_empty() { + log::info!("No tool uses found, completing turn"); break; } + log::info!("Found {} tool uses to execute", tool_uses.len()); // If there are tool uses, wait for their results to be // computed, then send them together in a single message on // the next loop iteration. let tool_results = future::join_all(tool_uses).await; + log::debug!("Tool execution completed, {} results", tool_results.len()); thread .update(cx, |thread, _cx| { thread.messages.push(AgentMessage { @@ -156,13 +177,17 @@ impl Thread { .await; if let Err(error) = turn_result { + log::error!("Turn execution failed: {:?}", error); events_tx.unbounded_send(Err(error)).ok(); + } else { + log::info!("Turn execution completed successfully"); } })); events_rx } pub fn build_system_message(&mut self, cx: &App) -> Option { + log::debug!("Building system message"); let mut system_message = AgentMessage { role: Role::System, content: Vec::new(), @@ -176,7 +201,9 @@ impl Thread { } } - (!system_message.content.is_empty()).then_some(system_message) + let result = (!system_message.content.is_empty()).then_some(system_message); + log::debug!("System message built: {}", result.is_some()); + result } /// A helper method that's called on every streamed completion event. @@ -188,6 +215,7 @@ impl Thread { events_tx: mpsc::UnboundedSender>, cx: &mut Context, ) -> Option> { + log::trace!("Handling streamed completion event: {:?}", event); use LanguageModelCompletionEvent::*; events_tx.unbounded_send(Ok(event.clone())).ok(); @@ -329,41 +357,74 @@ impl Thread { completion_intent: CompletionIntent, cx: &mut App, ) -> LanguageModelRequest { - LanguageModelRequest { + log::debug!("Building completion request"); + log::debug!("Completion intent: {:?}", completion_intent); + log::debug!("Completion mode: {:?}", self.completion_mode); + + let messages = self.build_request_messages(); + log::info!("Request will include {} messages", messages.len()); + + let tools: Vec = self + .tools + .values() + .filter_map(|tool| { + let tool_name = tool.name().to_string(); + log::trace!("Including tool: {}", tool_name); + Some(LanguageModelRequestTool { + name: tool_name, + description: tool.description(cx).to_string(), + input_schema: tool + .input_schema(LanguageModelToolSchemaFormat::JsonSchema) + .log_err()?, + }) + }) + .collect(); + + log::info!("Request includes {} tools", tools.len()); + + let request = LanguageModelRequest { thread_id: None, prompt_id: None, intent: Some(completion_intent), mode: Some(self.completion_mode), - messages: self.build_request_messages(), - tools: self - .tools - .values() - .filter_map(|tool| { - Some(LanguageModelRequestTool { - name: tool.name().to_string(), - description: tool.description(cx).to_string(), - input_schema: tool - .input_schema(LanguageModelToolSchemaFormat::JsonSchema) - .log_err()?, - }) - }) - .collect(), + messages, + tools, tool_choice: None, stop: Vec::new(), temperature: None, thinking_allowed: false, - } + }; + + log::debug!("Completion request built successfully"); + request } fn build_request_messages(&self) -> Vec { - self.messages + log::trace!( + "Building request messages from {} thread messages", + self.messages.len() + ); + let messages = self + .messages .iter() - .map(|message| LanguageModelRequestMessage { - role: message.role, - content: message.content.clone(), - cache: false, + .map(|message| { + log::trace!( + " - {} message with {} content items", + match message.role { + Role::System => "System", + Role::User => "User", + Role::Assistant => "Assistant", + }, + message.content.len() + ); + LanguageModelRequestMessage { + role: message.role, + content: message.content.clone(), + cache: false, + } }) - .collect() + .collect(); + messages } }