updated semantic_index reset status to leverage target reset system time as opposed to duration

This commit is contained in:
KCaverly 2023-09-08 15:04:50 -04:00
parent a5ee8fc805
commit bf43f93197
4 changed files with 50 additions and 55 deletions

View file

@ -34,7 +34,7 @@ use std::{
ops::{Not, Range}, ops::{Not, Range},
path::PathBuf, path::PathBuf,
sync::Arc, sync::Arc,
time::Duration, time::SystemTime,
}; };
use util::ResultExt as _; use util::ResultExt as _;
use workspace::{ use workspace::{
@ -322,17 +322,23 @@ impl View for ProjectSearchView {
SemanticIndexStatus::Indexed => Some("Indexing complete".to_string()), SemanticIndexStatus::Indexed => Some("Indexing complete".to_string()),
SemanticIndexStatus::Indexing { SemanticIndexStatus::Indexing {
remaining_files, remaining_files,
rate_limiting, rate_limit_expiration_time,
} => { } => {
if remaining_files == 0 { if remaining_files == 0 {
Some(format!("Indexing...")) Some(format!("Indexing..."))
} else { } else {
if rate_limiting > Duration::ZERO { if let Some(rate_limit_expiration_time) = rate_limit_expiration_time {
Some(format!( if let Ok(remaining_seconds) =
"Remaining files to index (rate limit resets in {}s): {}", rate_limit_expiration_time.duration_since(SystemTime::now())
rate_limiting.as_secs(), {
remaining_files Some(format!(
)) "Remaining files to index(rate limit resets in {}s): {}",
remaining_seconds.as_secs(),
remaining_files
))
} else {
Some(format!("Remaining files to index: {}", remaining_files))
}
} else { } else {
Some(format!("Remaining files to index: {}", remaining_files)) Some(format!("Remaining files to index: {}", remaining_files))
} }

View file

@ -14,8 +14,9 @@ use rusqlite::types::{FromSql, FromSqlResult, ToSqlOutput, ValueRef};
use rusqlite::ToSql; use rusqlite::ToSql;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::env; use std::env;
use std::ops::Add;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::{Duration, SystemTime};
use tiktoken_rs::{cl100k_base, CoreBPE}; use tiktoken_rs::{cl100k_base, CoreBPE};
use util::http::{HttpClient, Request}; use util::http::{HttpClient, Request};
@ -84,8 +85,8 @@ impl ToSql for Embedding {
pub struct OpenAIEmbeddings { pub struct OpenAIEmbeddings {
pub client: Arc<dyn HttpClient>, pub client: Arc<dyn HttpClient>,
pub executor: Arc<Background>, pub executor: Arc<Background>,
rate_limit_count_rx: watch::Receiver<(Duration, usize)>, rate_limit_count_rx: watch::Receiver<(Option<SystemTime>, usize)>,
rate_limit_count_tx: Arc<Mutex<watch::Sender<(Duration, usize)>>>, rate_limit_count_tx: Arc<Mutex<watch::Sender<(Option<SystemTime>, usize)>>>,
} }
#[derive(Serialize)] #[derive(Serialize)]
@ -118,15 +119,15 @@ pub trait EmbeddingProvider: Sync + Send {
async fn embed_batch(&self, spans: Vec<String>) -> Result<Vec<Embedding>>; async fn embed_batch(&self, spans: Vec<String>) -> Result<Vec<Embedding>>;
fn max_tokens_per_batch(&self) -> usize; fn max_tokens_per_batch(&self) -> usize;
fn truncate(&self, span: &str) -> (String, usize); fn truncate(&self, span: &str) -> (String, usize);
fn rate_limit_expiration(&self) -> Duration; fn rate_limit_expiration(&self) -> Option<SystemTime>;
} }
pub struct DummyEmbeddings {} pub struct DummyEmbeddings {}
#[async_trait] #[async_trait]
impl EmbeddingProvider for DummyEmbeddings { impl EmbeddingProvider for DummyEmbeddings {
fn rate_limit_expiration(&self) -> Duration { fn rate_limit_expiration(&self) -> Option<SystemTime> {
Duration::ZERO None
} }
async fn embed_batch(&self, spans: Vec<String>) -> Result<Vec<Embedding>> { async fn embed_batch(&self, spans: Vec<String>) -> Result<Vec<Embedding>> {
// 1024 is the OpenAI Embeddings size for ada models. // 1024 is the OpenAI Embeddings size for ada models.
@ -158,7 +159,7 @@ const OPENAI_INPUT_LIMIT: usize = 8190;
impl OpenAIEmbeddings { impl OpenAIEmbeddings {
pub fn new(client: Arc<dyn HttpClient>, executor: Arc<Background>) -> Self { pub fn new(client: Arc<dyn HttpClient>, executor: Arc<Background>) -> Self {
let (rate_limit_count_tx, rate_limit_count_rx) = watch::channel_with((Duration::ZERO, 0)); let (rate_limit_count_tx, rate_limit_count_rx) = watch::channel_with((None, 0));
let rate_limit_count_tx = Arc::new(Mutex::new(rate_limit_count_tx)); let rate_limit_count_tx = Arc::new(Mutex::new(rate_limit_count_tx));
OpenAIEmbeddings { OpenAIEmbeddings {
@ -170,39 +171,32 @@ impl OpenAIEmbeddings {
} }
fn resolve_rate_limit(&self) { fn resolve_rate_limit(&self) {
let (current_delay, delay_count) = *self.rate_limit_count_tx.lock().borrow(); let (reset_time, delay_count) = *self.rate_limit_count_tx.lock().borrow();
let updated_count = delay_count - 1; let updated_count = delay_count - 1;
let updated_duration = if updated_count == 0 { let updated_time = if updated_count == 0 { None } else { reset_time };
Duration::ZERO
} else {
current_delay
};
log::trace!( log::trace!("resolving rate limit: Count: {:?}", updated_count);
"resolving rate limit: Count: {:?} Duration: {:?}",
updated_count,
updated_duration
);
*self.rate_limit_count_tx.lock().borrow_mut() = (updated_duration, updated_count); *self.rate_limit_count_tx.lock().borrow_mut() = (updated_time, updated_count);
} }
fn update_rate_limit(&self, delay_duration: Duration, count_increase: usize) { fn update_rate_limit(&self, reset_time: SystemTime, count_increase: usize) {
let (current_delay, delay_count) = *self.rate_limit_count_tx.lock().borrow(); let (original_time, original_count) = *self.rate_limit_count_tx.lock().borrow();
let updated_count = delay_count + count_increase; let updated_count = original_count + count_increase;
let updated_duration = if current_delay < delay_duration {
delay_duration let updated_time = if let Some(original_time) = original_time {
if reset_time < original_time {
Some(reset_time)
} else {
Some(original_time)
}
} else { } else {
current_delay Some(reset_time)
}; };
log::trace!( log::trace!("updating rate limit: Count: {:?}", updated_count);
"updating rate limit: Count: {:?} Duration: {:?}",
updated_count,
updated_duration
);
*self.rate_limit_count_tx.lock().borrow_mut() = (updated_duration, updated_count); *self.rate_limit_count_tx.lock().borrow_mut() = (updated_time, updated_count);
} }
async fn send_request( async fn send_request(
&self, &self,
@ -234,9 +228,9 @@ impl EmbeddingProvider for OpenAIEmbeddings {
50000 50000
} }
fn rate_limit_expiration(&self) -> Duration { fn rate_limit_expiration(&self) -> Option<SystemTime> {
let (duration, _) = *self.rate_limit_count_rx.borrow(); let (expiration_time, _) = *self.rate_limit_count_rx.borrow();
duration expiration_time
} }
fn truncate(&self, span: &str) -> (String, usize) { fn truncate(&self, span: &str) -> (String, usize) {
let mut tokens = OPENAI_BPE_TOKENIZER.encode_with_special_tokens(span); let mut tokens = OPENAI_BPE_TOKENIZER.encode_with_special_tokens(span);
@ -321,10 +315,11 @@ impl EmbeddingProvider for OpenAIEmbeddings {
}; };
// If we've previously rate limited, increment the duration but not the count // If we've previously rate limited, increment the duration but not the count
let reset_time = SystemTime::now().add(delay_duration);
if rate_limiting { if rate_limiting {
self.update_rate_limit(delay_duration, 0); self.update_rate_limit(reset_time, 0);
} else { } else {
self.update_rate_limit(delay_duration, 1); self.update_rate_limit(reset_time, 1);
} }
rate_limiting = true; rate_limiting = true;

View file

@ -112,7 +112,7 @@ pub enum SemanticIndexStatus {
Indexed, Indexed,
Indexing { Indexing {
remaining_files: usize, remaining_files: usize,
rate_limiting: Duration, rate_limit_expiration_time: Option<SystemTime>,
}, },
} }
@ -132,8 +132,6 @@ struct ProjectState {
pending_file_count_rx: watch::Receiver<usize>, pending_file_count_rx: watch::Receiver<usize>,
pending_file_count_tx: Arc<Mutex<watch::Sender<usize>>>, pending_file_count_tx: Arc<Mutex<watch::Sender<usize>>>,
pending_index: usize, pending_index: usize,
rate_limiting_count_rx: watch::Receiver<usize>,
rate_limiting_count_tx: Arc<Mutex<watch::Sender<usize>>>,
_subscription: gpui::Subscription, _subscription: gpui::Subscription,
_observe_pending_file_count: Task<()>, _observe_pending_file_count: Task<()>,
} }
@ -225,15 +223,11 @@ impl ProjectState {
fn new(subscription: gpui::Subscription, cx: &mut ModelContext<SemanticIndex>) -> Self { fn new(subscription: gpui::Subscription, cx: &mut ModelContext<SemanticIndex>) -> Self {
let (pending_file_count_tx, pending_file_count_rx) = watch::channel_with(0); let (pending_file_count_tx, pending_file_count_rx) = watch::channel_with(0);
let pending_file_count_tx = Arc::new(Mutex::new(pending_file_count_tx)); let pending_file_count_tx = Arc::new(Mutex::new(pending_file_count_tx));
let (rate_limiting_count_tx, rate_limiting_count_rx) = watch::channel_with(0);
let rate_limiting_count_tx = Arc::new(Mutex::new(rate_limiting_count_tx));
Self { Self {
worktrees: Default::default(), worktrees: Default::default(),
pending_file_count_rx: pending_file_count_rx.clone(), pending_file_count_rx: pending_file_count_rx.clone(),
pending_file_count_tx, pending_file_count_tx,
pending_index: 0, pending_index: 0,
rate_limiting_count_rx: rate_limiting_count_rx.clone(),
rate_limiting_count_tx,
_subscription: subscription, _subscription: subscription,
_observe_pending_file_count: cx.spawn_weak({ _observe_pending_file_count: cx.spawn_weak({
let mut pending_file_count_rx = pending_file_count_rx.clone(); let mut pending_file_count_rx = pending_file_count_rx.clone();
@ -299,7 +293,7 @@ impl SemanticIndex {
} else { } else {
SemanticIndexStatus::Indexing { SemanticIndexStatus::Indexing {
remaining_files: project_state.pending_file_count_rx.borrow().clone(), remaining_files: project_state.pending_file_count_rx.borrow().clone(),
rate_limiting: self.embedding_provider.rate_limit_expiration(), rate_limit_expiration_time: self.embedding_provider.rate_limit_expiration(),
} }
} }
} else { } else {

View file

@ -21,7 +21,7 @@ use std::{
atomic::{self, AtomicUsize}, atomic::{self, AtomicUsize},
Arc, Arc,
}, },
time::{Duration, SystemTime}, time::SystemTime,
}; };
use unindent::Unindent; use unindent::Unindent;
use util::RandomCharIter; use util::RandomCharIter;
@ -1275,8 +1275,8 @@ impl EmbeddingProvider for FakeEmbeddingProvider {
200 200
} }
fn rate_limit_expiration(&self) -> Duration { fn rate_limit_expiration(&self) -> Option<SystemTime> {
Duration::ZERO None
} }
async fn embed_batch(&self, spans: Vec<String>) -> Result<Vec<Embedding>> { async fn embed_batch(&self, spans: Vec<String>) -> Result<Vec<Embedding>> {