collab: Backfill github_user_created_at
on users (#16600)
This PR adds a backfiller to backfill the `github_user_created_at` column on users. Release Notes: - N/A
This commit is contained in:
parent
28568429aa
commit
8a5fcc2c22
6 changed files with 153 additions and 0 deletions
|
@ -216,6 +216,11 @@ spec:
|
||||||
secretKeyRef:
|
secretKeyRef:
|
||||||
name: supermaven
|
name: supermaven
|
||||||
key: api_key
|
key: api_key
|
||||||
|
- name: USER_BACKFILLER_GITHUB_ACCESS_TOKEN
|
||||||
|
valueFrom:
|
||||||
|
secretKeyRef:
|
||||||
|
name: user-backfiller
|
||||||
|
key: github_access_token
|
||||||
- name: INVITE_LINK_PREFIX
|
- name: INVITE_LINK_PREFIX
|
||||||
value: ${INVITE_LINK_PREFIX}
|
value: ${INVITE_LINK_PREFIX}
|
||||||
- name: RUST_BACKTRACE
|
- name: RUST_BACKTRACE
|
||||||
|
|
|
@ -377,4 +377,14 @@ impl Database {
|
||||||
})
|
})
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn get_users_missing_github_user_created_at(&self) -> Result<Vec<user::Model>> {
|
||||||
|
self.transaction(|tx| async move {
|
||||||
|
Ok(user::Entity::find()
|
||||||
|
.filter(user::Column::GithubUserCreatedAt.is_null())
|
||||||
|
.all(&*tx)
|
||||||
|
.await?)
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -9,6 +9,7 @@ pub mod migrations;
|
||||||
mod rate_limiter;
|
mod rate_limiter;
|
||||||
pub mod rpc;
|
pub mod rpc;
|
||||||
pub mod seed;
|
pub mod seed;
|
||||||
|
pub mod user_backfiller;
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests;
|
mod tests;
|
||||||
|
@ -177,6 +178,7 @@ pub struct Config {
|
||||||
pub stripe_api_key: Option<String>,
|
pub stripe_api_key: Option<String>,
|
||||||
pub stripe_price_id: Option<Arc<str>>,
|
pub stripe_price_id: Option<Arc<str>>,
|
||||||
pub supermaven_admin_api_key: Option<Arc<str>>,
|
pub supermaven_admin_api_key: Option<Arc<str>>,
|
||||||
|
pub user_backfiller_github_access_token: Option<Arc<str>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Config {
|
impl Config {
|
||||||
|
@ -235,6 +237,7 @@ impl Config {
|
||||||
supermaven_admin_api_key: None,
|
supermaven_admin_api_key: None,
|
||||||
qwen2_7b_api_key: None,
|
qwen2_7b_api_key: None,
|
||||||
qwen2_7b_api_url: None,
|
qwen2_7b_api_url: None,
|
||||||
|
user_backfiller_github_access_token: None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -7,6 +7,7 @@ use axum::{
|
||||||
};
|
};
|
||||||
use collab::llm::{db::LlmDatabase, log_usage_periodically};
|
use collab::llm::{db::LlmDatabase, log_usage_periodically};
|
||||||
use collab::migrations::run_database_migrations;
|
use collab::migrations::run_database_migrations;
|
||||||
|
use collab::user_backfiller::spawn_user_backfiller;
|
||||||
use collab::{api::billing::poll_stripe_events_periodically, llm::LlmState, ServiceMode};
|
use collab::{api::billing::poll_stripe_events_periodically, llm::LlmState, ServiceMode};
|
||||||
use collab::{
|
use collab::{
|
||||||
api::fetch_extensions_from_blob_store_periodically, db, env, executor::Executor,
|
api::fetch_extensions_from_blob_store_periodically, db, env, executor::Executor,
|
||||||
|
@ -131,6 +132,7 @@ async fn main() -> Result<()> {
|
||||||
if mode.is_api() {
|
if mode.is_api() {
|
||||||
poll_stripe_events_periodically(state.clone());
|
poll_stripe_events_periodically(state.clone());
|
||||||
fetch_extensions_from_blob_store_periodically(state.clone());
|
fetch_extensions_from_blob_store_periodically(state.clone());
|
||||||
|
spawn_user_backfiller(state.clone());
|
||||||
|
|
||||||
app = app
|
app = app
|
||||||
.merge(collab::api::events::router())
|
.merge(collab::api::events::router())
|
||||||
|
|
|
@ -682,6 +682,7 @@ impl TestServer {
|
||||||
supermaven_admin_api_key: None,
|
supermaven_admin_api_key: None,
|
||||||
qwen2_7b_api_key: None,
|
qwen2_7b_api_key: None,
|
||||||
qwen2_7b_api_url: None,
|
qwen2_7b_api_url: None,
|
||||||
|
user_backfiller_github_access_token: None,
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
132
crates/collab/src/user_backfiller.rs
Normal file
132
crates/collab/src/user_backfiller.rs
Normal file
|
@ -0,0 +1,132 @@
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use anyhow::{anyhow, Context, Result};
|
||||||
|
use util::ResultExt;
|
||||||
|
|
||||||
|
use crate::db::Database;
|
||||||
|
use crate::executor::Executor;
|
||||||
|
use crate::{AppState, Config};
|
||||||
|
|
||||||
|
pub fn spawn_user_backfiller(app_state: Arc<AppState>) {
|
||||||
|
let Some(user_backfiller_github_access_token) =
|
||||||
|
app_state.config.user_backfiller_github_access_token.clone()
|
||||||
|
else {
|
||||||
|
log::info!("no USER_BACKFILLER_GITHUB_ACCESS_TOKEN set; not spawning user backfiller");
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
|
||||||
|
let executor = app_state.executor.clone();
|
||||||
|
executor.spawn_detached({
|
||||||
|
let executor = executor.clone();
|
||||||
|
async move {
|
||||||
|
let user_backfiller = UserBackfiller::new(
|
||||||
|
app_state.config.clone(),
|
||||||
|
user_backfiller_github_access_token,
|
||||||
|
app_state.db.clone(),
|
||||||
|
executor,
|
||||||
|
);
|
||||||
|
|
||||||
|
log::info!("backfilling users");
|
||||||
|
|
||||||
|
user_backfiller
|
||||||
|
.backfill_github_user_created_at()
|
||||||
|
.await
|
||||||
|
.log_err();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
struct UserBackfiller {
|
||||||
|
config: Config,
|
||||||
|
github_access_token: Arc<str>,
|
||||||
|
db: Arc<Database>,
|
||||||
|
http_client: reqwest::Client,
|
||||||
|
executor: Executor,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl UserBackfiller {
|
||||||
|
fn new(
|
||||||
|
config: Config,
|
||||||
|
github_access_token: Arc<str>,
|
||||||
|
db: Arc<Database>,
|
||||||
|
executor: Executor,
|
||||||
|
) -> Self {
|
||||||
|
Self {
|
||||||
|
config,
|
||||||
|
github_access_token,
|
||||||
|
db,
|
||||||
|
http_client: reqwest::Client::new(),
|
||||||
|
executor,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn backfill_github_user_created_at(&self) -> Result<()> {
|
||||||
|
let initial_channel_id = self.config.auto_join_channel_id;
|
||||||
|
|
||||||
|
let users_missing_github_user_created_at =
|
||||||
|
self.db.get_users_missing_github_user_created_at().await?;
|
||||||
|
|
||||||
|
for user in users_missing_github_user_created_at {
|
||||||
|
match self
|
||||||
|
.fetch_github_user(&format!(
|
||||||
|
"https://api.github.com/users/{}",
|
||||||
|
user.github_login
|
||||||
|
))
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(github_user) => {
|
||||||
|
self.db
|
||||||
|
.get_or_create_user_by_github_account(
|
||||||
|
&user.github_login,
|
||||||
|
Some(github_user.id),
|
||||||
|
user.email_address.as_deref(),
|
||||||
|
Some(github_user.created_at),
|
||||||
|
initial_channel_id,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
log::info!("backfilled user: {}", user.github_login);
|
||||||
|
}
|
||||||
|
Err(err) => {
|
||||||
|
log::error!("failed to fetch GitHub user {}: {err}", user.github_login);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
self.executor
|
||||||
|
.sleep(std::time::Duration::from_millis(200))
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn fetch_github_user(&self, url: &str) -> Result<GithubUser> {
|
||||||
|
let response = self
|
||||||
|
.http_client
|
||||||
|
.get(url)
|
||||||
|
.header(
|
||||||
|
"authorization",
|
||||||
|
format!("Bearer {}", self.github_access_token),
|
||||||
|
)
|
||||||
|
.header("user-agent", "zed")
|
||||||
|
.send()
|
||||||
|
.await
|
||||||
|
.with_context(|| format!("failed to fetch '{url}'"))?;
|
||||||
|
|
||||||
|
let response = match response.error_for_status() {
|
||||||
|
Ok(response) => response,
|
||||||
|
Err(err) => return Err(anyhow!("failed to fetch GitHub user: {err}")),
|
||||||
|
};
|
||||||
|
|
||||||
|
response
|
||||||
|
.json()
|
||||||
|
.await
|
||||||
|
.with_context(|| format!("failed to deserialize GitHub user from '{url}'"))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(serde::Deserialize)]
|
||||||
|
struct GithubUser {
|
||||||
|
id: i32,
|
||||||
|
created_at: chrono::DateTime<chrono::Utc>,
|
||||||
|
}
|
Loading…
Add table
Add a link
Reference in a new issue