remote_server: Remove dependency on libssl and libcrypto (#15446)
Fixes: #15599 Release Notes: - N/A --------- Co-authored-by: Mikayla <mikayla@zed.dev> Co-authored-by: Conrad <conrad@zed.dev>
This commit is contained in:
parent
9016de5d63
commit
2c8a6ee7cc
41 changed files with 670 additions and 226 deletions
|
@ -16,13 +16,12 @@ path = "src/http_client.rs"
|
|||
doctest = true
|
||||
|
||||
[dependencies]
|
||||
http = "1.0.0"
|
||||
http = "0.2"
|
||||
anyhow.workspace = true
|
||||
derive_more.workspace = true
|
||||
futures.workspace = true
|
||||
isahc.workspace = true
|
||||
log.workspace = true
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
futures-lite.workspace = true
|
||||
smol.workspace = true
|
||||
url.workspace = true
|
||||
|
|
109
crates/http_client/src/async_body.rs
Normal file
109
crates/http_client/src/async_body.rs
Normal file
|
@ -0,0 +1,109 @@
|
|||
use std::{borrow::Cow, io::Read, pin::Pin, task::Poll};
|
||||
|
||||
use futures::{AsyncRead, AsyncReadExt};
|
||||
|
||||
/// Based on the implementation of AsyncBody in
|
||||
/// https://github.com/sagebind/isahc/blob/5c533f1ef4d6bdf1fd291b5103c22110f41d0bf0/src/body/mod.rs
|
||||
pub struct AsyncBody(pub Inner);
|
||||
|
||||
pub enum Inner {
|
||||
/// An empty body.
|
||||
Empty,
|
||||
|
||||
/// A body stored in memory.
|
||||
SyncReader(std::io::Cursor<Cow<'static, [u8]>>),
|
||||
|
||||
/// An asynchronous reader.
|
||||
AsyncReader(Pin<Box<dyn futures::AsyncRead + Send + Sync>>),
|
||||
}
|
||||
|
||||
impl AsyncBody {
|
||||
/// Create a new empty body.
|
||||
///
|
||||
/// An empty body represents the *absence* of a body, which is semantically
|
||||
/// different than the presence of a body of zero length.
|
||||
pub fn empty() -> Self {
|
||||
Self(Inner::Empty)
|
||||
}
|
||||
/// Create a streaming body that reads from the given reader.
|
||||
pub fn from_reader<R>(read: R) -> Self
|
||||
where
|
||||
R: AsyncRead + Send + Sync + 'static,
|
||||
{
|
||||
Self(Inner::AsyncReader(Box::pin(read)))
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for AsyncBody {
|
||||
fn default() -> Self {
|
||||
Self(Inner::Empty)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<()> for AsyncBody {
|
||||
fn from(_: ()) -> Self {
|
||||
Self(Inner::Empty)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Vec<u8>> for AsyncBody {
|
||||
fn from(body: Vec<u8>) -> Self {
|
||||
Self(Inner::SyncReader(std::io::Cursor::new(Cow::Owned(body))))
|
||||
}
|
||||
}
|
||||
|
||||
impl From<&'_ [u8]> for AsyncBody {
|
||||
fn from(body: &[u8]) -> Self {
|
||||
body.to_vec().into()
|
||||
}
|
||||
}
|
||||
|
||||
impl From<String> for AsyncBody {
|
||||
fn from(body: String) -> Self {
|
||||
body.into_bytes().into()
|
||||
}
|
||||
}
|
||||
|
||||
impl From<&'_ str> for AsyncBody {
|
||||
fn from(body: &str) -> Self {
|
||||
body.as_bytes().into()
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Into<Self>> From<Option<T>> for AsyncBody {
|
||||
fn from(body: Option<T>) -> Self {
|
||||
match body {
|
||||
Some(body) => body.into(),
|
||||
None => Self(Inner::Empty),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl std::io::Read for AsyncBody {
|
||||
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
|
||||
match &mut self.0 {
|
||||
Inner::Empty => Ok(0),
|
||||
Inner::SyncReader(cursor) => cursor.read(buf),
|
||||
Inner::AsyncReader(async_reader) => smol::block_on(async_reader.read(buf)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl futures::AsyncRead for AsyncBody {
|
||||
fn poll_read(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
buf: &mut [u8],
|
||||
) -> std::task::Poll<std::io::Result<usize>> {
|
||||
// SAFETY: Standard Enum pin projection
|
||||
let inner = unsafe { &mut self.get_unchecked_mut().0 };
|
||||
match inner {
|
||||
Inner::Empty => Poll::Ready(Ok(0)),
|
||||
// Blocking call is over an in-memory buffer
|
||||
Inner::SyncReader(cursor) => Poll::Ready(cursor.read(buf)),
|
||||
Inner::AsyncReader(async_reader) => {
|
||||
AsyncRead::poll_read(async_reader.as_mut(), cx, buf)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -34,7 +34,7 @@ pub async fn latest_github_release(
|
|||
) -> Result<GithubRelease, anyhow::Error> {
|
||||
let mut response = http
|
||||
.get(
|
||||
&format!("https://api.github.com/repos/{repo_name_with_owner}/releases"),
|
||||
format!("https://api.github.com/repos/{repo_name_with_owner}/releases").as_str(),
|
||||
Default::default(),
|
||||
true,
|
||||
)
|
||||
|
@ -91,13 +91,14 @@ pub async fn get_release_by_tag_name(
|
|||
.context("error fetching latest release")?;
|
||||
|
||||
let mut body = Vec::new();
|
||||
let status = response.status();
|
||||
response
|
||||
.body_mut()
|
||||
.read_to_end(&mut body)
|
||||
.await
|
||||
.context("error reading latest release")?;
|
||||
|
||||
if response.status().is_client_error() {
|
||||
if status.is_client_error() {
|
||||
let text = String::from_utf8_lossy(body.as_slice());
|
||||
bail!(
|
||||
"status error {}, response: {text:?}",
|
||||
|
|
|
@ -1,47 +1,48 @@
|
|||
mod async_body;
|
||||
pub mod github;
|
||||
|
||||
pub use anyhow::{anyhow, Result};
|
||||
pub use async_body::{AsyncBody, Inner};
|
||||
use derive_more::Deref;
|
||||
pub use http::{self, Method, Request, Response, StatusCode, Uri};
|
||||
|
||||
use futures::future::BoxFuture;
|
||||
use futures_lite::FutureExt;
|
||||
use isahc::config::{Configurable, RedirectPolicy};
|
||||
pub use isahc::http;
|
||||
pub use isahc::{
|
||||
http::{Method, StatusCode, Uri},
|
||||
AsyncBody, Error, HttpClient as IsahcHttpClient, Request, Response,
|
||||
};
|
||||
use http::request::Builder;
|
||||
#[cfg(feature = "test-support")]
|
||||
use std::fmt;
|
||||
use std::{
|
||||
sync::{Arc, Mutex},
|
||||
time::Duration,
|
||||
};
|
||||
use std::sync::{Arc, Mutex};
|
||||
pub use url::Url;
|
||||
|
||||
pub trait HttpClient: Send + Sync {
|
||||
pub trait HttpClient: 'static + Send + Sync {
|
||||
fn send(
|
||||
&self,
|
||||
req: http::Request<AsyncBody>,
|
||||
) -> BoxFuture<'static, Result<Response<AsyncBody>, anyhow::Error>> {
|
||||
self.send_with_redirect_policy(req, false)
|
||||
}
|
||||
|
||||
// TODO: Make a better API for this
|
||||
fn send_with_redirect_policy(
|
||||
&self,
|
||||
req: Request<AsyncBody>,
|
||||
) -> BoxFuture<'static, Result<Response<AsyncBody>, Error>>;
|
||||
follow_redirects: bool,
|
||||
) -> BoxFuture<'static, Result<Response<AsyncBody>, anyhow::Error>>;
|
||||
|
||||
fn get<'a>(
|
||||
&'a self,
|
||||
uri: &str,
|
||||
body: AsyncBody,
|
||||
follow_redirects: bool,
|
||||
) -> BoxFuture<'a, Result<Response<AsyncBody>, Error>> {
|
||||
let request = isahc::Request::builder()
|
||||
.redirect_policy(if follow_redirects {
|
||||
RedirectPolicy::Follow
|
||||
} else {
|
||||
RedirectPolicy::None
|
||||
})
|
||||
.method(Method::GET)
|
||||
.uri(uri)
|
||||
.body(body);
|
||||
) -> BoxFuture<'a, Result<Response<AsyncBody>, anyhow::Error>> {
|
||||
let request = Builder::new().uri(uri).body(body);
|
||||
|
||||
match request {
|
||||
Ok(request) => self.send(request),
|
||||
Err(error) => async move { Err(error.into()) }.boxed(),
|
||||
Ok(request) => Box::pin(async move {
|
||||
self.send_with_redirect_policy(request, follow_redirects)
|
||||
.await
|
||||
.map_err(Into::into)
|
||||
}),
|
||||
Err(e) => Box::pin(async move { Err(e.into()) }),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -49,15 +50,16 @@ pub trait HttpClient: Send + Sync {
|
|||
&'a self,
|
||||
uri: &str,
|
||||
body: AsyncBody,
|
||||
) -> BoxFuture<'a, Result<Response<AsyncBody>, Error>> {
|
||||
let request = isahc::Request::builder()
|
||||
.method(Method::POST)
|
||||
) -> BoxFuture<'a, Result<Response<AsyncBody>, anyhow::Error>> {
|
||||
let request = Builder::new()
|
||||
.uri(uri)
|
||||
.method(Method::POST)
|
||||
.header("Content-Type", "application/json")
|
||||
.body(body);
|
||||
|
||||
match request {
|
||||
Ok(request) => self.send(request),
|
||||
Err(error) => async move { Err(error.into()) }.boxed(),
|
||||
Ok(request) => Box::pin(async move { self.send(request).await.map_err(Into::into) }),
|
||||
Err(e) => Box::pin(async move { Err(e.into()) }),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -74,29 +76,28 @@ pub struct HttpClientWithProxy {
|
|||
|
||||
impl HttpClientWithProxy {
|
||||
/// Returns a new [`HttpClientWithProxy`] with the given proxy URL.
|
||||
pub fn new(user_agent: Option<String>, proxy_url: Option<String>) -> Self {
|
||||
let proxy_url = proxy_url
|
||||
.and_then(|input| {
|
||||
input
|
||||
.parse::<Uri>()
|
||||
.inspect_err(|e| log::error!("Error parsing proxy settings: {}", e))
|
||||
.ok()
|
||||
})
|
||||
pub fn new(client: Arc<dyn HttpClient>, proxy_url: Option<String>) -> Self {
|
||||
let proxy_uri = proxy_url
|
||||
.and_then(|proxy| proxy.parse().ok())
|
||||
.or_else(read_proxy_from_env);
|
||||
|
||||
Self::new_uri(client, proxy_uri)
|
||||
}
|
||||
pub fn new_uri(client: Arc<dyn HttpClient>, proxy_uri: Option<Uri>) -> Self {
|
||||
Self {
|
||||
client: client(user_agent, proxy_url.clone()),
|
||||
proxy: proxy_url,
|
||||
client,
|
||||
proxy: proxy_uri,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl HttpClient for HttpClientWithProxy {
|
||||
fn send(
|
||||
fn send_with_redirect_policy(
|
||||
&self,
|
||||
req: Request<AsyncBody>,
|
||||
) -> BoxFuture<'static, Result<Response<AsyncBody>, Error>> {
|
||||
self.client.send(req)
|
||||
follow_redirects: bool,
|
||||
) -> BoxFuture<'static, Result<Response<AsyncBody>, anyhow::Error>> {
|
||||
self.client.send_with_redirect_policy(req, follow_redirects)
|
||||
}
|
||||
|
||||
fn proxy(&self) -> Option<&Uri> {
|
||||
|
@ -105,11 +106,12 @@ impl HttpClient for HttpClientWithProxy {
|
|||
}
|
||||
|
||||
impl HttpClient for Arc<HttpClientWithProxy> {
|
||||
fn send(
|
||||
fn send_with_redirect_policy(
|
||||
&self,
|
||||
req: Request<AsyncBody>,
|
||||
) -> BoxFuture<'static, Result<Response<AsyncBody>, Error>> {
|
||||
self.client.send(req)
|
||||
follow_redirects: bool,
|
||||
) -> BoxFuture<'static, Result<Response<AsyncBody>, anyhow::Error>> {
|
||||
self.client.send_with_redirect_policy(req, follow_redirects)
|
||||
}
|
||||
|
||||
fn proxy(&self) -> Option<&Uri> {
|
||||
|
@ -123,14 +125,35 @@ pub struct HttpClientWithUrl {
|
|||
client: HttpClientWithProxy,
|
||||
}
|
||||
|
||||
impl std::ops::Deref for HttpClientWithUrl {
|
||||
type Target = HttpClientWithProxy;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.client
|
||||
}
|
||||
}
|
||||
|
||||
impl HttpClientWithUrl {
|
||||
/// Returns a new [`HttpClientWithUrl`] with the given base URL.
|
||||
pub fn new(
|
||||
client: Arc<dyn HttpClient>,
|
||||
base_url: impl Into<String>,
|
||||
user_agent: Option<String>,
|
||||
proxy_url: Option<String>,
|
||||
) -> Self {
|
||||
let client = HttpClientWithProxy::new(user_agent, proxy_url);
|
||||
let client = HttpClientWithProxy::new(client, proxy_url);
|
||||
|
||||
Self {
|
||||
base_url: Mutex::new(base_url.into()),
|
||||
client,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new_uri(
|
||||
client: Arc<dyn HttpClient>,
|
||||
base_url: impl Into<String>,
|
||||
proxy_uri: Option<Uri>,
|
||||
) -> Self {
|
||||
let client = HttpClientWithProxy::new_uri(client, proxy_uri);
|
||||
|
||||
Self {
|
||||
base_url: Mutex::new(base_url.into()),
|
||||
|
@ -195,11 +218,12 @@ impl HttpClientWithUrl {
|
|||
}
|
||||
|
||||
impl HttpClient for Arc<HttpClientWithUrl> {
|
||||
fn send(
|
||||
fn send_with_redirect_policy(
|
||||
&self,
|
||||
req: Request<AsyncBody>,
|
||||
) -> BoxFuture<'static, Result<Response<AsyncBody>, Error>> {
|
||||
self.client.send(req)
|
||||
follow_redirects: bool,
|
||||
) -> BoxFuture<'static, Result<Response<AsyncBody>, anyhow::Error>> {
|
||||
self.client.send_with_redirect_policy(req, follow_redirects)
|
||||
}
|
||||
|
||||
fn proxy(&self) -> Option<&Uri> {
|
||||
|
@ -208,11 +232,12 @@ impl HttpClient for Arc<HttpClientWithUrl> {
|
|||
}
|
||||
|
||||
impl HttpClient for HttpClientWithUrl {
|
||||
fn send(
|
||||
fn send_with_redirect_policy(
|
||||
&self,
|
||||
req: Request<AsyncBody>,
|
||||
) -> BoxFuture<'static, Result<Response<AsyncBody>, Error>> {
|
||||
self.client.send(req)
|
||||
follow_redirects: bool,
|
||||
) -> BoxFuture<'static, Result<Response<AsyncBody>, anyhow::Error>> {
|
||||
self.client.send_with_redirect_policy(req, follow_redirects)
|
||||
}
|
||||
|
||||
fn proxy(&self) -> Option<&Uri> {
|
||||
|
@ -220,26 +245,7 @@ impl HttpClient for HttpClientWithUrl {
|
|||
}
|
||||
}
|
||||
|
||||
pub fn client(user_agent: Option<String>, proxy: Option<Uri>) -> Arc<dyn HttpClient> {
|
||||
let mut builder = isahc::HttpClient::builder()
|
||||
// Some requests to Qwen2 models on Runpod can take 32+ seconds,
|
||||
// especially if there's a cold boot involved. We may need to have
|
||||
// those requests use a different http client, because global timeouts
|
||||
// of 50 and 60 seconds, respectively, would be very high!
|
||||
.connect_timeout(Duration::from_secs(5))
|
||||
.low_speed_timeout(100, Duration::from_secs(30))
|
||||
.proxy(proxy.clone());
|
||||
if let Some(user_agent) = user_agent {
|
||||
builder = builder.default_header("User-Agent", user_agent);
|
||||
}
|
||||
|
||||
Arc::new(HttpClientWithProxy {
|
||||
client: Arc::new(builder.build().unwrap()),
|
||||
proxy,
|
||||
})
|
||||
}
|
||||
|
||||
fn read_proxy_from_env() -> Option<Uri> {
|
||||
pub fn read_proxy_from_env() -> Option<Uri> {
|
||||
const ENV_VARS: &[&str] = &[
|
||||
"ALL_PROXY",
|
||||
"all_proxy",
|
||||
|
@ -258,23 +264,9 @@ fn read_proxy_from_env() -> Option<Uri> {
|
|||
None
|
||||
}
|
||||
|
||||
impl HttpClient for isahc::HttpClient {
|
||||
fn send(
|
||||
&self,
|
||||
req: Request<AsyncBody>,
|
||||
) -> BoxFuture<'static, Result<Response<AsyncBody>, Error>> {
|
||||
let client = self.clone();
|
||||
Box::pin(async move { client.send_async(req).await })
|
||||
}
|
||||
|
||||
fn proxy(&self) -> Option<&Uri> {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "test-support")]
|
||||
type FakeHttpHandler = Box<
|
||||
dyn Fn(Request<AsyncBody>) -> BoxFuture<'static, Result<Response<AsyncBody>, Error>>
|
||||
dyn Fn(Request<AsyncBody>) -> BoxFuture<'static, Result<Response<AsyncBody>, anyhow::Error>>
|
||||
+ Send
|
||||
+ Sync
|
||||
+ 'static,
|
||||
|
@ -289,7 +281,7 @@ pub struct FakeHttpClient {
|
|||
impl FakeHttpClient {
|
||||
pub fn create<Fut, F>(handler: F) -> Arc<HttpClientWithUrl>
|
||||
where
|
||||
Fut: futures::Future<Output = Result<Response<AsyncBody>, Error>> + Send + 'static,
|
||||
Fut: futures::Future<Output = Result<Response<AsyncBody>, anyhow::Error>> + Send + 'static,
|
||||
F: Fn(Request<AsyncBody>) -> Fut + Send + Sync + 'static,
|
||||
{
|
||||
Arc::new(HttpClientWithUrl {
|
||||
|
@ -331,12 +323,13 @@ impl fmt::Debug for FakeHttpClient {
|
|||
|
||||
#[cfg(feature = "test-support")]
|
||||
impl HttpClient for FakeHttpClient {
|
||||
fn send(
|
||||
fn send_with_redirect_policy(
|
||||
&self,
|
||||
req: Request<AsyncBody>,
|
||||
) -> BoxFuture<'static, Result<Response<AsyncBody>, Error>> {
|
||||
_follow_redirects: bool,
|
||||
) -> BoxFuture<'static, Result<Response<AsyncBody>, anyhow::Error>> {
|
||||
let future = (self.handler)(req);
|
||||
Box::pin(async move { future.await.map(Into::into) })
|
||||
future
|
||||
}
|
||||
|
||||
fn proxy(&self) -> Option<&Uri> {
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue