assistant: Add health telemetry (#19928)

This PR adds telemetry for Anthropic models so that we can understand
model health. With this logging in place, we can monitor and diagnose
dips in performance, for example during model rollouts.
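
Concretely, assistant telemetry now flows through a new report_assistant_event helper in the language_model crate's new logging module (added below). It reports each AssistantEvent to Zed's own telemetry as before and, when the event came from an Anthropic model and the user has metrics enabled, also POSTs it to Anthropic's logging endpoint using the user's API key. A sketch of the call shape, with placeholder arguments:

    report_assistant_event(
        AssistantEvent { /* conversation_id, kind, phase, message_id, model, ... */ },
        telemetry,      // Option<Arc<Telemetry>>: Zed's own telemetry sink, as before
        http_client,    // Arc<dyn HttpClient>: used for the POST to Anthropic
        model_api_key,  // Option<String>: the user's Anthropic API key
        executor,       // &BackgroundExecutor: the POST is spawned off the main thread
    );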

Release Notes:

- N/A

---------

Co-authored-by: Max Brunsfeld <maxbrunsfeld@gmail.com>
Boris Cherny 2024-10-31 16:21:26 -07:00, committed by GitHub
parent a0988508f0
commit b87c4a1e13
10 changed files with 354 additions and 144 deletions

Cargo.lock (generated, 1 change)

@@ -6309,6 +6309,7 @@ dependencies = [
  "settings",
  "smol",
  "strum 0.25.0",
+ "telemetry_events",
  "text",
  "theme",
  "thiserror",

---------

@@ -24,6 +24,7 @@ use gpui::{
 use language::{AnchorRangeExt, Bias, Buffer, LanguageRegistry, OffsetRangeExt, Point, ToOffset};
 use language_model::{
+    logging::report_assistant_event,
     provider::cloud::{MaxMonthlySpendReachedError, PaymentRequiredError},
     LanguageModel, LanguageModelCacheConfiguration, LanguageModelCompletionEvent,
     LanguageModelImage, LanguageModelRegistry, LanguageModelRequest, LanguageModelRequestMessage,
@@ -1955,6 +1956,7 @@ impl Context {
                 });
                 match event {
+                    LanguageModelCompletionEvent::StartMessage { .. } => {}
                     LanguageModelCompletionEvent::Stop(reason) => {
                         stop_reason = reason;
                     }
@@ -2060,23 +2062,28 @@ impl Context {
                         None
                     };

-                    if let Some(telemetry) = this.telemetry.as_ref() {
-                        let language_name = this
-                            .buffer
-                            .read(cx)
-                            .language()
-                            .map(|language| language.name());
-                        telemetry.report_assistant_event(AssistantEvent {
-                            conversation_id: Some(this.id.0.clone()),
-                            kind: AssistantKind::Panel,
-                            phase: AssistantPhase::Response,
-                            model: model.telemetry_id(),
-                            model_provider: model.provider_id().to_string(),
-                            response_latency,
-                            error_message,
-                            language_name: language_name.map(|name| name.to_proto()),
-                        });
-                    }
+                    let language_name = this
+                        .buffer
+                        .read(cx)
+                        .language()
+                        .map(|language| language.name());
+                    report_assistant_event(
+                        AssistantEvent {
+                            conversation_id: Some(this.id.0.clone()),
+                            kind: AssistantKind::Panel,
+                            phase: AssistantPhase::Response,
+                            message_id: None,
+                            model: model.telemetry_id(),
+                            model_provider: model.provider_id().to_string(),
+                            response_latency,
+                            error_message,
+                            language_name: language_name.map(|name| name.to_proto()),
+                        },
+                        this.telemetry.clone(),
+                        cx.http_client(),
+                        model.api_key(cx),
+                        cx.background_executor(),
+                    );

                     if let Ok(stop_reason) = result {
                         match stop_reason {
@@ -2543,7 +2550,7 @@ impl Context {
         let mut messages = stream.await?;
         let mut replaced = !replace_old;
-        while let Some(message) = messages.next().await {
+        while let Some(message) = messages.stream.next().await {
             let text = message?;
             let mut lines = text.lines();
             this.update(&mut cx, |this, cx| {

---------

@@ -21,9 +21,7 @@ use fs::Fs;
 use futures::{
     channel::mpsc,
     future::{BoxFuture, LocalBoxFuture},
-    join,
-    stream::{self, BoxStream},
-    SinkExt, Stream, StreamExt,
+    join, SinkExt, Stream, StreamExt,
 };
 use gpui::{
     anchored, deferred, point, AnyElement, AppContext, ClickEvent, EventEmitter, FocusHandle,
@@ -32,7 +30,8 @@ use gpui::{
 };
 use language::{Buffer, IndentKind, Point, Selection, TransactionId};
 use language_model::{
-    LanguageModel, LanguageModelRegistry, LanguageModelRequest, LanguageModelRequestMessage, Role,
+    logging::report_assistant_event, LanguageModel, LanguageModelRegistry, LanguageModelRequest,
+    LanguageModelRequestMessage, LanguageModelTextStream, Role,
 };
 use multi_buffer::MultiBufferRow;
 use parking_lot::Mutex;
@@ -241,12 +240,13 @@ impl InlineAssistant {
         };
         codegen_ranges.push(start..end);

-        if let Some(telemetry) = self.telemetry.as_ref() {
-            if let Some(model) = LanguageModelRegistry::read_global(cx).active_model() {
+        if let Some(model) = LanguageModelRegistry::read_global(cx).active_model() {
+            if let Some(telemetry) = self.telemetry.as_ref() {
                 telemetry.report_assistant_event(AssistantEvent {
                     conversation_id: None,
                     kind: AssistantKind::Inline,
                     phase: AssistantPhase::Invoked,
+                    message_id: None,
                     model: model.telemetry_id(),
                     model_provider: model.provider_id().to_string(),
                     response_latency: None,
@@ -754,33 +754,6 @@ impl InlineAssistant {
     pub fn finish_assist(&mut self, assist_id: InlineAssistId, undo: bool, cx: &mut WindowContext) {
         if let Some(assist) = self.assists.get(&assist_id) {
-            if let Some(telemetry) = self.telemetry.as_ref() {
-                if let Some(model) = LanguageModelRegistry::read_global(cx).active_model() {
-                    let language_name = assist.editor.upgrade().and_then(|editor| {
-                        let multibuffer = editor.read(cx).buffer().read(cx);
-                        let ranges = multibuffer.range_to_buffer_ranges(assist.range.clone(), cx);
-                        ranges
-                            .first()
-                            .and_then(|(buffer, _, _)| buffer.read(cx).language())
-                            .map(|language| language.name())
-                    });
-                    telemetry.report_assistant_event(AssistantEvent {
-                        conversation_id: None,
-                        kind: AssistantKind::Inline,
-                        phase: if undo {
-                            AssistantPhase::Rejected
-                        } else {
-                            AssistantPhase::Accepted
-                        },
-                        model: model.telemetry_id(),
-                        model_provider: model.provider_id().to_string(),
-                        response_latency: None,
-                        error_message: None,
-                        language_name: language_name.map(|name| name.to_proto()),
-                    });
-                }
-            }
             let assist_group_id = assist.group_id;
             if self.assist_groups[&assist_group_id].linked {
                 for assist_id in self.unlink_assist_group(assist_group_id, cx) {
@@ -815,12 +788,45 @@ impl InlineAssistant {
             }
         }

+        let active_alternative = assist.codegen.read(cx).active_alternative().clone();
+        let message_id = active_alternative.read(cx).message_id.clone();
+
+        if let Some(model) = LanguageModelRegistry::read_global(cx).active_model() {
+            let language_name = assist.editor.upgrade().and_then(|editor| {
+                let multibuffer = editor.read(cx).buffer().read(cx);
+                let ranges = multibuffer.range_to_buffer_ranges(assist.range.clone(), cx);
+                ranges
+                    .first()
+                    .and_then(|(buffer, _, _)| buffer.read(cx).language())
+                    .map(|language| language.name())
+            });
+            report_assistant_event(
+                AssistantEvent {
+                    conversation_id: None,
+                    kind: AssistantKind::Inline,
+                    message_id,
+                    phase: if undo {
+                        AssistantPhase::Rejected
+                    } else {
+                        AssistantPhase::Accepted
+                    },
+                    model: model.telemetry_id(),
+                    model_provider: model.provider_id().to_string(),
+                    response_latency: None,
+                    error_message: None,
+                    language_name: language_name.map(|name| name.to_proto()),
+                },
+                self.telemetry.clone(),
+                cx.http_client(),
+                model.api_key(cx),
+                cx.background_executor(),
+            );
+        }
+
         if undo {
             assist.codegen.update(cx, |codegen, cx| codegen.undo(cx));
         } else {
-            let confirmed_alternative = assist.codegen.read(cx).active_alternative().clone();
-            self.confirmed_assists
-                .insert(assist_id, confirmed_alternative);
+            self.confirmed_assists.insert(assist_id, active_alternative);
         }
     }
 }
@@ -2497,6 +2503,7 @@ pub struct CodegenAlternative {
     line_operations: Vec<LineOperation>,
     request: Option<LanguageModelRequest>,
     elapsed_time: Option<f64>,
+    message_id: Option<String>,
 }

 enum CodegenStatus {
@@ -2555,6 +2562,7 @@ impl CodegenAlternative {
             buffer: buffer.clone(),
             old_buffer,
             edit_position: None,
+            message_id: None,
             snapshot,
             last_equal_ranges: Default::default(),
             transformation_transaction_id: None,
@@ -2659,20 +2667,20 @@ impl CodegenAlternative {
         self.edit_position = Some(self.range.start.bias_right(&self.snapshot));

+        let api_key = model.api_key(cx);
         let telemetry_id = model.telemetry_id();
         let provider_id = model.provider_id();
-        let chunks: LocalBoxFuture<Result<BoxStream<Result<String>>>> =
+        let stream: LocalBoxFuture<Result<LanguageModelTextStream>> =
             if user_prompt.trim().to_lowercase() == "delete" {
-                async { Ok(stream::empty().boxed()) }.boxed_local()
+                async { Ok(LanguageModelTextStream::default()) }.boxed_local()
             } else {
                 let request = self.build_request(user_prompt, assistant_panel_context, cx)?;
                 self.request = Some(request.clone());
-                let chunks = cx
-                    .spawn(|_, cx| async move { model.stream_completion_text(request, &cx).await });
-                async move { Ok(chunks.await?.boxed()) }.boxed_local()
+                cx.spawn(|_, cx| async move { model.stream_completion_text(request, &cx).await })
+                    .boxed_local()
             };
-        self.handle_stream(telemetry_id, provider_id.to_string(), chunks, cx);
+        self.handle_stream(telemetry_id, provider_id.to_string(), api_key, stream, cx);
         Ok(())
     }
@@ -2737,7 +2745,8 @@ impl CodegenAlternative {
         &mut self,
         model_telemetry_id: String,
         model_provider_id: String,
-        stream: impl 'static + Future<Output = Result<BoxStream<'static, Result<String>>>>,
+        model_api_key: Option<String>,
+        stream: impl 'static + Future<Output = Result<LanguageModelTextStream>>,
         cx: &mut ModelContext<Self>,
     ) {
         let start_time = Instant::now();
@@ -2767,6 +2776,7 @@ impl CodegenAlternative {
             }
         }

+        let http_client = cx.http_client().clone();
         let telemetry = self.telemetry.clone();
         let language_name = {
             let multibuffer = self.buffer.read(cx);
@@ -2782,15 +2792,21 @@ impl CodegenAlternative {
         let mut edit_start = self.range.start.to_offset(&snapshot);
         self.generation = cx.spawn(|codegen, mut cx| {
             async move {
-                let chunks = stream.await;
+                let stream = stream.await;
+                let message_id = stream
+                    .as_ref()
+                    .ok()
+                    .and_then(|stream| stream.message_id.clone());
                 let generate = async {
                     let (mut diff_tx, mut diff_rx) = mpsc::channel(1);
+                    let executor = cx.background_executor().clone();
+                    let message_id = message_id.clone();
                     let line_based_stream_diff: Task<anyhow::Result<()>> =
                         cx.background_executor().spawn(async move {
                             let mut response_latency = None;
                             let request_start = Instant::now();
                             let diff = async {
-                                let chunks = StripInvalidSpans::new(chunks?);
+                                let chunks = StripInvalidSpans::new(stream?.stream);
                                 futures::pin_mut!(chunks);
                                 let mut diff = StreamingDiff::new(selected_text.to_string());
                                 let mut line_diff = LineDiff::default();
@@ -2886,9 +2902,10 @@ impl CodegenAlternative {
                             let error_message =
                                 result.as_ref().err().map(|error| error.to_string());
-                            if let Some(telemetry) = telemetry {
-                                telemetry.report_assistant_event(AssistantEvent {
+                            report_assistant_event(
+                                AssistantEvent {
                                     conversation_id: None,
+                                    message_id,
                                     kind: AssistantKind::Inline,
                                     phase: AssistantPhase::Response,
                                     model: model_telemetry_id,
@@ -2896,8 +2913,12 @@ impl CodegenAlternative {
                                     response_latency,
                                     error_message,
                                     language_name: language_name.map(|name| name.to_proto()),
-                                });
-                            }
+                                },
+                                telemetry,
+                                http_client,
+                                model_api_key,
+                                &executor,
+                            );

                             result?;
                             Ok(())
@@ -2961,6 +2982,7 @@ impl CodegenAlternative {
                 codegen
                     .update(&mut cx, |this, cx| {
+                        this.message_id = message_id;
                         this.last_equal_ranges.clear();
                         if let Err(error) = result {
                             this.status = CodegenStatus::Error(error);
@@ -3512,15 +3534,7 @@ mod tests {
             )
         });

-        let (chunks_tx, chunks_rx) = mpsc::unbounded();
-        codegen.update(cx, |codegen, cx| {
-            codegen.handle_stream(
-                String::new(),
-                String::new(),
-                future::ready(Ok(chunks_rx.map(Ok).boxed())),
-                cx,
-            )
-        });
+        let chunks_tx = simulate_response_stream(codegen.clone(), cx);

         let mut new_text = concat!(
             "    let mut x = 0;\n",
@@ -3584,15 +3598,7 @@ mod tests {
             )
         });

-        let (chunks_tx, chunks_rx) = mpsc::unbounded();
-        codegen.update(cx, |codegen, cx| {
-            codegen.handle_stream(
-                String::new(),
-                String::new(),
-                future::ready(Ok(chunks_rx.map(Ok).boxed())),
-                cx,
-            )
-        });
+        let chunks_tx = simulate_response_stream(codegen.clone(), cx);

         cx.background_executor.run_until_parked();
@@ -3659,15 +3665,7 @@ mod tests {
             )
         });

-        let (chunks_tx, chunks_rx) = mpsc::unbounded();
-        codegen.update(cx, |codegen, cx| {
-            codegen.handle_stream(
-                String::new(),
-                String::new(),
-                future::ready(Ok(chunks_rx.map(Ok).boxed())),
-                cx,
-            )
-        });
+        let chunks_tx = simulate_response_stream(codegen.clone(), cx);

         cx.background_executor.run_until_parked();
@@ -3733,16 +3731,7 @@ mod tests {
             )
         });

-        let (chunks_tx, chunks_rx) = mpsc::unbounded();
-        codegen.update(cx, |codegen, cx| {
-            codegen.handle_stream(
-                String::new(),
-                String::new(),
-                future::ready(Ok(chunks_rx.map(Ok).boxed())),
-                cx,
-            )
-        });
+        let chunks_tx = simulate_response_stream(codegen.clone(), cx);

         let new_text = concat!(
             "func main() {\n",
             "\tx := 0\n",
@@ -3797,16 +3786,7 @@ mod tests {
             )
         });

-        let (chunks_tx, chunks_rx) = mpsc::unbounded();
-        codegen.update(cx, |codegen, cx| {
-            codegen.handle_stream(
-                String::new(),
-                String::new(),
-                future::ready(Ok(chunks_rx.map(Ok).boxed())),
-                cx,
-            )
-        });
+        let chunks_tx = simulate_response_stream(codegen.clone(), cx);

         chunks_tx
             .unbounded_send("let mut x = 0;\nx += 1;".to_string())
             .unwrap();
@@ -3880,6 +3860,26 @@ mod tests {
         }
     }

+    fn simulate_response_stream(
+        codegen: Model<CodegenAlternative>,
+        cx: &mut TestAppContext,
+    ) -> mpsc::UnboundedSender<String> {
+        let (chunks_tx, chunks_rx) = mpsc::unbounded();
+        codegen.update(cx, |codegen, cx| {
+            codegen.handle_stream(
+                String::new(),
+                String::new(),
+                None,
+                future::ready(Ok(LanguageModelTextStream {
+                    message_id: None,
+                    stream: chunks_rx.map(Ok).boxed(),
+                })),
+                cx,
+            );
+        });
+        chunks_tx
+    }
+
     fn rust_lang() -> Language {
         Language::new(
             LanguageConfig {
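
The new simulate_response_stream helper replaces the repeated channel-and-handle_stream setup in each test. A typical test now drives the fake model like this (a sketch following the pattern in the hunks above):

    // Returns the sender half; the codegen model consumes the receiver
    // as if it were a streaming completion.
    let chunks_tx = simulate_response_stream(codegen.clone(), cx);
    chunks_tx
        .unbounded_send("let mut x = 0;\nx += 1;".to_string())
        .unwrap();
    chunks_tx.close_channel(); // signal end-of-stream
    cx.background_executor.run_until_parked();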

---------

@@ -17,7 +17,8 @@ use gpui::{
 };
 use language::Buffer;
 use language_model::{
-    LanguageModelRegistry, LanguageModelRequest, LanguageModelRequestMessage, Role,
+    logging::report_assistant_event, LanguageModelRegistry, LanguageModelRequest,
+    LanguageModelRequestMessage, Role,
 };
 use settings::Settings;
 use std::{
@@ -306,6 +307,33 @@ impl TerminalInlineAssistant {
                 this.focus_handle(cx).focus(cx);
             })
             .log_err();

+        if let Some(model) = LanguageModelRegistry::read_global(cx).active_model() {
+            let codegen = assist.codegen.read(cx);
+            let executor = cx.background_executor().clone();
+            report_assistant_event(
+                AssistantEvent {
+                    conversation_id: None,
+                    kind: AssistantKind::InlineTerminal,
+                    message_id: codegen.message_id.clone(),
+                    phase: if undo {
+                        AssistantPhase::Rejected
+                    } else {
+                        AssistantPhase::Accepted
+                    },
+                    model: model.telemetry_id(),
+                    model_provider: model.provider_id().to_string(),
+                    response_latency: None,
+                    error_message: None,
+                    language_name: None,
+                },
+                codegen.telemetry.clone(),
+                cx.http_client(),
+                model.api_key(cx),
+                &executor,
+            );
+        }
+
         assist.codegen.update(cx, |codegen, cx| {
             if undo {
                 codegen.undo(cx);
@@ -1016,6 +1044,7 @@ pub struct Codegen {
     telemetry: Option<Arc<Telemetry>>,
     terminal: Model<Terminal>,
     generation: Task<()>,
+    message_id: Option<String>,
     transaction: Option<TerminalTransaction>,
 }

@@ -1026,6 +1055,7 @@ impl Codegen {
             telemetry,
             status: CodegenStatus::Idle,
             generation: Task::ready(()),
+            message_id: None,
             transaction: None,
         }
     }
@@ -1035,6 +1065,8 @@ impl Codegen {
             return;
         };

+        let model_api_key = model.api_key(cx);
+        let http_client = cx.http_client();
         let telemetry = self.telemetry.clone();
         self.status = CodegenStatus::Pending;
         self.transaction = Some(TerminalTransaction::start(self.terminal.clone()));
@@ -1043,44 +1075,62 @@ impl Codegen {
             let model_provider_id = model.provider_id();
             let response = model.stream_completion_text(prompt, &cx).await;
             let generate = async {
+                let message_id = response
+                    .as_ref()
+                    .ok()
+                    .and_then(|response| response.message_id.clone());
                 let (mut hunks_tx, mut hunks_rx) = mpsc::channel(1);
-                let task = cx.background_executor().spawn(async move {
-                    let mut response_latency = None;
-                    let request_start = Instant::now();
-                    let task = async {
-                        let mut chunks = response?;
-                        while let Some(chunk) = chunks.next().await {
-                            if response_latency.is_none() {
-                                response_latency = Some(request_start.elapsed());
-                            }
-                            let chunk = chunk?;
-                            hunks_tx.send(chunk).await?;
-                        }
-
-                        anyhow::Ok(())
-                    };
-                    let result = task.await;
-                    let error_message = result.as_ref().err().map(|error| error.to_string());
-                    if let Some(telemetry) = telemetry {
-                        telemetry.report_assistant_event(AssistantEvent {
-                            conversation_id: None,
-                            kind: AssistantKind::Inline,
-                            phase: AssistantPhase::Response,
-                            model: model_telemetry_id,
-                            model_provider: model_provider_id.to_string(),
-                            response_latency,
-                            error_message,
-                            language_name: None,
-                        });
-                    }
-                    result?;
-                    anyhow::Ok(())
-                });
+                let task = cx.background_executor().spawn({
+                    let message_id = message_id.clone();
+                    let executor = cx.background_executor().clone();
+                    async move {
+                        let mut response_latency = None;
+                        let request_start = Instant::now();
+                        let task = async {
+                            let mut chunks = response?.stream;
+                            while let Some(chunk) = chunks.next().await {
+                                if response_latency.is_none() {
+                                    response_latency = Some(request_start.elapsed());
+                                }
+                                let chunk = chunk?;
+                                hunks_tx.send(chunk).await?;
+                            }

+                            anyhow::Ok(())
+                        };
+                        let result = task.await;
+                        let error_message = result.as_ref().err().map(|error| error.to_string());
+                        report_assistant_event(
+                            AssistantEvent {
+                                conversation_id: None,
+                                kind: AssistantKind::InlineTerminal,
+                                message_id,
+                                phase: AssistantPhase::Response,
+                                model: model_telemetry_id,
+                                model_provider: model_provider_id.to_string(),
+                                response_latency,
+                                error_message,
+                                language_name: None,
+                            },
+                            telemetry,
+                            http_client,
+                            model_api_key,
+                            &executor,
+                        );
+                        result?;
+                        anyhow::Ok(())
+                    }
+                });

+                this.update(&mut cx, |this, _| {
+                    this.message_id = message_id;
+                })?;

                 while let Some(hunk) = hunks_rx.next().await {
                     this.update(&mut cx, |this, cx| {
                         if let Some(transaction) = &mut this.transaction {
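
One detail worth noting in the hunk above: the spawn({ ... }) block clones message_id before the async move, so the original binding stays available for the this.update(...) call afterwards. A self-contained sketch of the idiom (names and values are illustrative):

    use futures::executor::block_on;

    fn main() {
        let message_id = Some(String::from("msg_123")); // illustrative value

        let task = {
            // Clone inside a block expression; only the clone moves into the future.
            let message_id = message_id.clone();
            async move {
                println!("reporting event for {:?}", message_id);
            }
        };

        block_on(task);

        // The original binding is still usable after the task is created.
        assert_eq!(message_id.as_deref(), Some("msg_123"));
    }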

---------

@@ -341,6 +341,13 @@ impl Telemetry {
         .detach();
     }

+    pub fn metrics_enabled(self: &Arc<Self>) -> bool {
+        let state = self.state.lock();
+        let enabled = state.settings.metrics;
+        drop(state);
+        return enabled;
+    }
+
     pub fn set_authenticated_user_info(
         self: &Arc<Self>,
         metrics_id: Option<String>,

---------

@@ -46,6 +46,7 @@ serde_json.workspace = true
 settings.workspace = true
 smol.workspace = true
 strum.workspace = true
+telemetry_events.workspace = true
 theme.workspace = true
 thiserror.workspace = true
 tiktoken-rs.workspace = true

---------

@@ -1,3 +1,4 @@
+pub mod logging;
 mod model;
 pub mod provider;
 mod rate_limiter;
@@ -59,6 +60,7 @@ pub enum LanguageModelCompletionEvent {
     Stop(StopReason),
     Text(String),
     ToolUse(LanguageModelToolUse),
+    StartMessage { message_id: String },
 }

 #[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
@@ -76,6 +78,20 @@ pub struct LanguageModelToolUse {
     pub input: serde_json::Value,
 }

+pub struct LanguageModelTextStream {
+    pub message_id: Option<String>,
+    pub stream: BoxStream<'static, Result<String>>,
+}
+
+impl Default for LanguageModelTextStream {
+    fn default() -> Self {
+        Self {
+            message_id: None,
+            stream: Box::pin(futures::stream::empty()),
+        }
+    }
+}
+
 pub trait LanguageModel: Send + Sync {
     fn id(&self) -> LanguageModelId;
     fn name(&self) -> LanguageModelName;
@@ -87,6 +103,10 @@ pub trait LanguageModel: Send + Sync {
     fn provider_name(&self) -> LanguageModelProviderName;
     fn telemetry_id(&self) -> String;

+    fn api_key(&self, _cx: &AppContext) -> Option<String> {
+        None
+    }
+
     /// Returns the availability of this language model.
     fn availability(&self) -> LanguageModelAvailability {
         LanguageModelAvailability::Public
@@ -113,21 +133,39 @@ pub trait LanguageModel: Send + Sync {
         &self,
         request: LanguageModelRequest,
         cx: &AsyncAppContext,
-    ) -> BoxFuture<'static, Result<BoxStream<'static, Result<String>>>> {
+    ) -> BoxFuture<'static, Result<LanguageModelTextStream>> {
         let events = self.stream_completion(request, cx);

         async move {
-            Ok(events
-                .await?
-                .filter_map(|result| async move {
+            let mut events = events.await?;
+            let mut message_id = None;
+            let mut first_item_text = None;
+
+            if let Some(first_event) = events.next().await {
+                match first_event {
+                    Ok(LanguageModelCompletionEvent::StartMessage { message_id: id }) => {
+                        message_id = Some(id.clone());
+                    }
+                    Ok(LanguageModelCompletionEvent::Text(text)) => {
+                        first_item_text = Some(text);
+                    }
+                    _ => (),
+                }
+            }
+
+            let stream = futures::stream::iter(first_item_text.map(Ok))
+                .chain(events.filter_map(|result| async move {
                     match result {
+                        Ok(LanguageModelCompletionEvent::StartMessage { .. }) => None,
                         Ok(LanguageModelCompletionEvent::Text(text)) => Some(Ok(text)),
                         Ok(LanguageModelCompletionEvent::Stop(_)) => None,
                         Ok(LanguageModelCompletionEvent::ToolUse(_)) => None,
                         Err(err) => Some(Err(err)),
                     }
-                })
-                .boxed())
+                }))
+                .boxed();
+
+            Ok(LanguageModelTextStream { message_id, stream })
         }
         .boxed()
     }
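
A note on the reworked default stream_completion_text: it now awaits the first completion event before resolving, capturing the message_id from a leading StartMessage and re-queuing a leading Text event so no output is lost. Callers consume it roughly like this (a sketch assuming model, request, and an async cx in scope):

    use futures::StreamExt;

    let LanguageModelTextStream { message_id, mut stream } =
        model.stream_completion_text(request, &cx).await?;

    // Some providers (Anthropic, via StartMessage) supply a server-side ID.
    if let Some(id) = &message_id {
        println!("server message id: {id}");
    }

    while let Some(chunk) = stream.next().await {
        let text = chunk?; // each item is a Result<String>
        // ...append `text` to the output buffer...
    }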

---------

@@ -0,0 +1,90 @@
+use anthropic::{AnthropicError, ANTHROPIC_API_URL};
+use anyhow::{anyhow, Context, Result};
+use client::telemetry::Telemetry;
+use gpui::BackgroundExecutor;
+use http_client::{AsyncBody, HttpClient, Method, Request as HttpRequest};
+use std::env;
+use std::sync::Arc;
+use telemetry_events::{AssistantEvent, AssistantKind, AssistantPhase};
+use util::ResultExt;
+
+use crate::provider::anthropic::PROVIDER_ID as ANTHROPIC_PROVIDER_ID;
+
+pub fn report_assistant_event(
+    event: AssistantEvent,
+    telemetry: Option<Arc<Telemetry>>,
+    client: Arc<dyn HttpClient>,
+    model_api_key: Option<String>,
+    executor: &BackgroundExecutor,
+) {
+    if let Some(telemetry) = telemetry.as_ref() {
+        telemetry.report_assistant_event(event.clone());
+        if telemetry.metrics_enabled() && event.model_provider == ANTHROPIC_PROVIDER_ID {
+            executor
+                .spawn(async move {
+                    report_anthropic_event(event, client, model_api_key)
+                        .await
+                        .log_err();
+                })
+                .detach();
+        }
+    }
+}
+
+async fn report_anthropic_event(
+    event: AssistantEvent,
+    client: Arc<dyn HttpClient>,
+    model_api_key: Option<String>,
+) -> Result<(), AnthropicError> {
+    let api_key = match model_api_key {
+        Some(key) => key,
+        None => {
+            return Err(AnthropicError::Other(anyhow!(
+                "Anthropic API key is not set"
+            )));
+        }
+    };
+
+    let uri = format!("{ANTHROPIC_API_URL}/v1/log/zed");
+    let request_builder = HttpRequest::builder()
+        .method(Method::POST)
+        .uri(uri)
+        .header("X-Api-Key", api_key)
+        .header("Content-Type", "application/json");
+
+    let serialized_event: serde_json::Value = serde_json::json!({
+        "completion_type": match event.kind {
+            AssistantKind::Inline => "natural_language_completion_in_editor",
+            AssistantKind::InlineTerminal => "natural_language_completion_in_terminal",
+            AssistantKind::Panel => "conversation_message",
+        },
+        "event": match event.phase {
+            AssistantPhase::Response => "response",
+            AssistantPhase::Invoked => "invoke",
+            AssistantPhase::Accepted => "accept",
+            AssistantPhase::Rejected => "reject",
+        },
+        "metadata": {
+            "language_name": event.language_name,
+            "message_id": event.message_id,
+            "platform": env::consts::OS,
+        }
+    });
+
+    let request = request_builder
+        .body(AsyncBody::from(serialized_event.to_string()))
+        .context("failed to construct request body")?;
+
+    let response = client
+        .send(request)
+        .await
+        .context("failed to send request to Anthropic")?;
+
+    if response.status().is_success() {
+        return Ok(());
+    }
+
+    return Err(AnthropicError::Other(anyhow!(
+        "Failed to log: {}",
+        response.status(),
+    )));
+}
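
For reference, the json! block above produces a payload shaped like the following for a panel response on macOS (field values illustrative):

    {
      "completion_type": "conversation_message",
      "event": "response",
      "metadata": {
        "language_name": "Rust",
        "message_id": "msg_014p7gG3wDgGV9EUtLvnow3U",
        "platform": "macos"
      }
    }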

---------

@@ -26,7 +26,7 @@ use theme::ThemeSettings;
 use ui::{prelude::*, Icon, IconName, Tooltip};
 use util::{maybe, ResultExt};

-const PROVIDER_ID: &str = "anthropic";
+pub const PROVIDER_ID: &str = "anthropic";
 const PROVIDER_NAME: &str = "Anthropic";

 #[derive(Default, Clone, Debug, PartialEq)]
@@ -356,6 +356,10 @@ impl LanguageModel for AnthropicModel {
         format!("anthropic/{}", self.model.id())
     }

+    fn api_key(&self, cx: &AppContext) -> Option<String> {
+        self.state.read(cx).api_key.clone()
+    }
+
     fn max_token_count(&self) -> usize {
         self.model.max_token_count()
     }
@@ -520,6 +524,14 @@ pub fn map_to_language_model_completion_events(
                 ));
             }
         }
+        Event::MessageStart { message } => {
+            return Some((
+                Some(Ok(LanguageModelCompletionEvent::StartMessage {
+                    message_id: message.id,
+                })),
+                state,
+            ))
+        }
         Event::MessageDelta { delta, .. } => {
             if let Some(stop_reason) = delta.stop_reason.as_deref() {
                 let stop_reason = match stop_reason {
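
The new Event::MessageStart arm is what feeds StartMessage: Anthropic's streaming API opens each response with a message_start event carrying the server-generated message ID, roughly like this (abridged, values illustrative):

    {
      "type": "message_start",
      "message": {
        "id": "msg_014p7gG3wDgGV9EUtLvnow3U",
        "type": "message",
        "role": "assistant",
        "model": "claude-3-5-sonnet-20241022"
      }
    }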

---------

@@ -47,6 +47,7 @@ pub struct EventWrapper {
 pub enum AssistantKind {
     Panel,
     Inline,
+    InlineTerminal,
 }

 impl Display for AssistantKind {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
@@ -56,6 +57,7 @@ impl Display for AssistantKind {
             match self {
                 Self::Panel => "panel",
                 Self::Inline => "inline",
+                Self::InlineTerminal => "inline_terminal",
             }
         )
     }
@@ -140,6 +142,8 @@ pub struct CallEvent {
 pub struct AssistantEvent {
     /// Unique random identifier for each assistant tab (None for inline assist)
     pub conversation_id: Option<String>,
+    /// Server-generated message ID (only supported for some providers)
+    pub message_id: Option<String>,
     /// The kind of assistant (Panel, Inline)
     pub kind: AssistantKind,
     #[serde(default)]