From b87c4a1e13480e5372ced9a6d114b2e4d998874f Mon Sep 17 00:00:00 2001 From: Boris Cherny Date: Thu, 31 Oct 2024 16:21:26 -0700 Subject: [PATCH] assistant: Add health telemetry (#19928) This PR adds a bit of telemetry for Anthropic models, in order to understand model health. With this logging, we can monitor and diagnose dips in performance, for example due to model rollouts. Release Notes: - N/A --------- Co-authored-by: Max Brunsfeld --- Cargo.lock | 1 + crates/assistant/src/context.rs | 27 ++- crates/assistant/src/inline_assistant.rs | 192 +++++++++--------- .../src/terminal_inline_assistant.rs | 112 +++++++--- crates/client/src/telemetry.rs | 7 + crates/language_model/Cargo.toml | 1 + crates/language_model/src/language_model.rs | 50 ++++- crates/language_model/src/logging.rs | 90 ++++++++ .../language_model/src/provider/anthropic.rs | 14 +- .../telemetry_events/src/telemetry_events.rs | 4 + 10 files changed, 354 insertions(+), 144 deletions(-) create mode 100644 crates/language_model/src/logging.rs diff --git a/Cargo.lock b/Cargo.lock index ad47e4326d..e510a07bee 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6309,6 +6309,7 @@ dependencies = [ "settings", "smol", "strum 0.25.0", + "telemetry_events", "text", "theme", "thiserror", diff --git a/crates/assistant/src/context.rs b/crates/assistant/src/context.rs index a1de9d3b40..5b4cff01b6 100644 --- a/crates/assistant/src/context.rs +++ b/crates/assistant/src/context.rs @@ -24,6 +24,7 @@ use gpui::{ use language::{AnchorRangeExt, Bias, Buffer, LanguageRegistry, OffsetRangeExt, Point, ToOffset}; use language_model::{ + logging::report_assistant_event, provider::cloud::{MaxMonthlySpendReachedError, PaymentRequiredError}, LanguageModel, LanguageModelCacheConfiguration, LanguageModelCompletionEvent, LanguageModelImage, LanguageModelRegistry, LanguageModelRequest, LanguageModelRequestMessage, @@ -1955,6 +1956,7 @@ impl Context { }); match event { + LanguageModelCompletionEvent::StartMessage { .. } => {} LanguageModelCompletionEvent::Stop(reason) => { stop_reason = reason; } @@ -2060,23 +2062,28 @@ impl Context { None }; - if let Some(telemetry) = this.telemetry.as_ref() { - let language_name = this - .buffer - .read(cx) - .language() - .map(|language| language.name()); - telemetry.report_assistant_event(AssistantEvent { + let language_name = this + .buffer + .read(cx) + .language() + .map(|language| language.name()); + report_assistant_event( + AssistantEvent { conversation_id: Some(this.id.0.clone()), kind: AssistantKind::Panel, phase: AssistantPhase::Response, + message_id: None, model: model.telemetry_id(), model_provider: model.provider_id().to_string(), response_latency, error_message, language_name: language_name.map(|name| name.to_proto()), - }); - } + }, + this.telemetry.clone(), + cx.http_client(), + model.api_key(cx), + cx.background_executor(), + ); if let Ok(stop_reason) = result { match stop_reason { @@ -2543,7 +2550,7 @@ impl Context { let mut messages = stream.await?; let mut replaced = !replace_old; - while let Some(message) = messages.next().await { + while let Some(message) = messages.stream.next().await { let text = message?; let mut lines = text.lines(); this.update(&mut cx, |this, cx| { diff --git a/crates/assistant/src/inline_assistant.rs b/crates/assistant/src/inline_assistant.rs index fdf00c8b04..934c2dd5d3 100644 --- a/crates/assistant/src/inline_assistant.rs +++ b/crates/assistant/src/inline_assistant.rs @@ -21,9 +21,7 @@ use fs::Fs; use futures::{ channel::mpsc, future::{BoxFuture, LocalBoxFuture}, - join, - stream::{self, BoxStream}, - SinkExt, Stream, StreamExt, + join, SinkExt, Stream, StreamExt, }; use gpui::{ anchored, deferred, point, AnyElement, AppContext, ClickEvent, EventEmitter, FocusHandle, @@ -32,7 +30,8 @@ use gpui::{ }; use language::{Buffer, IndentKind, Point, Selection, TransactionId}; use language_model::{ - LanguageModel, LanguageModelRegistry, LanguageModelRequest, LanguageModelRequestMessage, Role, + logging::report_assistant_event, LanguageModel, LanguageModelRegistry, LanguageModelRequest, + LanguageModelRequestMessage, LanguageModelTextStream, Role, }; use multi_buffer::MultiBufferRow; use parking_lot::Mutex; @@ -241,12 +240,13 @@ impl InlineAssistant { }; codegen_ranges.push(start..end); - if let Some(telemetry) = self.telemetry.as_ref() { - if let Some(model) = LanguageModelRegistry::read_global(cx).active_model() { + if let Some(model) = LanguageModelRegistry::read_global(cx).active_model() { + if let Some(telemetry) = self.telemetry.as_ref() { telemetry.report_assistant_event(AssistantEvent { conversation_id: None, kind: AssistantKind::Inline, phase: AssistantPhase::Invoked, + message_id: None, model: model.telemetry_id(), model_provider: model.provider_id().to_string(), response_latency: None, @@ -754,33 +754,6 @@ impl InlineAssistant { pub fn finish_assist(&mut self, assist_id: InlineAssistId, undo: bool, cx: &mut WindowContext) { if let Some(assist) = self.assists.get(&assist_id) { - if let Some(telemetry) = self.telemetry.as_ref() { - if let Some(model) = LanguageModelRegistry::read_global(cx).active_model() { - let language_name = assist.editor.upgrade().and_then(|editor| { - let multibuffer = editor.read(cx).buffer().read(cx); - let ranges = multibuffer.range_to_buffer_ranges(assist.range.clone(), cx); - ranges - .first() - .and_then(|(buffer, _, _)| buffer.read(cx).language()) - .map(|language| language.name()) - }); - telemetry.report_assistant_event(AssistantEvent { - conversation_id: None, - kind: AssistantKind::Inline, - phase: if undo { - AssistantPhase::Rejected - } else { - AssistantPhase::Accepted - }, - model: model.telemetry_id(), - model_provider: model.provider_id().to_string(), - response_latency: None, - error_message: None, - language_name: language_name.map(|name| name.to_proto()), - }); - } - } - let assist_group_id = assist.group_id; if self.assist_groups[&assist_group_id].linked { for assist_id in self.unlink_assist_group(assist_group_id, cx) { @@ -815,12 +788,45 @@ impl InlineAssistant { } } + let active_alternative = assist.codegen.read(cx).active_alternative().clone(); + let message_id = active_alternative.read(cx).message_id.clone(); + + if let Some(model) = LanguageModelRegistry::read_global(cx).active_model() { + let language_name = assist.editor.upgrade().and_then(|editor| { + let multibuffer = editor.read(cx).buffer().read(cx); + let ranges = multibuffer.range_to_buffer_ranges(assist.range.clone(), cx); + ranges + .first() + .and_then(|(buffer, _, _)| buffer.read(cx).language()) + .map(|language| language.name()) + }); + report_assistant_event( + AssistantEvent { + conversation_id: None, + kind: AssistantKind::Inline, + message_id, + phase: if undo { + AssistantPhase::Rejected + } else { + AssistantPhase::Accepted + }, + model: model.telemetry_id(), + model_provider: model.provider_id().to_string(), + response_latency: None, + error_message: None, + language_name: language_name.map(|name| name.to_proto()), + }, + self.telemetry.clone(), + cx.http_client(), + model.api_key(cx), + cx.background_executor(), + ); + } + if undo { assist.codegen.update(cx, |codegen, cx| codegen.undo(cx)); } else { - let confirmed_alternative = assist.codegen.read(cx).active_alternative().clone(); - self.confirmed_assists - .insert(assist_id, confirmed_alternative); + self.confirmed_assists.insert(assist_id, active_alternative); } } } @@ -2497,6 +2503,7 @@ pub struct CodegenAlternative { line_operations: Vec, request: Option, elapsed_time: Option, + message_id: Option, } enum CodegenStatus { @@ -2555,6 +2562,7 @@ impl CodegenAlternative { buffer: buffer.clone(), old_buffer, edit_position: None, + message_id: None, snapshot, last_equal_ranges: Default::default(), transformation_transaction_id: None, @@ -2659,20 +2667,20 @@ impl CodegenAlternative { self.edit_position = Some(self.range.start.bias_right(&self.snapshot)); + let api_key = model.api_key(cx); let telemetry_id = model.telemetry_id(); let provider_id = model.provider_id(); - let chunks: LocalBoxFuture>>> = + let stream: LocalBoxFuture> = if user_prompt.trim().to_lowercase() == "delete" { - async { Ok(stream::empty().boxed()) }.boxed_local() + async { Ok(LanguageModelTextStream::default()) }.boxed_local() } else { let request = self.build_request(user_prompt, assistant_panel_context, cx)?; self.request = Some(request.clone()); - let chunks = cx - .spawn(|_, cx| async move { model.stream_completion_text(request, &cx).await }); - async move { Ok(chunks.await?.boxed()) }.boxed_local() + cx.spawn(|_, cx| async move { model.stream_completion_text(request, &cx).await }) + .boxed_local() }; - self.handle_stream(telemetry_id, provider_id.to_string(), chunks, cx); + self.handle_stream(telemetry_id, provider_id.to_string(), api_key, stream, cx); Ok(()) } @@ -2737,7 +2745,8 @@ impl CodegenAlternative { &mut self, model_telemetry_id: String, model_provider_id: String, - stream: impl 'static + Future>>>, + model_api_key: Option, + stream: impl 'static + Future>, cx: &mut ModelContext, ) { let start_time = Instant::now(); @@ -2767,6 +2776,7 @@ impl CodegenAlternative { } } + let http_client = cx.http_client().clone(); let telemetry = self.telemetry.clone(); let language_name = { let multibuffer = self.buffer.read(cx); @@ -2782,15 +2792,21 @@ impl CodegenAlternative { let mut edit_start = self.range.start.to_offset(&snapshot); self.generation = cx.spawn(|codegen, mut cx| { async move { - let chunks = stream.await; + let stream = stream.await; + let message_id = stream + .as_ref() + .ok() + .and_then(|stream| stream.message_id.clone()); let generate = async { let (mut diff_tx, mut diff_rx) = mpsc::channel(1); + let executor = cx.background_executor().clone(); + let message_id = message_id.clone(); let line_based_stream_diff: Task> = cx.background_executor().spawn(async move { let mut response_latency = None; let request_start = Instant::now(); let diff = async { - let chunks = StripInvalidSpans::new(chunks?); + let chunks = StripInvalidSpans::new(stream?.stream); futures::pin_mut!(chunks); let mut diff = StreamingDiff::new(selected_text.to_string()); let mut line_diff = LineDiff::default(); @@ -2886,9 +2902,10 @@ impl CodegenAlternative { let error_message = result.as_ref().err().map(|error| error.to_string()); - if let Some(telemetry) = telemetry { - telemetry.report_assistant_event(AssistantEvent { + report_assistant_event( + AssistantEvent { conversation_id: None, + message_id, kind: AssistantKind::Inline, phase: AssistantPhase::Response, model: model_telemetry_id, @@ -2896,8 +2913,12 @@ impl CodegenAlternative { response_latency, error_message, language_name: language_name.map(|name| name.to_proto()), - }); - } + }, + telemetry, + http_client, + model_api_key, + &executor, + ); result?; Ok(()) @@ -2961,6 +2982,7 @@ impl CodegenAlternative { codegen .update(&mut cx, |this, cx| { + this.message_id = message_id; this.last_equal_ranges.clear(); if let Err(error) = result { this.status = CodegenStatus::Error(error); @@ -3512,15 +3534,7 @@ mod tests { ) }); - let (chunks_tx, chunks_rx) = mpsc::unbounded(); - codegen.update(cx, |codegen, cx| { - codegen.handle_stream( - String::new(), - String::new(), - future::ready(Ok(chunks_rx.map(Ok).boxed())), - cx, - ) - }); + let chunks_tx = simulate_response_stream(codegen.clone(), cx); let mut new_text = concat!( " let mut x = 0;\n", @@ -3584,15 +3598,7 @@ mod tests { ) }); - let (chunks_tx, chunks_rx) = mpsc::unbounded(); - codegen.update(cx, |codegen, cx| { - codegen.handle_stream( - String::new(), - String::new(), - future::ready(Ok(chunks_rx.map(Ok).boxed())), - cx, - ) - }); + let chunks_tx = simulate_response_stream(codegen.clone(), cx); cx.background_executor.run_until_parked(); @@ -3659,15 +3665,7 @@ mod tests { ) }); - let (chunks_tx, chunks_rx) = mpsc::unbounded(); - codegen.update(cx, |codegen, cx| { - codegen.handle_stream( - String::new(), - String::new(), - future::ready(Ok(chunks_rx.map(Ok).boxed())), - cx, - ) - }); + let chunks_tx = simulate_response_stream(codegen.clone(), cx); cx.background_executor.run_until_parked(); @@ -3733,16 +3731,7 @@ mod tests { ) }); - let (chunks_tx, chunks_rx) = mpsc::unbounded(); - codegen.update(cx, |codegen, cx| { - codegen.handle_stream( - String::new(), - String::new(), - future::ready(Ok(chunks_rx.map(Ok).boxed())), - cx, - ) - }); - + let chunks_tx = simulate_response_stream(codegen.clone(), cx); let new_text = concat!( "func main() {\n", "\tx := 0\n", @@ -3797,16 +3786,7 @@ mod tests { ) }); - let (chunks_tx, chunks_rx) = mpsc::unbounded(); - codegen.update(cx, |codegen, cx| { - codegen.handle_stream( - String::new(), - String::new(), - future::ready(Ok(chunks_rx.map(Ok).boxed())), - cx, - ) - }); - + let chunks_tx = simulate_response_stream(codegen.clone(), cx); chunks_tx .unbounded_send("let mut x = 0;\nx += 1;".to_string()) .unwrap(); @@ -3880,6 +3860,26 @@ mod tests { } } + fn simulate_response_stream( + codegen: Model, + cx: &mut TestAppContext, + ) -> mpsc::UnboundedSender { + let (chunks_tx, chunks_rx) = mpsc::unbounded(); + codegen.update(cx, |codegen, cx| { + codegen.handle_stream( + String::new(), + String::new(), + None, + future::ready(Ok(LanguageModelTextStream { + message_id: None, + stream: chunks_rx.map(Ok).boxed(), + })), + cx, + ); + }); + chunks_tx + } + fn rust_lang() -> Language { Language::new( LanguageConfig { diff --git a/crates/assistant/src/terminal_inline_assistant.rs b/crates/assistant/src/terminal_inline_assistant.rs index 3e472ae4a9..2fb4b4ffda 100644 --- a/crates/assistant/src/terminal_inline_assistant.rs +++ b/crates/assistant/src/terminal_inline_assistant.rs @@ -17,7 +17,8 @@ use gpui::{ }; use language::Buffer; use language_model::{ - LanguageModelRegistry, LanguageModelRequest, LanguageModelRequestMessage, Role, + logging::report_assistant_event, LanguageModelRegistry, LanguageModelRequest, + LanguageModelRequestMessage, Role, }; use settings::Settings; use std::{ @@ -306,6 +307,33 @@ impl TerminalInlineAssistant { this.focus_handle(cx).focus(cx); }) .log_err(); + + if let Some(model) = LanguageModelRegistry::read_global(cx).active_model() { + let codegen = assist.codegen.read(cx); + let executor = cx.background_executor().clone(); + report_assistant_event( + AssistantEvent { + conversation_id: None, + kind: AssistantKind::InlineTerminal, + message_id: codegen.message_id.clone(), + phase: if undo { + AssistantPhase::Rejected + } else { + AssistantPhase::Accepted + }, + model: model.telemetry_id(), + model_provider: model.provider_id().to_string(), + response_latency: None, + error_message: None, + language_name: None, + }, + codegen.telemetry.clone(), + cx.http_client(), + model.api_key(cx), + &executor, + ); + } + assist.codegen.update(cx, |codegen, cx| { if undo { codegen.undo(cx); @@ -1016,6 +1044,7 @@ pub struct Codegen { telemetry: Option>, terminal: Model, generation: Task<()>, + message_id: Option, transaction: Option, } @@ -1026,6 +1055,7 @@ impl Codegen { telemetry, status: CodegenStatus::Idle, generation: Task::ready(()), + message_id: None, transaction: None, } } @@ -1035,6 +1065,8 @@ impl Codegen { return; }; + let model_api_key = model.api_key(cx); + let http_client = cx.http_client(); let telemetry = self.telemetry.clone(); self.status = CodegenStatus::Pending; self.transaction = Some(TerminalTransaction::start(self.terminal.clone())); @@ -1043,44 +1075,62 @@ impl Codegen { let model_provider_id = model.provider_id(); let response = model.stream_completion_text(prompt, &cx).await; let generate = async { + let message_id = response + .as_ref() + .ok() + .and_then(|response| response.message_id.clone()); + let (mut hunks_tx, mut hunks_rx) = mpsc::channel(1); - let task = cx.background_executor().spawn(async move { - let mut response_latency = None; - let request_start = Instant::now(); - let task = async { - let mut chunks = response?; - while let Some(chunk) = chunks.next().await { - if response_latency.is_none() { - response_latency = Some(request_start.elapsed()); + let task = cx.background_executor().spawn({ + let message_id = message_id.clone(); + let executor = cx.background_executor().clone(); + async move { + let mut response_latency = None; + let request_start = Instant::now(); + let task = async { + let mut chunks = response?.stream; + while let Some(chunk) = chunks.next().await { + if response_latency.is_none() { + response_latency = Some(request_start.elapsed()); + } + let chunk = chunk?; + hunks_tx.send(chunk).await?; } - let chunk = chunk?; - hunks_tx.send(chunk).await?; - } + anyhow::Ok(()) + }; + + let result = task.await; + + let error_message = result.as_ref().err().map(|error| error.to_string()); + report_assistant_event( + AssistantEvent { + conversation_id: None, + kind: AssistantKind::InlineTerminal, + message_id, + phase: AssistantPhase::Response, + model: model_telemetry_id, + model_provider: model_provider_id.to_string(), + response_latency, + error_message, + language_name: None, + }, + telemetry, + http_client, + model_api_key, + &executor, + ); + + result?; anyhow::Ok(()) - }; - - let result = task.await; - - let error_message = result.as_ref().err().map(|error| error.to_string()); - if let Some(telemetry) = telemetry { - telemetry.report_assistant_event(AssistantEvent { - conversation_id: None, - kind: AssistantKind::Inline, - phase: AssistantPhase::Response, - model: model_telemetry_id, - model_provider: model_provider_id.to_string(), - response_latency, - error_message, - language_name: None, - }); } - - result?; - anyhow::Ok(()) }); + this.update(&mut cx, |this, _| { + this.message_id = message_id; + })?; + while let Some(hunk) = hunks_rx.next().await { this.update(&mut cx, |this, cx| { if let Some(transaction) = &mut this.transaction { diff --git a/crates/client/src/telemetry.rs b/crates/client/src/telemetry.rs index ba03255d54..25f8709ff1 100644 --- a/crates/client/src/telemetry.rs +++ b/crates/client/src/telemetry.rs @@ -341,6 +341,13 @@ impl Telemetry { .detach(); } + pub fn metrics_enabled(self: &Arc) -> bool { + let state = self.state.lock(); + let enabled = state.settings.metrics; + drop(state); + return enabled; + } + pub fn set_authenticated_user_info( self: &Arc, metrics_id: Option, diff --git a/crates/language_model/Cargo.toml b/crates/language_model/Cargo.toml index 685b022340..e88675bbae 100644 --- a/crates/language_model/Cargo.toml +++ b/crates/language_model/Cargo.toml @@ -46,6 +46,7 @@ serde_json.workspace = true settings.workspace = true smol.workspace = true strum.workspace = true +telemetry_events.workspace = true theme.workspace = true thiserror.workspace = true tiktoken-rs.workspace = true diff --git a/crates/language_model/src/language_model.rs b/crates/language_model/src/language_model.rs index 81d0c874dc..a2f5a072a9 100644 --- a/crates/language_model/src/language_model.rs +++ b/crates/language_model/src/language_model.rs @@ -1,3 +1,4 @@ +pub mod logging; mod model; pub mod provider; mod rate_limiter; @@ -59,6 +60,7 @@ pub enum LanguageModelCompletionEvent { Stop(StopReason), Text(String), ToolUse(LanguageModelToolUse), + StartMessage { message_id: String }, } #[derive(Debug, PartialEq, Clone, Serialize, Deserialize)] @@ -76,6 +78,20 @@ pub struct LanguageModelToolUse { pub input: serde_json::Value, } +pub struct LanguageModelTextStream { + pub message_id: Option, + pub stream: BoxStream<'static, Result>, +} + +impl Default for LanguageModelTextStream { + fn default() -> Self { + Self { + message_id: None, + stream: Box::pin(futures::stream::empty()), + } + } +} + pub trait LanguageModel: Send + Sync { fn id(&self) -> LanguageModelId; fn name(&self) -> LanguageModelName; @@ -87,6 +103,10 @@ pub trait LanguageModel: Send + Sync { fn provider_name(&self) -> LanguageModelProviderName; fn telemetry_id(&self) -> String; + fn api_key(&self, _cx: &AppContext) -> Option { + None + } + /// Returns the availability of this language model. fn availability(&self) -> LanguageModelAvailability { LanguageModelAvailability::Public @@ -113,21 +133,39 @@ pub trait LanguageModel: Send + Sync { &self, request: LanguageModelRequest, cx: &AsyncAppContext, - ) -> BoxFuture<'static, Result>>> { + ) -> BoxFuture<'static, Result> { let events = self.stream_completion(request, cx); async move { - Ok(events - .await? - .filter_map(|result| async move { + let mut events = events.await?; + let mut message_id = None; + let mut first_item_text = None; + + if let Some(first_event) = events.next().await { + match first_event { + Ok(LanguageModelCompletionEvent::StartMessage { message_id: id }) => { + message_id = Some(id.clone()); + } + Ok(LanguageModelCompletionEvent::Text(text)) => { + first_item_text = Some(text); + } + _ => (), + } + } + + let stream = futures::stream::iter(first_item_text.map(Ok)) + .chain(events.filter_map(|result| async move { match result { + Ok(LanguageModelCompletionEvent::StartMessage { .. }) => None, Ok(LanguageModelCompletionEvent::Text(text)) => Some(Ok(text)), Ok(LanguageModelCompletionEvent::Stop(_)) => None, Ok(LanguageModelCompletionEvent::ToolUse(_)) => None, Err(err) => Some(Err(err)), } - }) - .boxed()) + })) + .boxed(); + + Ok(LanguageModelTextStream { message_id, stream }) } .boxed() } diff --git a/crates/language_model/src/logging.rs b/crates/language_model/src/logging.rs new file mode 100644 index 0000000000..d5156125c4 --- /dev/null +++ b/crates/language_model/src/logging.rs @@ -0,0 +1,90 @@ +use anthropic::{AnthropicError, ANTHROPIC_API_URL}; +use anyhow::{anyhow, Context, Result}; +use client::telemetry::Telemetry; +use gpui::BackgroundExecutor; +use http_client::{AsyncBody, HttpClient, Method, Request as HttpRequest}; +use std::env; +use std::sync::Arc; +use telemetry_events::{AssistantEvent, AssistantKind, AssistantPhase}; +use util::ResultExt; + +use crate::provider::anthropic::PROVIDER_ID as ANTHROPIC_PROVIDER_ID; + +pub fn report_assistant_event( + event: AssistantEvent, + telemetry: Option>, + client: Arc, + model_api_key: Option, + executor: &BackgroundExecutor, +) { + if let Some(telemetry) = telemetry.as_ref() { + telemetry.report_assistant_event(event.clone()); + if telemetry.metrics_enabled() && event.model_provider == ANTHROPIC_PROVIDER_ID { + executor + .spawn(async move { + report_anthropic_event(event, client, model_api_key) + .await + .log_err(); + }) + .detach(); + } + } +} + +async fn report_anthropic_event( + event: AssistantEvent, + client: Arc, + model_api_key: Option, +) -> Result<(), AnthropicError> { + let api_key = match model_api_key { + Some(key) => key, + None => { + return Err(AnthropicError::Other(anyhow!( + "Anthropic API key is not set" + ))); + } + }; + + let uri = format!("{ANTHROPIC_API_URL}/v1/log/zed"); + let request_builder = HttpRequest::builder() + .method(Method::POST) + .uri(uri) + .header("X-Api-Key", api_key) + .header("Content-Type", "application/json"); + let serialized_event: serde_json::Value = serde_json::json!({ + "completion_type": match event.kind { + AssistantKind::Inline => "natural_language_completion_in_editor", + AssistantKind::InlineTerminal => "natural_language_completion_in_terminal", + AssistantKind::Panel => "conversation_message", + }, + "event": match event.phase { + AssistantPhase::Response => "response", + AssistantPhase::Invoked => "invoke", + AssistantPhase::Accepted => "accept", + AssistantPhase::Rejected => "reject", + }, + "metadata": { + "language_name": event.language_name, + "message_id": event.message_id, + "platform": env::consts::OS, + } + }); + + let request = request_builder + .body(AsyncBody::from(serialized_event.to_string())) + .context("failed to construct request body")?; + + let response = client + .send(request) + .await + .context("failed to send request to Anthropic")?; + + if response.status().is_success() { + return Ok(()); + } + + return Err(AnthropicError::Other(anyhow!( + "Failed to log: {}", + response.status(), + ))); +} diff --git a/crates/language_model/src/provider/anthropic.rs b/crates/language_model/src/provider/anthropic.rs index b7e65650b5..c19526b92f 100644 --- a/crates/language_model/src/provider/anthropic.rs +++ b/crates/language_model/src/provider/anthropic.rs @@ -26,7 +26,7 @@ use theme::ThemeSettings; use ui::{prelude::*, Icon, IconName, Tooltip}; use util::{maybe, ResultExt}; -const PROVIDER_ID: &str = "anthropic"; +pub const PROVIDER_ID: &str = "anthropic"; const PROVIDER_NAME: &str = "Anthropic"; #[derive(Default, Clone, Debug, PartialEq)] @@ -356,6 +356,10 @@ impl LanguageModel for AnthropicModel { format!("anthropic/{}", self.model.id()) } + fn api_key(&self, cx: &AppContext) -> Option { + self.state.read(cx).api_key.clone() + } + fn max_token_count(&self) -> usize { self.model.max_token_count() } @@ -520,6 +524,14 @@ pub fn map_to_language_model_completion_events( )); } } + Event::MessageStart { message } => { + return Some(( + Some(Ok(LanguageModelCompletionEvent::StartMessage { + message_id: message.id, + })), + state, + )) + } Event::MessageDelta { delta, .. } => { if let Some(stop_reason) = delta.stop_reason.as_deref() { let stop_reason = match stop_reason { diff --git a/crates/telemetry_events/src/telemetry_events.rs b/crates/telemetry_events/src/telemetry_events.rs index 26db3cf8d8..43757f85d8 100644 --- a/crates/telemetry_events/src/telemetry_events.rs +++ b/crates/telemetry_events/src/telemetry_events.rs @@ -47,6 +47,7 @@ pub struct EventWrapper { pub enum AssistantKind { Panel, Inline, + InlineTerminal, } impl Display for AssistantKind { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { @@ -56,6 +57,7 @@ impl Display for AssistantKind { match self { Self::Panel => "panel", Self::Inline => "inline", + Self::InlineTerminal => "inline_terminal", } ) } @@ -140,6 +142,8 @@ pub struct CallEvent { pub struct AssistantEvent { /// Unique random identifier for each assistant tab (None for inline assist) pub conversation_id: Option, + /// Server-generated message ID (only supported for some providers) + pub message_id: Option, /// The kind of assistant (Panel, Inline) pub kind: AssistantKind, #[serde(default)]