From 37915ec4f2f5d03eedc5accd979c37f4c3da0121 Mon Sep 17 00:00:00 2001 From: KCaverly Date: Fri, 8 Sep 2023 16:53:16 -0400 Subject: [PATCH] updated notify to accomodate for updated countdown --- crates/search/src/project_search.rs | 2 +- crates/semantic_index/src/embedding.rs | 42 ++++++++++----------- crates/semantic_index/src/semantic_index.rs | 17 +++++++-- 3 files changed, 35 insertions(+), 26 deletions(-) diff --git a/crates/search/src/project_search.rs b/crates/search/src/project_search.rs index 6b5ebd56d4..5a1d5992a6 100644 --- a/crates/search/src/project_search.rs +++ b/crates/search/src/project_search.rs @@ -332,7 +332,7 @@ impl View for ProjectSearchView { rate_limit_expiration_time.duration_since(SystemTime::now()) { Some(format!( - "Remaining files to index(rate limit resets in {}s): {}", + "Remaining files to index (rate limit resets in {}s): {}", remaining_seconds.as_secs(), remaining_files )) diff --git a/crates/semantic_index/src/embedding.rs b/crates/semantic_index/src/embedding.rs index 148b354794..7bac809c97 100644 --- a/crates/semantic_index/src/embedding.rs +++ b/crates/semantic_index/src/embedding.rs @@ -85,8 +85,8 @@ impl ToSql for Embedding { pub struct OpenAIEmbeddings { pub client: Arc, pub executor: Arc, - rate_limit_count_rx: watch::Receiver<(Option, usize)>, - rate_limit_count_tx: Arc, usize)>>>, + rate_limit_count_rx: watch::Receiver>, + rate_limit_count_tx: Arc>>>, } #[derive(Serialize)] @@ -159,7 +159,7 @@ const OPENAI_INPUT_LIMIT: usize = 8190; impl OpenAIEmbeddings { pub fn new(client: Arc, executor: Arc) -> Self { - let (rate_limit_count_tx, rate_limit_count_rx) = watch::channel_with((None, 0)); + let (rate_limit_count_tx, rate_limit_count_rx) = watch::channel_with(None); let rate_limit_count_tx = Arc::new(Mutex::new(rate_limit_count_tx)); OpenAIEmbeddings { @@ -171,18 +171,22 @@ impl OpenAIEmbeddings { } fn resolve_rate_limit(&self) { - let (reset_time, delay_count) = *self.rate_limit_count_tx.lock().borrow(); - let updated_count = delay_count - 1; - let updated_time = if updated_count == 0 { None } else { reset_time }; + let reset_time = *self.rate_limit_count_tx.lock().borrow(); - log::trace!("resolving rate limit: Count: {:?}", updated_count); + if let Some(reset_time) = reset_time { + if SystemTime::now() >= reset_time { + *self.rate_limit_count_tx.lock().borrow_mut() = None + } + } - *self.rate_limit_count_tx.lock().borrow_mut() = (updated_time, updated_count); + log::trace!( + "resolving reset time: {:?}", + *self.rate_limit_count_tx.lock().borrow() + ); } - fn update_rate_limit(&self, reset_time: SystemTime, count_increase: usize) { - let (original_time, original_count) = *self.rate_limit_count_tx.lock().borrow(); - let updated_count = original_count + count_increase; + fn update_reset_time(&self, reset_time: SystemTime) { + let original_time = *self.rate_limit_count_tx.lock().borrow(); let updated_time = if let Some(original_time) = original_time { if reset_time < original_time { @@ -194,9 +198,9 @@ impl OpenAIEmbeddings { Some(reset_time) }; - log::trace!("updating rate limit: Count: {:?}", updated_count); + log::trace!("updating rate limit time: {:?}", updated_time); - *self.rate_limit_count_tx.lock().borrow_mut() = (updated_time, updated_count); + *self.rate_limit_count_tx.lock().borrow_mut() = updated_time; } async fn send_request( &self, @@ -229,8 +233,7 @@ impl EmbeddingProvider for OpenAIEmbeddings { } fn rate_limit_expiration(&self) -> Option { - let (expiration_time, _) = *self.rate_limit_count_rx.borrow(); - expiration_time + *self.rate_limit_count_rx.borrow() } fn truncate(&self, span: &str) -> (String, usize) { let mut tokens = OPENAI_BPE_TOKENIZER.encode_with_special_tokens(span); @@ -296,6 +299,7 @@ impl EmbeddingProvider for OpenAIEmbeddings { .collect()); } StatusCode::TOO_MANY_REQUESTS => { + rate_limiting = true; let mut body = String::new(); response.body_mut().read_to_string(&mut body).await?; @@ -316,13 +320,7 @@ impl EmbeddingProvider for OpenAIEmbeddings { // If we've previously rate limited, increment the duration but not the count let reset_time = SystemTime::now().add(delay_duration); - if rate_limiting { - self.update_rate_limit(reset_time, 0); - } else { - self.update_rate_limit(reset_time, 1); - } - - rate_limiting = true; + self.update_reset_time(reset_time); log::trace!( "openai rate limiting: waiting {:?} until lifted", diff --git a/crates/semantic_index/src/semantic_index.rs b/crates/semantic_index/src/semantic_index.rs index b60d697b43..92b11f00d1 100644 --- a/crates/semantic_index/src/semantic_index.rs +++ b/crates/semantic_index/src/semantic_index.rs @@ -232,9 +232,20 @@ impl ProjectState { _observe_pending_file_count: cx.spawn_weak({ let mut pending_file_count_rx = pending_file_count_rx.clone(); |this, mut cx| async move { - while let Some(_) = pending_file_count_rx.next().await { - if let Some(this) = this.upgrade(&cx) { - this.update(&mut cx, |_, cx| cx.notify()); + loop { + let mut timer = cx.background().timer(Duration::from_millis(350)).fuse(); + let mut pending_file_count = pending_file_count_rx.next().fuse(); + futures::select_biased! { + _ = pending_file_count => { + if let Some(this) = this.upgrade(&cx) { + this.update(&mut cx, |_, cx| cx.notify()); + } + }, + _ = timer => { + if let Some(this) = this.upgrade(&cx) { + this.update(&mut cx, |_, cx| cx.notify()); + } + } } } }