From 35280f7d80ca126ee366e3ed27dd2fd7ccaab317 Mon Sep 17 00:00:00 2001 From: Max Brunsfeld Date: Thu, 16 Mar 2023 12:58:09 -0700 Subject: [PATCH] Introduce a delay before retrying a transaction after a serialization failure Co-authored-by: Antonio Scandurra --- crates/collab/src/bin/seed.rs | 4 +- crates/collab/src/db.rs | 131 ++++++++++++++++++---------------- crates/collab/src/lib.rs | 3 +- crates/collab/src/main.rs | 2 +- 4 files changed, 74 insertions(+), 66 deletions(-) diff --git a/crates/collab/src/bin/seed.rs b/crates/collab/src/bin/seed.rs index 6153c459b0..9384e826c0 100644 --- a/crates/collab/src/bin/seed.rs +++ b/crates/collab/src/bin/seed.rs @@ -1,4 +1,4 @@ -use collab::db; +use collab::{db, executor::Executor}; use db::{ConnectOptions, Database}; use serde::{de::DeserializeOwned, Deserialize}; use std::fmt::Write; @@ -13,7 +13,7 @@ struct GitHubUser { #[tokio::main] async fn main() { let database_url = std::env::var("DATABASE_URL").expect("missing DATABASE_URL env var"); - let db = Database::new(ConnectOptions::new(database_url)) + let db = Database::new(ConnectOptions::new(database_url), Executor::Production) .await .expect("failed to connect to postgres database"); let github_token = std::env::var("GITHUB_TOKEN").expect("missing GITHUB_TOKEN env var"); diff --git a/crates/collab/src/db.rs b/crates/collab/src/db.rs index 3a711cbe29..9abbf96219 100644 --- a/crates/collab/src/db.rs +++ b/crates/collab/src/db.rs @@ -15,6 +15,7 @@ mod worktree; mod worktree_diagnostic_summary; mod worktree_entry; +use crate::executor::Executor; use crate::{Error, Result}; use anyhow::anyhow; use collections::{BTreeMap, HashMap, HashSet}; @@ -22,6 +23,8 @@ pub use contact::Contact; use dashmap::DashMap; use futures::StreamExt; use hyper::StatusCode; +use rand::prelude::StdRng; +use rand::{Rng, SeedableRng}; use rpc::{proto, ConnectionId}; use sea_orm::Condition; pub use sea_orm::ConnectOptions; @@ -46,20 +49,20 @@ pub struct Database { options: ConnectOptions, pool: DatabaseConnection, rooms: DashMap>>, - #[cfg(test)] - background: Option>, + rng: Mutex, + executor: Executor, #[cfg(test)] runtime: Option, } impl Database { - pub async fn new(options: ConnectOptions) -> Result { + pub async fn new(options: ConnectOptions, executor: Executor) -> Result { Ok(Self { options: options.clone(), pool: sea_orm::Database::connect(options).await?, rooms: DashMap::with_capacity(16384), - #[cfg(test)] - background: None, + rng: Mutex::new(StdRng::seed_from_u64(0)), + executor, #[cfg(test)] runtime: None, }) @@ -2805,30 +2808,26 @@ impl Database { Fut: Send + Future>, { let body = async { + let mut i = 0; loop { let (tx, result) = self.with_transaction(&f).await?; match result { - Ok(result) => { - match tx.commit().await.map_err(Into::into) { - Ok(()) => return Ok(result), - Err(error) => { - if is_serialization_error(&error) { - // Retry (don't break the loop) - } else { - return Err(error); - } + Ok(result) => match tx.commit().await.map_err(Into::into) { + Ok(()) => return Ok(result), + Err(error) => { + if !self.retry_on_serialization_error(&error, i).await { + return Err(error); } } - } + }, Err(error) => { tx.rollback().await?; - if is_serialization_error(&error) { - // Retry (don't break the loop) - } else { + if !self.retry_on_serialization_error(&error, i).await { return Err(error); } } } + i += 1; } }; @@ -2841,6 +2840,7 @@ impl Database { Fut: Send + Future>>, { let body = async { + let mut i = 0; loop { let (tx, result) = self.with_transaction(&f).await?; match result { @@ -2856,35 +2856,28 @@ impl Database { })); } Err(error) => { - if is_serialization_error(&error) { - // Retry (don't break the loop) - } else { + if !self.retry_on_serialization_error(&error, i).await { return Err(error); } } } } - Ok(None) => { - match tx.commit().await.map_err(Into::into) { - Ok(()) => return Ok(None), - Err(error) => { - if is_serialization_error(&error) { - // Retry (don't break the loop) - } else { - return Err(error); - } + Ok(None) => match tx.commit().await.map_err(Into::into) { + Ok(()) => return Ok(None), + Err(error) => { + if !self.retry_on_serialization_error(&error, i).await { + return Err(error); } } - } + }, Err(error) => { tx.rollback().await?; - if is_serialization_error(&error) { - // Retry (don't break the loop) - } else { + if !self.retry_on_serialization_error(&error, i).await { return Err(error); } } } + i += 1; } }; @@ -2897,38 +2890,34 @@ impl Database { Fut: Send + Future>, { let body = async { + let mut i = 0; loop { let lock = self.rooms.entry(room_id).or_default().clone(); let _guard = lock.lock_owned().await; let (tx, result) = self.with_transaction(&f).await?; match result { - Ok(data) => { - match tx.commit().await.map_err(Into::into) { - Ok(()) => { - return Ok(RoomGuard { - data, - _guard, - _not_send: PhantomData, - }); - } - Err(error) => { - if is_serialization_error(&error) { - // Retry (don't break the loop) - } else { - return Err(error); - } + Ok(data) => match tx.commit().await.map_err(Into::into) { + Ok(()) => { + return Ok(RoomGuard { + data, + _guard, + _not_send: PhantomData, + }); + } + Err(error) => { + if !self.retry_on_serialization_error(&error, i).await { + return Err(error); } } - } + }, Err(error) => { tx.rollback().await?; - if is_serialization_error(&error) { - // Retry (don't break the loop) - } else { + if !self.retry_on_serialization_error(&error, i).await { return Err(error); } } } + i += 1; } }; @@ -2954,14 +2943,14 @@ impl Database { Ok((tx, result)) } - async fn run(&self, future: F) -> T + async fn run(&self, future: F) -> Result where - F: Future, + F: Future>, { #[cfg(test)] { - if let Some(background) = self.background.as_ref() { - background.simulate_random_delay().await; + if let Executor::Deterministic(executor) = &self.executor { + executor.simulate_random_delay().await; } self.runtime.as_ref().unwrap().block_on(future) @@ -2972,6 +2961,23 @@ impl Database { future.await } } + + async fn retry_on_serialization_error(&self, error: &Error, prev_attempt_count: u32) -> bool { + // If the error is due to a failure to serialize concurrent transactions, then retry + // this transaction after a delay. With each subsequent retry, double the delay duration. + // Also vary the delay randomly in order to ensure different database connections retry + // at different times. + if is_serialization_error(error) { + let base_delay = 4_u64 << prev_attempt_count.min(16); + let randomized_delay = base_delay as f32 * self.rng.lock().await.gen_range(0.5..=2.0); + self.executor + .sleep(Duration::from_millis(randomized_delay as u64)) + .await; + true + } else { + false + } + } } fn is_serialization_error(error: &Error) -> bool { @@ -3273,7 +3279,6 @@ mod test { use gpui::executor::Background; use lazy_static::lazy_static; use parking_lot::Mutex; - use rand::prelude::*; use sea_orm::ConnectionTrait; use sqlx::migrate::MigrateDatabase; use std::sync::Arc; @@ -3295,7 +3300,9 @@ mod test { let mut db = runtime.block_on(async { let mut options = ConnectOptions::new(url); options.max_connections(5); - let db = Database::new(options).await.unwrap(); + let db = Database::new(options, Executor::Deterministic(background)) + .await + .unwrap(); let sql = include_str!(concat!( env!("CARGO_MANIFEST_DIR"), "/migrations.sqlite/20221109000000_test_schema.sql" @@ -3310,7 +3317,6 @@ mod test { db }); - db.background = Some(background); db.runtime = Some(runtime); Self { @@ -3344,13 +3350,14 @@ mod test { options .max_connections(5) .idle_timeout(Duration::from_secs(0)); - let db = Database::new(options).await.unwrap(); + let db = Database::new(options, Executor::Deterministic(background)) + .await + .unwrap(); let migrations_path = concat!(env!("CARGO_MANIFEST_DIR"), "/migrations"); db.migrate(Path::new(migrations_path), false).await.unwrap(); db }); - db.background = Some(background); db.runtime = Some(runtime); Self { diff --git a/crates/collab/src/lib.rs b/crates/collab/src/lib.rs index 8c99a5ea0f..13fb8ed0eb 100644 --- a/crates/collab/src/lib.rs +++ b/crates/collab/src/lib.rs @@ -10,6 +10,7 @@ mod tests; use axum::{http::StatusCode, response::IntoResponse}; use db::Database; +use executor::Executor; use serde::Deserialize; use std::{path::PathBuf, sync::Arc}; @@ -118,7 +119,7 @@ impl AppState { pub async fn new(config: Config) -> Result> { let mut db_options = db::ConnectOptions::new(config.database_url.clone()); db_options.max_connections(config.database_max_connections); - let db = Database::new(db_options).await?; + let db = Database::new(db_options, Executor::Production).await?; let live_kit_client = if let Some(((server, key), secret)) = config .live_kit_server .as_ref() diff --git a/crates/collab/src/main.rs b/crates/collab/src/main.rs index 30ed35bc35..6fbb451fee 100644 --- a/crates/collab/src/main.rs +++ b/crates/collab/src/main.rs @@ -32,7 +32,7 @@ async fn main() -> Result<()> { let config = envy::from_env::().expect("error loading config"); let mut db_options = db::ConnectOptions::new(config.database_url.clone()); db_options.max_connections(5); - let db = Database::new(db_options).await?; + let db = Database::new(db_options, Executor::Production).await?; let migrations_path = config .migrations_path