Send events to Snowflake in the format they're expected by Amplitude (#20765)
This will allow us to use the events table directly in Amplitude, which lets us use the newer event ingestion flow that detects changes to the table. Otherwise we'll need a transformation. I think Amplitude's API is probably a pretty good example to follow for the raw event schema, even if we don't end up using their product. They also recommend a "Noun Verbed" format for naming events, so I think we should go with this. This will help us be consistent and encourage the author of events to think more clearly about what event they're reporting. cc @ConradIrwin Release Notes: - N/A
This commit is contained in:
parent
97e9137cb7
commit
f9990b42fa
1 changed files with 182 additions and 96 deletions
|
@ -15,6 +15,7 @@ use chrono::Duration;
|
|||
use rpc::ExtensionMetadata;
|
||||
use semantic_version::SemanticVersion;
|
||||
use serde::{Deserialize, Serialize, Serializer};
|
||||
use serde_json::json;
|
||||
use sha2::{Digest, Sha256};
|
||||
use std::sync::{Arc, OnceLock};
|
||||
use telemetry_events::{
|
||||
|
@ -1392,111 +1393,196 @@ fn for_snowflake(
|
|||
body: EventRequestBody,
|
||||
first_event_at: chrono::DateTime<chrono::Utc>,
|
||||
) -> impl Iterator<Item = SnowflakeRow> {
|
||||
body.events.into_iter().map(move |event| SnowflakeRow {
|
||||
event: match &event.event {
|
||||
Event::Editor(editor_event) => format!("editor_{}", editor_event.operation),
|
||||
Event::InlineCompletion(inline_completion_event) => format!(
|
||||
"inline_completion_{}",
|
||||
if inline_completion_event.suggestion_accepted {
|
||||
"accept "
|
||||
} else {
|
||||
"discard"
|
||||
}
|
||||
body.events.into_iter().map(move |event| {
|
||||
let timestamp =
|
||||
first_event_at + Duration::milliseconds(event.milliseconds_since_first_event);
|
||||
let (event_type, mut event_properties) = match &event.event {
|
||||
Event::Editor(e) => (
|
||||
match e.operation.as_str() {
|
||||
"open" => "Editor Opened".to_string(),
|
||||
"save" => "Editor Saved".to_string(),
|
||||
_ => format!("Unknown Editor Event: {}", e.operation),
|
||||
},
|
||||
serde_json::to_value(e).unwrap(),
|
||||
),
|
||||
Event::Call(call_event) => format!("call_{}", call_event.operation.replace(" ", "_")),
|
||||
Event::Assistant(assistant_event) => {
|
||||
Event::InlineCompletion(e) => (
|
||||
format!(
|
||||
"assistant_{}",
|
||||
match assistant_event.phase {
|
||||
telemetry_events::AssistantPhase::Response => "response",
|
||||
telemetry_events::AssistantPhase::Invoked => "invoke",
|
||||
telemetry_events::AssistantPhase::Accepted => "accept",
|
||||
telemetry_events::AssistantPhase::Rejected => "reject",
|
||||
"Inline Completion {}",
|
||||
if e.suggestion_accepted {
|
||||
"Accepted"
|
||||
} else {
|
||||
"Discarded"
|
||||
}
|
||||
)
|
||||
),
|
||||
serde_json::to_value(e).unwrap(),
|
||||
),
|
||||
Event::Call(e) => {
|
||||
let event_type = match e.operation.trim() {
|
||||
"unshare project" => "Project Unshared".to_string(),
|
||||
"open channel notes" => "Channel Notes Opened".to_string(),
|
||||
"share project" => "Project Shared".to_string(),
|
||||
"join channel" => "Channel Joined".to_string(),
|
||||
"hang up" => "Call Ended".to_string(),
|
||||
"accept incoming" => "Incoming Call Accepted".to_string(),
|
||||
"invite" => "Participant Invited".to_string(),
|
||||
"disable microphone" => "Microphone Disabled".to_string(),
|
||||
"enable microphone" => "Microphone Enabled".to_string(),
|
||||
"enable screen share" => "Screen Share Enabled".to_string(),
|
||||
"disable screen share" => "Screen Share Disabled".to_string(),
|
||||
"decline incoming" => "Incoming Call Declined".to_string(),
|
||||
"enable camera" => "Camera Enabled".to_string(),
|
||||
"disable camera" => "Camera Disabled".to_string(),
|
||||
_ => format!("Unknown Call Event: {}", e.operation),
|
||||
};
|
||||
|
||||
(event_type, serde_json::to_value(e).unwrap())
|
||||
}
|
||||
Event::Cpu(_) => "system_cpu".to_string(),
|
||||
Event::Memory(_) => "system_memory".to_string(),
|
||||
Event::App(app_event) => app_event.operation.replace(" ", "_"),
|
||||
Event::Setting(_) => "setting_change".to_string(),
|
||||
Event::Extension(_) => "extension_load".to_string(),
|
||||
Event::Edit(_) => "edit".to_string(),
|
||||
Event::Action(_) => "command_palette_action".to_string(),
|
||||
Event::Repl(_) => "repl".to_string(),
|
||||
},
|
||||
system_id: body.system_id.clone(),
|
||||
timestamp: first_event_at + Duration::milliseconds(event.milliseconds_since_first_event),
|
||||
data: SnowflakeData {
|
||||
installation_id: body.installation_id.clone(),
|
||||
session_id: body.session_id.clone(),
|
||||
metrics_id: body.metrics_id.clone(),
|
||||
is_staff: body.is_staff,
|
||||
app_version: body.app_version.clone(),
|
||||
os_name: body.os_name.clone(),
|
||||
os_version: body.os_version.clone(),
|
||||
architecture: body.architecture.clone(),
|
||||
release_channel: body.release_channel.clone(),
|
||||
signed_in: event.signed_in,
|
||||
editor_event: match &event.event {
|
||||
Event::Editor(editor_event) => Some(editor_event.clone()),
|
||||
_ => None,
|
||||
},
|
||||
inline_completion_event: match &event.event {
|
||||
Event::InlineCompletion(inline_completion_event) => {
|
||||
Some(inline_completion_event.clone())
|
||||
}
|
||||
_ => None,
|
||||
},
|
||||
call_event: match &event.event {
|
||||
Event::Call(call_event) => Some(call_event.clone()),
|
||||
_ => None,
|
||||
},
|
||||
assistant_event: match &event.event {
|
||||
Event::Assistant(assistant_event) => Some(assistant_event.clone()),
|
||||
_ => None,
|
||||
},
|
||||
cpu_event: match &event.event {
|
||||
Event::Cpu(cpu_event) => Some(cpu_event.clone()),
|
||||
_ => None,
|
||||
},
|
||||
memory_event: match &event.event {
|
||||
Event::Memory(memory_event) => Some(memory_event.clone()),
|
||||
_ => None,
|
||||
},
|
||||
app_event: match &event.event {
|
||||
Event::App(app_event) => Some(app_event.clone()),
|
||||
_ => None,
|
||||
},
|
||||
setting_event: match &event.event {
|
||||
Event::Setting(setting_event) => Some(setting_event.clone()),
|
||||
_ => None,
|
||||
},
|
||||
extension_event: match &event.event {
|
||||
Event::Extension(extension_event) => Some(extension_event.clone()),
|
||||
_ => None,
|
||||
},
|
||||
edit_event: match &event.event {
|
||||
Event::Edit(edit_event) => Some(edit_event.clone()),
|
||||
_ => None,
|
||||
},
|
||||
repl_event: match &event.event {
|
||||
Event::Repl(repl_event) => Some(repl_event.clone()),
|
||||
_ => None,
|
||||
},
|
||||
action_event: match event.event {
|
||||
Event::Action(action_event) => Some(action_event.clone()),
|
||||
_ => None,
|
||||
},
|
||||
},
|
||||
Event::Assistant(e) => (
|
||||
match e.phase {
|
||||
telemetry_events::AssistantPhase::Response => "Assistant Responded".to_string(),
|
||||
telemetry_events::AssistantPhase::Invoked => "Assistant Invoked".to_string(),
|
||||
telemetry_events::AssistantPhase::Accepted => {
|
||||
"Assistant Response Accepted".to_string()
|
||||
}
|
||||
telemetry_events::AssistantPhase::Rejected => {
|
||||
"Assistant Response Rejected".to_string()
|
||||
}
|
||||
},
|
||||
serde_json::to_value(e).unwrap(),
|
||||
),
|
||||
Event::Cpu(e) => (
|
||||
"System CPU Sampled".to_string(),
|
||||
serde_json::to_value(e).unwrap(),
|
||||
),
|
||||
Event::Memory(e) => (
|
||||
"System Memory Sampled".to_string(),
|
||||
serde_json::to_value(e).unwrap(),
|
||||
),
|
||||
Event::App(e) => {
|
||||
let mut properties = json!({});
|
||||
let event_type = match e.operation.trim() {
|
||||
"extensions: install extension" => "Extension Installed".to_string(),
|
||||
"open" => "App Opened".to_string(),
|
||||
"project search: open" => "Project Search Opened".to_string(),
|
||||
"first open" => {
|
||||
properties["is_first_open"] = json!(true);
|
||||
"App First Opened".to_string()
|
||||
}
|
||||
"extensions: uninstall extension" => "Extension Uninstalled".to_string(),
|
||||
"welcome page: close" => "Welcome Page Closed".to_string(),
|
||||
"open project" => {
|
||||
properties["is_first_time"] = json!(false);
|
||||
"Project Opened".to_string()
|
||||
}
|
||||
"welcome page: install cli" => "CLI Installed".to_string(),
|
||||
"project diagnostics: open" => "Project Diagnostics Opened".to_string(),
|
||||
"extensions page: open" => "Extensions Page Opened".to_string(),
|
||||
"welcome page: change theme" => "Welcome Theme Changed".to_string(),
|
||||
"welcome page: toggle metric telemetry" => {
|
||||
properties["enabled"] = json!(false);
|
||||
"Welcome Telemetry Toggled".to_string()
|
||||
}
|
||||
"welcome page: change keymap" => "Keymap Changed".to_string(),
|
||||
"welcome page: toggle vim" => {
|
||||
properties["enabled"] = json!(false);
|
||||
"Welcome Vim Mode Toggled".to_string()
|
||||
}
|
||||
"welcome page: sign in to copilot" => "Welcome Copilot Signed In".to_string(),
|
||||
"welcome page: toggle diagnostic telemetry" => {
|
||||
"Welcome Telemetry Toggled".to_string()
|
||||
}
|
||||
"welcome page: open" => "Welcome Page Opened".to_string(),
|
||||
"close" => "App Closed".to_string(),
|
||||
"markdown preview: open" => "Markdown Preview Opened".to_string(),
|
||||
"welcome page: open extensions" => "Extensions Page Opened".to_string(),
|
||||
"open node project" | "open pnpm project" | "open yarn project" => {
|
||||
properties["project_type"] = json!("node");
|
||||
properties["is_first_time"] = json!(false);
|
||||
"Project Opened".to_string()
|
||||
}
|
||||
"repl sessions: open" => "REPL Session Started".to_string(),
|
||||
"welcome page: toggle helix" => {
|
||||
properties["enabled"] = json!(false);
|
||||
"Helix Mode Toggled".to_string()
|
||||
}
|
||||
"welcome page: edit settings" => {
|
||||
properties["changed_settings"] = json!([]);
|
||||
"Settings Edited".to_string()
|
||||
}
|
||||
"welcome page: view docs" => "Documentation Viewed".to_string(),
|
||||
"open ssh project" => {
|
||||
properties["is_first_time"] = json!(false);
|
||||
"SSH Project Opened".to_string()
|
||||
}
|
||||
"create ssh server" => "SSH Server Created".to_string(),
|
||||
"create ssh project" => "SSH Project Created".to_string(),
|
||||
"first open for release channel" => {
|
||||
properties["is_first_for_channel"] = json!(true);
|
||||
"App First Opened For Release Channel".to_string()
|
||||
}
|
||||
_ => format!("Unknown App Event: {}", e.operation),
|
||||
};
|
||||
(event_type, properties)
|
||||
}
|
||||
Event::Setting(e) => (
|
||||
"Settings Changed".to_string(),
|
||||
serde_json::to_value(e).unwrap(),
|
||||
),
|
||||
Event::Extension(e) => (
|
||||
"Extension Loaded".to_string(),
|
||||
serde_json::to_value(e).unwrap(),
|
||||
),
|
||||
Event::Edit(e) => (
|
||||
"Editor Edited".to_string(),
|
||||
serde_json::to_value(e).unwrap(),
|
||||
),
|
||||
Event::Action(e) => (
|
||||
"Action Invoked".to_string(),
|
||||
serde_json::to_value(e).unwrap(),
|
||||
),
|
||||
Event::Repl(e) => (
|
||||
"Kernel Status Changed".to_string(),
|
||||
serde_json::to_value(e).unwrap(),
|
||||
),
|
||||
};
|
||||
|
||||
if let serde_json::Value::Object(ref mut map) = event_properties {
|
||||
map.insert("app_version".to_string(), body.app_version.clone().into());
|
||||
map.insert("os_name".to_string(), body.os_name.clone().into());
|
||||
map.insert("os_version".to_string(), body.os_version.clone().into());
|
||||
map.insert("architecture".to_string(), body.architecture.clone().into());
|
||||
map.insert(
|
||||
"release_channel".to_string(),
|
||||
body.release_channel.clone().into(),
|
||||
);
|
||||
map.insert("signed_in".to_string(), event.signed_in.into());
|
||||
}
|
||||
|
||||
let user_properties = Some(serde_json::json!({
|
||||
"is_staff": body.is_staff,
|
||||
}));
|
||||
|
||||
SnowflakeRow {
|
||||
time: timestamp,
|
||||
user_id: body.metrics_id.clone(),
|
||||
device_id: body.system_id.clone(),
|
||||
event_type,
|
||||
event_properties,
|
||||
user_properties,
|
||||
insert_id: Some(Uuid::new_v4().to_string()),
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
struct SnowflakeRow {
|
||||
pub event: String,
|
||||
pub system_id: Option<String>,
|
||||
pub timestamp: chrono::DateTime<chrono::Utc>,
|
||||
pub data: SnowflakeData,
|
||||
pub time: chrono::DateTime<chrono::Utc>,
|
||||
pub user_id: Option<String>,
|
||||
pub device_id: Option<String>,
|
||||
pub event_type: String,
|
||||
pub event_properties: serde_json::Value,
|
||||
pub user_properties: Option<serde_json::Value>,
|
||||
pub insert_id: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue