From f27dc7dec76190862d9c5e2a78371f25c471db54 Mon Sep 17 00:00:00 2001 From: Marshall Bowers Date: Tue, 5 Aug 2025 18:07:18 -0400 Subject: [PATCH] collab: Remove usage meters sync (#35674) This PR removes the usage meters sync from Collab, as it has been moved to Cloud. Release Notes: - N/A --- crates/collab/src/api/billing.rs | 209 +------------------------------ crates/collab/src/main.rs | 26 +--- 2 files changed, 6 insertions(+), 229 deletions(-) diff --git a/crates/collab/src/api/billing.rs b/crates/collab/src/api/billing.rs index 0e15308ffe..808713ff38 100644 --- a/crates/collab/src/api/billing.rs +++ b/crates/collab/src/api/billing.rs @@ -1,31 +1,23 @@ use anyhow::{Context as _, bail}; use chrono::{DateTime, Utc}; -use cloud_llm_client::LanguageModelProvider; -use collections::{HashMap, HashSet}; use sea_orm::ActiveValue; use std::{sync::Arc, time::Duration}; use stripe::{CancellationDetailsReason, EventObject, EventType, ListEvents, SubscriptionStatus}; -use util::{ResultExt, maybe}; +use util::ResultExt; use crate::AppState; use crate::db::billing_subscription::{ StripeCancellationReason, StripeSubscriptionStatus, SubscriptionKind, }; -use crate::llm::db::subscription_usage_meter::{self, CompletionMode}; +use crate::db::{ + CreateBillingCustomerParams, CreateBillingSubscriptionParams, CreateProcessedStripeEventParams, + UpdateBillingCustomerParams, UpdateBillingSubscriptionParams, billing_customer, +}; use crate::rpc::{ResultExt as _, Server}; use crate::stripe_client::{ StripeCancellationDetailsReason, StripeClient, StripeCustomerId, StripeSubscription, StripeSubscriptionId, }; -use crate::{db::UserId, llm::db::LlmDatabase}; -use crate::{ - db::{ - CreateBillingCustomerParams, CreateBillingSubscriptionParams, - CreateProcessedStripeEventParams, UpdateBillingCustomerParams, - UpdateBillingSubscriptionParams, billing_customer, - }, - stripe_billing::StripeBilling, -}; /// The amount of time we wait in between each poll of Stripe events. /// @@ -542,194 +534,3 @@ pub async fn find_or_create_billing_customer( Ok(Some(billing_customer)) } - -const SYNC_LLM_REQUEST_USAGE_WITH_STRIPE_INTERVAL: Duration = Duration::from_secs(60); - -pub fn sync_llm_request_usage_with_stripe_periodically(app: Arc) { - let Some(stripe_billing) = app.stripe_billing.clone() else { - log::warn!("failed to retrieve Stripe billing object"); - return; - }; - let Some(llm_db) = app.llm_db.clone() else { - log::warn!("failed to retrieve LLM database"); - return; - }; - - let executor = app.executor.clone(); - executor.spawn_detached({ - let executor = executor.clone(); - async move { - loop { - sync_model_request_usage_with_stripe(&app, &llm_db, &stripe_billing) - .await - .context("failed to sync LLM request usage to Stripe") - .trace_err(); - executor - .sleep(SYNC_LLM_REQUEST_USAGE_WITH_STRIPE_INTERVAL) - .await; - } - } - }); -} - -async fn sync_model_request_usage_with_stripe( - app: &Arc, - llm_db: &Arc, - stripe_billing: &Arc, -) -> anyhow::Result<()> { - let feature_flags = app.db.list_feature_flags().await?; - let sync_model_request_usage_using_cloud = feature_flags - .iter() - .any(|flag| flag.flag == "cloud-stripe-usage-meters-sync" && flag.enabled_for_all); - if sync_model_request_usage_using_cloud { - return Ok(()); - } - - log::info!("Stripe usage sync: Starting"); - let started_at = Utc::now(); - - let staff_users = app.db.get_staff_users().await?; - let staff_user_ids = staff_users - .iter() - .map(|user| user.id) - .collect::>(); - - let usage_meters = llm_db - .get_current_subscription_usage_meters(Utc::now()) - .await?; - let mut usage_meters_by_user_id = - HashMap::>::default(); - for (usage_meter, usage) in usage_meters { - let meters = usage_meters_by_user_id.entry(usage.user_id).or_default(); - meters.push(usage_meter); - } - - log::info!("Stripe usage sync: Retrieving Zed Pro subscriptions"); - let get_zed_pro_subscriptions_started_at = Utc::now(); - let billing_subscriptions = app.db.get_active_zed_pro_billing_subscriptions().await?; - log::info!( - "Stripe usage sync: Retrieved {} Zed Pro subscriptions in {}", - billing_subscriptions.len(), - Utc::now() - get_zed_pro_subscriptions_started_at - ); - - let claude_sonnet_4 = stripe_billing - .find_price_by_lookup_key("claude-sonnet-4-requests") - .await?; - let claude_sonnet_4_max = stripe_billing - .find_price_by_lookup_key("claude-sonnet-4-requests-max") - .await?; - let claude_opus_4 = stripe_billing - .find_price_by_lookup_key("claude-opus-4-requests") - .await?; - let claude_opus_4_max = stripe_billing - .find_price_by_lookup_key("claude-opus-4-requests-max") - .await?; - let claude_3_5_sonnet = stripe_billing - .find_price_by_lookup_key("claude-3-5-sonnet-requests") - .await?; - let claude_3_7_sonnet = stripe_billing - .find_price_by_lookup_key("claude-3-7-sonnet-requests") - .await?; - let claude_3_7_sonnet_max = stripe_billing - .find_price_by_lookup_key("claude-3-7-sonnet-requests-max") - .await?; - - let model_mode_combinations = [ - ("claude-opus-4", CompletionMode::Max), - ("claude-opus-4", CompletionMode::Normal), - ("claude-sonnet-4", CompletionMode::Max), - ("claude-sonnet-4", CompletionMode::Normal), - ("claude-3-7-sonnet", CompletionMode::Max), - ("claude-3-7-sonnet", CompletionMode::Normal), - ("claude-3-5-sonnet", CompletionMode::Normal), - ]; - - let billing_subscription_count = billing_subscriptions.len(); - - log::info!("Stripe usage sync: Syncing {billing_subscription_count} Zed Pro subscriptions"); - - for (user_id, (billing_customer, billing_subscription)) in billing_subscriptions { - maybe!(async { - if staff_user_ids.contains(&user_id) { - return anyhow::Ok(()); - } - - let stripe_customer_id = - StripeCustomerId(billing_customer.stripe_customer_id.clone().into()); - let stripe_subscription_id = - StripeSubscriptionId(billing_subscription.stripe_subscription_id.clone().into()); - - let usage_meters = usage_meters_by_user_id.get(&user_id); - - for (model, mode) in &model_mode_combinations { - let Ok(model) = - llm_db.model(LanguageModelProvider::Anthropic, model) - else { - log::warn!("Failed to load model for user {user_id}: {model}"); - continue; - }; - - let (price, meter_event_name) = match model.name.as_str() { - "claude-opus-4" => match mode { - CompletionMode::Normal => (&claude_opus_4, "claude_opus_4/requests"), - CompletionMode::Max => (&claude_opus_4_max, "claude_opus_4/requests/max"), - }, - "claude-sonnet-4" => match mode { - CompletionMode::Normal => (&claude_sonnet_4, "claude_sonnet_4/requests"), - CompletionMode::Max => { - (&claude_sonnet_4_max, "claude_sonnet_4/requests/max") - } - }, - "claude-3-5-sonnet" => (&claude_3_5_sonnet, "claude_3_5_sonnet/requests"), - "claude-3-7-sonnet" => match mode { - CompletionMode::Normal => { - (&claude_3_7_sonnet, "claude_3_7_sonnet/requests") - } - CompletionMode::Max => { - (&claude_3_7_sonnet_max, "claude_3_7_sonnet/requests/max") - } - }, - model_name => { - bail!("Attempted to sync usage meter for unsupported model: {model_name:?}") - } - }; - - let model_requests = usage_meters - .and_then(|usage_meters| { - usage_meters - .iter() - .find(|meter| meter.model_id == model.id && meter.mode == *mode) - }) - .map(|usage_meter| usage_meter.requests) - .unwrap_or(0); - - if model_requests > 0 { - stripe_billing - .subscribe_to_price(&stripe_subscription_id, price) - .await?; - } - - stripe_billing - .bill_model_request_usage(&stripe_customer_id, meter_event_name, model_requests) - .await - .with_context(|| { - format!( - "Failed to bill model request usage of {model_requests} for {stripe_customer_id}: {meter_event_name}", - ) - })?; - } - - Ok(()) - }) - .await - .log_err(); - } - - log::info!( - "Stripe usage sync: Synced {billing_subscription_count} Zed Pro subscriptions in {}", - Utc::now() - started_at - ); - - Ok(()) -} diff --git a/crates/collab/src/main.rs b/crates/collab/src/main.rs index 6a78049b3f..0442a69042 100644 --- a/crates/collab/src/main.rs +++ b/crates/collab/src/main.rs @@ -8,7 +8,6 @@ use axum::{ }; use collab::api::CloudflareIpCountryHeader; -use collab::api::billing::sync_llm_request_usage_with_stripe_periodically; use collab::llm::db::LlmDatabase; use collab::migrations::run_database_migrations; use collab::user_backfiller::spawn_user_backfiller; @@ -31,7 +30,7 @@ use tower_http::trace::TraceLayer; use tracing_subscriber::{ Layer, filter::EnvFilter, fmt::format::JsonFields, util::SubscriberInitExt, }; -use util::{ResultExt as _, maybe}; +use util::ResultExt as _; const VERSION: &str = env!("CARGO_PKG_VERSION"); const REVISION: Option<&'static str> = option_env!("GITHUB_SHA"); @@ -133,29 +132,6 @@ async fn main() -> Result<()> { fetch_extensions_from_blob_store_periodically(state.clone()); spawn_user_backfiller(state.clone()); - let llm_db = maybe!(async { - let database_url = state - .config - .llm_database_url - .as_ref() - .context("missing LLM_DATABASE_URL")?; - let max_connections = state - .config - .llm_database_max_connections - .context("missing LLM_DATABASE_MAX_CONNECTIONS")?; - - let mut db_options = db::ConnectOptions::new(database_url); - db_options.max_connections(max_connections); - LlmDatabase::new(db_options, state.executor.clone()).await - }) - .await - .trace_err(); - - if let Some(mut llm_db) = llm_db { - llm_db.initialize().await?; - sync_llm_request_usage_with_stripe_periodically(state.clone()); - } - app = app .merge(collab::api::events::router()) .merge(collab::api::extensions::router())