use super::ips_file::IpsFile;
use crate::api::CloudflareIpCountryHeader;
use crate::{api::slack, AppState, Error, Result};
use anyhow::anyhow;
use aws_sdk_s3::primitives::ByteStream;
use axum::{
    body::Bytes,
    headers::Header,
    http::{HeaderMap, HeaderName, StatusCode},
    routing::post,
    Extension, Router, TypedHeader,
};
use chrono::Duration;
use semantic_version::SemanticVersion;
use serde::{Deserialize, Serialize};
use serde_json::json;
use sha2::{Digest, Sha256};
use std::sync::{Arc, OnceLock};
use telemetry_events::{Event, EventRequestBody, Panic};
use util::ResultExt;
use uuid::Uuid;

const CRASH_REPORTS_BUCKET: &str = "zed-crash-reports";

pub fn router() -> Router {
    Router::new()
        .route("/telemetry/events", post(post_events))
        .route("/telemetry/crashes", post(post_crash))
        .route("/telemetry/panics", post(post_panic))
        .route("/telemetry/hangs", post(post_hang))
}

pub struct ZedChecksumHeader(Vec<u8>);

impl Header for ZedChecksumHeader {
    fn name() -> &'static HeaderName {
        static ZED_CHECKSUM_HEADER: OnceLock<HeaderName> = OnceLock::new();
        ZED_CHECKSUM_HEADER.get_or_init(|| HeaderName::from_static("x-zed-checksum"))
    }

    fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
    where
        Self: Sized,
        I: Iterator<Item = &'i axum::http::HeaderValue>,
    {
        let checksum = values
            .next()
            .ok_or_else(axum::headers::Error::invalid)?
            .to_str()
            .map_err(|_| axum::headers::Error::invalid())?;

        let bytes = hex::decode(checksum).map_err(|_| axum::headers::Error::invalid())?;
        Ok(Self(bytes))
    }

    fn encode<E: Extend<axum::http::HeaderValue>>(&self, _values: &mut E) {
        unimplemented!()
    }
}

/// Accepts macOS `.ips` crash reports from the client, archives them in the
/// crash-reports bucket, and (when a webhook is configured) notifies Slack.
pub async fn post_crash(
    Extension(app): Extension<Arc<AppState>>,
    headers: HeaderMap,
    body: Bytes,
) -> Result<()> {
    let report = IpsFile::parse(&body)?;
    let version_threshold = SemanticVersion::new(0, 123, 0);

    let bundle_id = &report.header.bundle_id;
    let app_version = &report.app_version();

    // Ignore crashes from dev builds and from versions older than the threshold.
    if bundle_id == "dev.zed.Zed-Dev" {
        log::error!("Crash uploads from {} are ignored.", bundle_id);
        return Ok(());
    }

    if app_version.is_none() || app_version.unwrap() < version_threshold {
        log::error!(
            "Crash uploads from {} are ignored.",
            report.header.app_version
        );
        return Ok(());
    }
    let app_version = app_version.unwrap();

    if let Some(blob_store_client) = app.blob_store_client.as_ref() {
        let response = blob_store_client
            .head_object()
            .bucket(CRASH_REPORTS_BUCKET)
            .key(report.header.incident_id.clone() + ".ips")
            .send()
            .await;

        if response.is_ok() {
            log::info!("We've already uploaded this crash");
            return Ok(());
        }

        blob_store_client
            .put_object()
            .bucket(CRASH_REPORTS_BUCKET)
            .key(report.header.incident_id.clone() + ".ips")
            .acl(aws_sdk_s3::types::ObjectCannedAcl::PublicRead)
            .body(ByteStream::from(body.to_vec()))
            .send()
            .await
            .map_err(|e| log::error!("Failed to upload crash: {}", e))
            .ok();
    }

    let recent_panic_on: Option<i64> = headers
        .get("x-zed-panicked-on")
        .and_then(|h| h.to_str().ok())
        .and_then(|s| s.parse().ok());

    let installation_id = headers
        .get("x-zed-installation-id")
        .and_then(|h| h.to_str().ok())
        .map(|s| s.to_string())
        .unwrap_or_default();

    let mut recent_panic = None;

    if let Some(recent_panic_on) = recent_panic_on {
        let crashed_at = match report.timestamp() {
            Ok(t) => Some(t),
            Err(e) => {
                log::error!("Can't parse {}: {}", report.header.timestamp, e);
                None
            }
        };
        // Only associate a panic with this crash if it happened within 30 seconds.
        if crashed_at.is_some_and(|t| (t.timestamp_millis() - recent_panic_on).abs() <= 30000) {
            recent_panic = headers.get("x-zed-panic").and_then(|h| h.to_str().ok());
        }
    }

    let description = report.description(recent_panic);
    let summary = report.backtrace_summary();

    tracing::error!(
        service = "client",
        version = %report.header.app_version,
        os_version = %report.header.os_version,
        bundle_id = %report.header.bundle_id,
        incident_id = %report.header.incident_id,
        installation_id = %installation_id,
        description = %description,
        backtrace = %summary,
        "crash report"
    );

    if let Some(slack_panics_webhook) = app.config.slack_panics_webhook.clone() {
        let payload = slack::WebhookBody::new(|w| {
            w.add_section(|s| s.text(slack::Text::markdown(description)))
                .add_section(|s| {
                    s.add_field(slack::Text::markdown(format!(
                        "*Version:*\n{} ({})",
                        bundle_id, app_version
                    )))
                    .add_field({
                        let hostname = app.config.blob_store_url.clone().unwrap_or_default();
                        let hostname = hostname.strip_prefix("https://").unwrap_or_else(|| {
                            hostname.strip_prefix("http://").unwrap_or_default()
                        });

                        slack::Text::markdown(format!(
                            "*Incident:*\n<https://{}.{}/{}.ips|{}…>",
                            CRASH_REPORTS_BUCKET,
                            hostname,
                            report.header.incident_id,
                            report
                                .header
                                .incident_id
                                .chars()
                                .take(8)
                                .collect::<String>(),
                        ))
                    })
                })
                .add_rich_text(|r| r.add_preformatted(|p| p.add_text(summary)))
        });
        let payload_json = serde_json::to_string(&payload).map_err(|err| {
            log::error!("Failed to serialize payload to JSON: {err}");
            Error::Internal(anyhow!(err))
        })?;

        reqwest::Client::new()
            .post(slack_panics_webhook)
            .header("Content-Type", "application/json")
            .body(payload_json)
            .send()
            .await
            .map_err(|err| {
                log::error!("Failed to send payload to Slack: {err}");
                Error::Internal(anyhow!(err))
            })?;
    }

    Ok(())
}
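/// Accepts a hang report from the client. The JSON body is authenticated
/// against the `x-zed-checksum` header, archived in the crash-reports bucket
/// under a fresh incident id, and logged with a condensed backtrace.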
pub async fn post_hang(
    Extension(app): Extension<Arc<AppState>>,
    TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
    body: Bytes,
) -> Result<()> {
    let Some(expected) = calculate_json_checksum(app.clone(), &body) else {
        return Err(Error::http(
            StatusCode::INTERNAL_SERVER_ERROR,
            "events not enabled".into(),
        ))?;
    };

    if checksum != expected {
        return Err(Error::http(
            StatusCode::BAD_REQUEST,
            "invalid checksum".into(),
        ))?;
    }

    let incident_id = Uuid::new_v4().to_string();

    // Dump the JSON into S3 so we can get frame offsets if we need to.
    if let Some(blob_store_client) = app.blob_store_client.as_ref() {
        blob_store_client
            .put_object()
            .bucket(CRASH_REPORTS_BUCKET)
            .key(incident_id.clone() + ".hang.json")
            .acl(aws_sdk_s3::types::ObjectCannedAcl::PublicRead)
            .body(ByteStream::from(body.to_vec()))
            .send()
            .await
            .map_err(|e| log::error!("Failed to upload crash: {}", e))
            .ok();
    }

    let report: telemetry_events::HangReport = serde_json::from_slice(&body).map_err(|err| {
        log::error!("can't parse report json: {err}");
        Error::Internal(anyhow!(err))
    })?;

    let mut backtrace = "Possible hang detected on main thread:".to_string();
    let unknown = "<unknown>".to_string();
    for frame in report.backtrace.iter() {
        backtrace.push_str(&format!("\n{}", frame.symbols.first().unwrap_or(&unknown)));
    }

    tracing::error!(
        service = "client",
        version = %report.app_version.unwrap_or_default().to_string(),
        os_name = %report.os_name,
        os_version = %report.os_version.unwrap_or_default().to_string(),
        incident_id = %incident_id,
        installation_id = %report.installation_id.unwrap_or_default(),
        backtrace = %backtrace,
        "hang report"
    );

    Ok(())
}

/// Accepts a panic report from the client, archives it in the crash-reports
/// bucket, and forwards a summary to Slack unless `report_to_slack` filters it out.
pub async fn post_panic(
    Extension(app): Extension<Arc<AppState>>,
    TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
    body: Bytes,
) -> Result<()> {
    let Some(expected) = calculate_json_checksum(app.clone(), &body) else {
        return Err(Error::http(
            StatusCode::INTERNAL_SERVER_ERROR,
            "events not enabled".into(),
        ))?;
    };

    if checksum != expected {
        return Err(Error::http(
            StatusCode::BAD_REQUEST,
            "invalid checksum".into(),
        ))?;
    }

    let report: telemetry_events::PanicRequest = serde_json::from_slice(&body)
        .map_err(|_| Error::http(StatusCode::BAD_REQUEST, "invalid json".into()))?;
    let incident_id = uuid::Uuid::new_v4().to_string();

    let panic = report.panic;

    if panic.os_name == "Linux" && panic.os_version == Some("1.0.0".to_string()) {
        return Err(Error::http(
            StatusCode::BAD_REQUEST,
            "invalid os version".into(),
        ))?;
    }

    if let Some(blob_store_client) = app.blob_store_client.as_ref() {
        let response = blob_store_client
            .head_object()
            .bucket(CRASH_REPORTS_BUCKET)
            .key(incident_id.clone() + ".json")
            .send()
            .await;

        if response.is_ok() {
            log::info!("We've already uploaded this crash");
            return Ok(());
        }

        blob_store_client
            .put_object()
            .bucket(CRASH_REPORTS_BUCKET)
            .key(incident_id.clone() + ".json")
            .acl(aws_sdk_s3::types::ObjectCannedAcl::PublicRead)
            .body(ByteStream::from(body.to_vec()))
            .send()
            .await
            .map_err(|e| log::error!("Failed to upload crash: {}", e))
            .ok();
    }

    tracing::error!(
        service = "client",
        version = %panic.app_version,
        os_name = %panic.os_name,
        os_version = %panic.os_version.clone().unwrap_or_default(),
        incident_id = %incident_id,
        installation_id = %panic.installation_id.clone().unwrap_or_default(),
        description = %panic.payload,
        backtrace = %panic.backtrace.join("\n"),
        "panic report"
    );

    // Truncate very long backtraces before sending them to Slack.
    let backtrace = if panic.backtrace.len() > 25 {
        let total = panic.backtrace.len();
        format!(
            "{}\n and {} more",
            panic
                .backtrace
                .iter()
                .take(20)
                .cloned()
                .collect::<Vec<_>>()
                .join("\n"),
            total - 20
        )
    } else {
        panic.backtrace.join("\n")
    };

    if !report_to_slack(&panic) {
        return Ok(());
    }

    let backtrace_with_summary = panic.payload + "\n" + &backtrace;

    if let Some(slack_panics_webhook) = app.config.slack_panics_webhook.clone() {
        let payload = slack::WebhookBody::new(|w| {
            w.add_section(|s| s.text(slack::Text::markdown("Panic request".to_string())))
                .add_section(|s| {
                    s.add_field(slack::Text::markdown(format!(
                        "*Version:*\n {} ",
                        panic.app_version
                    )))
                    .add_field({
                        let hostname = app.config.blob_store_url.clone().unwrap_or_default();
                        let hostname = hostname.strip_prefix("https://").unwrap_or_else(|| {
                            hostname.strip_prefix("http://").unwrap_or_default()
                        });

                        slack::Text::markdown(format!(
                            "*{} {}:*\n<https://{}.{}/{}.json|{}…>",
                            panic.os_name,
                            panic.os_version.unwrap_or_default(),
                            CRASH_REPORTS_BUCKET,
                            hostname,
                            incident_id,
                            incident_id.chars().take(8).collect::<String>(),
                        ))
                    })
                })
                .add_rich_text(|r| r.add_preformatted(|p| p.add_text(backtrace_with_summary)))
        });
        let payload_json = serde_json::to_string(&payload).map_err(|err| {
            log::error!("Failed to serialize payload to JSON: {err}");
            Error::Internal(anyhow!(err))
        })?;

        reqwest::Client::new()
            .post(slack_panics_webhook)
            .header("Content-Type", "application/json")
            .body(payload_json)
            .send()
            .await
            .map_err(|err| {
                log::error!("Failed to send payload to Slack: {err}");
                Error::Internal(anyhow!(err))
            })?;
    }

    Ok(())
}

fn report_to_slack(panic: &Panic) -> bool {
    // Panics on macOS should make their way to Slack as a crash report,
    // so we don't need to send them a second time via this channel.
    if panic.os_name == "macOS" {
        return false;
    }

    if panic.payload.contains("ERROR_SURFACE_LOST_KHR") {
        return false;
    }

    if panic.payload.contains("ERROR_INITIALIZATION_FAILED") {
        return false;
    }

    if panic
        .payload
        .contains("GPU has crashed, and no debug information is available")
    {
        return false;
    }

    true
}

/// Accepts a batch of telemetry events and forwards them to the configured
/// Kinesis stream. A checksum mismatch is recorded on each row rather than
/// causing the request to be rejected.
pub async fn post_events(
    Extension(app): Extension<Arc<AppState>>,
    TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
    country_code_header: Option<TypedHeader<CloudflareIpCountryHeader>>,
    body: Bytes,
) -> Result<()> {
    let Some(expected) = calculate_json_checksum(app.clone(), &body) else {
        return Err(Error::http(
            StatusCode::INTERNAL_SERVER_ERROR,
            "events not enabled".into(),
        ))?;
    };

    let checksum_matched = checksum == expected;

    let request_body: telemetry_events::EventRequestBody =
        serde_json::from_slice(&body).map_err(|err| {
            log::error!("can't parse event json: {err}");
            Error::Internal(anyhow!(err))
        })?;

    let Some(last_event) = request_body.events.last() else {
        return Err(Error::http(StatusCode::BAD_REQUEST, "no events".into()))?;
    };

    let country_code = country_code_header.map(|h| h.to_string());

    let first_event_at = chrono::Utc::now()
        - chrono::Duration::milliseconds(last_event.milliseconds_since_first_event);

    if let Some(kinesis_client) = app.kinesis_client.clone() {
        if let Some(stream) = app.config.kinesis_stream.clone() {
            let mut request = kinesis_client.put_records().stream_name(stream);
            for row in for_snowflake(
                request_body.clone(),
                first_event_at,
                country_code.clone(),
                checksum_matched,
            ) {
                if let Some(data) = serde_json::to_vec(&row).log_err() {
                    request = request.records(
                        aws_sdk_kinesis::types::PutRecordsRequestEntry::builder()
                            .partition_key(request_body.system_id.clone().unwrap_or_default())
                            .data(data.into())
                            .build()
                            .unwrap(),
                    );
                }
            }
            request.send().await.log_err();
        }
    }

    Ok(())
}

/// Computes the checksum the client is expected to send in `x-zed-checksum`:
/// a SHA-256 over the request body wrapped on both sides with the shared
/// checksum seed. Returns `None` when no seed is configured.
pub fn calculate_json_checksum(app: Arc<AppState>, json: &impl AsRef<[u8]>) -> Option<Vec<u8>> {
    let checksum_seed = app.config.zed_client_checksum_seed.as_ref()?;

    let mut summer = Sha256::new();
    summer.update(checksum_seed);
    summer.update(json);
    summer.update(checksum_seed);
    Some(summer.finalize().into_iter().collect())
}

/// Flattens an `EventRequestBody` into Amplitude-style rows, mapping each
/// client event to a human-readable event type and merging request-level
/// metadata into the event properties. CPU and memory events are skipped.
fn for_snowflake(
    body: EventRequestBody,
    first_event_at: chrono::DateTime<chrono::Utc>,
    country_code: Option<String>,
    checksum_matched: bool,
) -> impl Iterator<Item = SnowflakeRow> {
    body.events.into_iter().flat_map(move |event| {
        let timestamp =
            first_event_at + Duration::milliseconds(event.milliseconds_since_first_event);
        let (event_type, mut event_properties) = match &event.event {
            Event::Editor(e) => (
                match e.operation.as_str() {
                    "open" => "Editor Opened".to_string(),
                    "save" => "Editor Saved".to_string(),
                    _ => format!("Unknown Editor Event: {}", e.operation),
                },
                serde_json::to_value(e).unwrap(),
            ),
            Event::InlineCompletion(e) => (
                format!(
                    "Inline Completion {}",
                    if e.suggestion_accepted {
                        "Accepted"
                    } else {
                        "Discarded"
                    }
                ),
                serde_json::to_value(e).unwrap(),
            ),
            Event::InlineCompletionRating(e) => (
                "Inline Completion Rated".to_string(),
                serde_json::to_value(e).unwrap(),
            ),
            Event::Call(e) => {
                let event_type = match e.operation.trim() {
                    "unshare project" => "Project Unshared".to_string(),
                    "open channel notes" => "Channel Notes Opened".to_string(),
                    "share project" => "Project Shared".to_string(),
                    "join channel" => "Channel Joined".to_string(),
                    "hang up" => "Call Ended".to_string(),
                    "accept incoming" => "Incoming Call Accepted".to_string(),
                    "invite" => "Participant Invited".to_string(),
                    "disable microphone" => "Microphone Disabled".to_string(),
                    "enable microphone" => "Microphone Enabled".to_string(),
                    "enable screen share" => "Screen Share Enabled".to_string(),
                    "disable screen share" => "Screen Share Disabled".to_string(),
                    "decline incoming" => "Incoming Call Declined".to_string(),
                    _ => format!("Unknown Call Event: {}", e.operation),
                };

                (event_type, serde_json::to_value(e).unwrap())
            }
            Event::Assistant(e) => (
                match e.phase {
                    telemetry_events::AssistantPhase::Response => {
                        "Assistant Responded".to_string()
                    }
                    telemetry_events::AssistantPhase::Invoked => "Assistant Invoked".to_string(),
                    telemetry_events::AssistantPhase::Accepted => {
                        "Assistant Response Accepted".to_string()
                    }
                    telemetry_events::AssistantPhase::Rejected => {
                        "Assistant Response Rejected".to_string()
                    }
                },
                serde_json::to_value(e).unwrap(),
            ),
            Event::Cpu(_) | Event::Memory(_) => return None,
            Event::App(e) => {
                let mut properties = json!({});
                let event_type = match e.operation.trim() {
                    // App
                    "open" => "App Opened".to_string(),
                    "first open" => "App First Opened".to_string(),
                    "first open for release channel" => {
                        "App First Opened For Release Channel".to_string()
                    }
                    "close" => "App Closed".to_string(),
                    // Project
                    "open project" => "Project Opened".to_string(),
                    "open node project" => {
                        properties["project_type"] = json!("node");
                        "Project Opened".to_string()
                    }
                    "open pnpm project" => {
                        properties["project_type"] = json!("pnpm");
                        "Project Opened".to_string()
                    }
                    "open yarn project" => {
                        properties["project_type"] = json!("yarn");
                        "Project Opened".to_string()
                    }
                    // SSH
                    "create ssh server" => "SSH Server Created".to_string(),
                    "create ssh project" => "SSH Project Created".to_string(),
                    "open ssh project" => "SSH Project Opened".to_string(),
                    // Welcome Page
                    "welcome page: change keymap" => "Welcome Keymap Changed".to_string(),
                    "welcome page: change theme" => "Welcome Theme Changed".to_string(),
                    "welcome page: close" => "Welcome Page Closed".to_string(),
                    "welcome page: edit settings" => "Welcome Settings Edited".to_string(),
                    "welcome page: install cli" => "Welcome CLI Installed".to_string(),
                    "welcome page: open" => "Welcome Page Opened".to_string(),
                    "welcome page: open extensions" => {
                        "Welcome Extensions Page Opened".to_string()
                    }
                    "welcome page: sign in to copilot" => "Welcome Copilot Signed In".to_string(),
                    "welcome page: toggle diagnostic telemetry" => {
                        "Welcome Diagnostic Telemetry Toggled".to_string()
                    }
                    "welcome page: toggle metric telemetry" => {
                        "Welcome Metric Telemetry Toggled".to_string()
                    }
                    "welcome page: toggle vim" => "Welcome Vim Mode Toggled".to_string(),
                    "welcome page: view docs" => "Welcome Documentation Viewed".to_string(),
                    // Extensions
                    "extensions page: open" => "Extensions Page Opened".to_string(),
                    "extensions: install extension" => "Extension Installed".to_string(),
                    "extensions: uninstall extension" => "Extension Uninstalled".to_string(),
                    // Misc
                    "markdown preview: open" => "Markdown Preview Opened".to_string(),
                    "project diagnostics: open" => "Project Diagnostics Opened".to_string(),
                    "project search: open" => "Project Search Opened".to_string(),
                    "repl sessions: open" => "REPL Session Started".to_string(),
                    // Feature Upsell
                    "feature upsell: toggle vim" => {
                        properties["source"] = json!("Feature Upsell");
                        "Vim Mode Toggled".to_string()
                    }
                    _ => e
                        .operation
                        .strip_prefix("feature upsell: viewed docs (")
                        .and_then(|s| s.strip_suffix(')'))
                        .map_or_else(
                            || format!("Unknown App Event: {}", e.operation),
                            |docs_url| {
                                properties["url"] = json!(docs_url);
                                properties["source"] = json!("Feature Upsell");
                                "Documentation Viewed".to_string()
                            },
                        ),
                };
                (event_type, properties)
            }
            Event::Setting(e) => (
                "Settings Changed".to_string(),
                serde_json::to_value(e).unwrap(),
            ),
            Event::Extension(e) => (
                "Extension Loaded".to_string(),
                serde_json::to_value(e).unwrap(),
            ),
            Event::Edit(e) => (
                "Editor Edited".to_string(),
                serde_json::to_value(e).unwrap(),
            ),
            Event::Action(e) => (
                "Action Invoked".to_string(),
                serde_json::to_value(e).unwrap(),
            ),
            Event::Repl(e) => (
                "Kernel Status Changed".to_string(),
                serde_json::to_value(e).unwrap(),
            ),
            Event::Flexible(e) => (
                e.event_type.clone(),
                serde_json::to_value(&e.event_properties).unwrap(),
            ),
        };

        if let serde_json::Value::Object(ref mut map) = event_properties {
            map.insert("app_version".to_string(), body.app_version.clone().into());
            map.insert("os_name".to_string(), body.os_name.clone().into());
            map.insert("os_version".to_string(), body.os_version.clone().into());
            map.insert("architecture".to_string(), body.architecture.clone().into());
            map.insert(
                "release_channel".to_string(),
                body.release_channel.clone().into(),
            );
            map.insert("signed_in".to_string(), event.signed_in.into());
            map.insert("checksum_matched".to_string(), checksum_matched.into());
            if let Some(country_code) = country_code.as_ref() {
                map.insert("country".to_string(), country_code.clone().into());
            }
        }

        // NOTE: most amplitude user properties are read out of our event_properties
        // dictionary. See https://app.amplitude.com/data/zed/Zed/sources/detail/production/falcon%3A159998
        // for how that is configured.
        let user_properties = Some(serde_json::json!({
            "is_staff": body.is_staff,
        }));

        Some(SnowflakeRow {
            time: timestamp,
            user_id: body.metrics_id.clone(),
            device_id: body.system_id.clone(),
            event_type,
            event_properties,
            user_properties,
            insert_id: Some(Uuid::new_v4().to_string()),
        })
    })
}
#[derive(Serialize, Deserialize, Debug)]
pub struct SnowflakeRow {
    pub time: chrono::DateTime<chrono::Utc>,
    pub user_id: Option<String>,
    pub device_id: Option<String>,
    pub event_type: String,
    pub event_properties: serde_json::Value,
    pub user_properties: Option<serde_json::Value>,
    pub insert_id: Option<String>,
}

impl SnowflakeRow {
    pub fn new(
        event_type: impl Into<String>,
        metrics_id: Uuid,
        is_staff: bool,
        system_id: Option<String>,
        event_properties: serde_json::Value,
    ) -> Self {
        Self {
            time: chrono::Utc::now(),
            event_type: event_type.into(),
            device_id: system_id,
            user_id: Some(metrics_id.to_string()),
            insert_id: Some(uuid::Uuid::new_v4().to_string()),
            event_properties,
            user_properties: Some(json!({"is_staff": is_staff})),
        }
    }

    pub async fn write(
        self,
        client: &Option<aws_sdk_kinesis::Client>,
        stream: &Option<String>,
    ) -> anyhow::Result<()> {
        // Silently skip writing when Kinesis isn't configured.
        let Some((client, stream)) = client.as_ref().zip(stream.as_ref()) else {
            return Ok(());
        };
        let row = serde_json::to_vec(&self)?;
        client
            .put_record()
            .stream_name(stream)
            .partition_key(&self.user_id.unwrap_or_default())
            .data(row.into())
            .send()
            .await?;
        Ok(())
    }
}
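
// A minimal sketch of a unit test for `SnowflakeRow::new`: it checks that the
// constructor wires the metrics/system identifiers and the `is_staff` flag into
// the row. It relies only on the types defined above; the event name and
// property values are illustrative placeholders.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn snowflake_row_new_populates_ids_and_user_properties() {
        let metrics_id = Uuid::new_v4();
        let row = SnowflakeRow::new(
            "Editor Opened",
            metrics_id,
            true,
            Some("some-system-id".to_string()),
            json!({ "file_extension": "rs" }),
        );

        assert_eq!(row.event_type, "Editor Opened");
        assert_eq!(row.user_id, Some(metrics_id.to_string()));
        assert_eq!(row.device_id, Some("some-system-id".to_string()));
        assert_eq!(row.user_properties, Some(json!({ "is_staff": true })));
        assert!(row.insert_id.is_some());
    }
}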