More resilient eval (#32257)

Bubbles up rate-limit information so that callers higher up in the stack can retry after the indicated duration when needed.
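A minimal sketch of what a caller higher in the stack can now do with that information, assuming the error carries a retry-after duration; `CompletionError`, its `RateLimitExceeded` variant, and `complete_with_retry` are hypothetical names standing in for whatever `LanguageModelCompletionError` actually exposes:

```rust
// Hypothetical sketch only: `CompletionError` and `RateLimitExceeded` stand in
// for whatever LanguageModelCompletionError actually exposes; the exact variant
// names are not shown in this diff.
use std::future::Future;
use std::time::Duration;

#[derive(Debug)]
enum CompletionError {
    RateLimitExceeded { retry_after: Duration },
    Other(String),
}

/// Retry a completion request, sleeping for the duration the rate-limit
/// error asks for before each new attempt.
async fn complete_with_retry<T, Fut>(
    mut attempt: impl FnMut() -> Fut,
    max_retries: usize,
) -> Result<T, CompletionError>
where
    Fut: Future<Output = Result<T, CompletionError>>,
{
    let mut retries = 0;
    loop {
        match attempt().await {
            // The rate-limit information bubbled up from the request layer
            // tells us exactly how long to back off before trying again.
            Err(CompletionError::RateLimitExceeded { retry_after }) if retries < max_retries => {
                retries += 1;
                smol::Timer::after(retry_after).await;
            }
            // Success and non-rate-limit errors are passed straight through.
            result => return result,
        }
    }
}
```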

Also caps the number of evals running concurrently, which further reduces the chance of hitting rate limits in the first place.
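One way to sketch such a cap, using the same `smol::lock::Semaphore` the rate limiter in this change is built on; the `run_evals` helper, its signature, and the cap value are illustrative assumptions, not the PR's actual eval harness:

```rust
// Illustrative sketch of capping concurrency with a semaphore; not the PR's
// real eval code.
use std::future::Future;
use std::sync::Arc;

use futures::future::join_all;
use smol::lock::Semaphore;

/// Run `eval` over every input, allowing at most `max_concurrent`
/// evaluations to be in flight at any one time.
async fn run_evals<T, Fut>(
    inputs: Vec<T>,
    max_concurrent: usize,
    eval: impl Fn(T) -> Fut,
) -> Vec<Fut::Output>
where
    Fut: Future,
{
    let semaphore = Arc::new(Semaphore::new(max_concurrent));
    join_all(inputs.into_iter().map(|input| {
        let semaphore = semaphore.clone();
        // Futures are lazy: building the eval future here does no work until
        // it is polled while holding a permit below.
        let fut = eval(input);
        async move {
            // Only `max_concurrent` permits exist, so only that many evals
            // make progress at once; the permit is released when the guard
            // drops at the end of this block.
            let _guard = semaphore.acquire_arc().await;
            fut.await
        }
    }))
    .await
}
```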

Release Notes:

- N/A
Ben Brandt 2025-06-09 20:07:22 +02:00 committed by GitHub
parent fa54fa80d0
commit e4bd115a63
22 changed files with 147 additions and 56 deletions


@@ -1,4 +1,3 @@
-use anyhow::Result;
 use futures::Stream;
 use smol::lock::{Semaphore, SemaphoreGuardArc};
 use std::{
@@ -8,6 +7,8 @@ use std::{
     task::{Context, Poll},
 };
 
+use crate::LanguageModelCompletionError;
+
 #[derive(Clone)]
 pub struct RateLimiter {
     semaphore: Arc<Semaphore>,
@@ -36,9 +37,12 @@ impl RateLimiter {
         }
     }
 
-    pub fn run<'a, Fut, T>(&self, future: Fut) -> impl 'a + Future<Output = Result<T>>
+    pub fn run<'a, Fut, T>(
+        &self,
+        future: Fut,
+    ) -> impl 'a + Future<Output = Result<T, LanguageModelCompletionError>>
     where
-        Fut: 'a + Future<Output = Result<T>>,
+        Fut: 'a + Future<Output = Result<T, LanguageModelCompletionError>>,
     {
         let guard = self.semaphore.acquire_arc();
         async move {
@@ -52,9 +56,12 @@ impl RateLimiter {
     pub fn stream<'a, Fut, T>(
         &self,
         future: Fut,
-    ) -> impl 'a + Future<Output = Result<impl Stream<Item = T::Item> + use<Fut, T>>>
+    ) -> impl 'a
+    + Future<
+        Output = Result<impl Stream<Item = T::Item> + use<Fut, T>, LanguageModelCompletionError>,
+    >
     where
-        Fut: 'a + Future<Output = Result<T>>,
+        Fut: 'a + Future<Output = Result<T, LanguageModelCompletionError>>,
         T: Stream,
     {
         let guard = self.semaphore.acquire_arc();
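
Taken together, these hunks drop `anyhow::Result` from the rate limiter so that both `run` and `stream` resolve to `Result<_, LanguageModelCompletionError>`. Rate-limit details therefore survive intact all the way up to callers instead of being flattened into an opaque `anyhow::Error`, which is what enables the retry-after handling described above.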