Restore HTTP client transition, but use reqwest everywhere (#19055)
Release Notes: - N/A
This commit is contained in:
parent
c709b66f35
commit
22ac178f9d
35 changed files with 838 additions and 418 deletions
34
crates/reqwest_client/Cargo.toml
Normal file
34
crates/reqwest_client/Cargo.toml
Normal file
|
@ -0,0 +1,34 @@
|
|||
[package]
|
||||
name = "reqwest_client"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
publish = false
|
||||
license = "Apache-2.0"
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
[features]
|
||||
test-support = []
|
||||
|
||||
[lib]
|
||||
path = "src/reqwest_client.rs"
|
||||
doctest = true
|
||||
|
||||
[[example]]
|
||||
name = "client"
|
||||
path = "examples/client.rs"
|
||||
|
||||
[dependencies]
|
||||
anyhow.workspace = true
|
||||
bytes = "1.0"
|
||||
futures.workspace = true
|
||||
http_client.workspace = true
|
||||
serde.workspace = true
|
||||
smol.workspace = true
|
||||
log.workspace = true
|
||||
tokio = { workspace = true, features = ["rt", "rt-multi-thread"] }
|
||||
reqwest = { workspace = true, features = ["rustls-tls-manual-roots", "stream"] }
|
||||
|
||||
[dev-dependencies]
|
||||
gpui.workspace = true
|
1
crates/reqwest_client/LICENSE-GPL
Symbolic link
1
crates/reqwest_client/LICENSE-GPL
Symbolic link
|
@ -0,0 +1 @@
|
|||
../../LICENSE-GPL
|
41
crates/reqwest_client/examples/client.rs
Normal file
41
crates/reqwest_client/examples/client.rs
Normal file
|
@ -0,0 +1,41 @@
|
|||
use std::time::Instant;
|
||||
|
||||
use futures::stream::FuturesUnordered;
|
||||
use futures::AsyncReadExt as _;
|
||||
use http_client::AsyncBody;
|
||||
use http_client::HttpClient;
|
||||
use reqwest_client::ReqwestClient;
|
||||
use smol::stream::StreamExt;
|
||||
|
||||
fn main() {
|
||||
let app = gpui::App::new();
|
||||
app.run(|cx| {
|
||||
cx.spawn(|cx| async move {
|
||||
let client = ReqwestClient::new();
|
||||
let start = Instant::now();
|
||||
let requests = [
|
||||
client.get("https://www.google.com/", AsyncBody::empty(), true),
|
||||
client.get("https://zed.dev/", AsyncBody::empty(), true),
|
||||
client.get("https://docs.rs/", AsyncBody::empty(), true),
|
||||
];
|
||||
let mut requests = requests.into_iter().collect::<FuturesUnordered<_>>();
|
||||
while let Some(response) = requests.next().await {
|
||||
let mut body = String::new();
|
||||
response
|
||||
.unwrap()
|
||||
.into_body()
|
||||
.read_to_string(&mut body)
|
||||
.await
|
||||
.unwrap();
|
||||
println!("{}", &body.len());
|
||||
}
|
||||
println!("{:?}", start.elapsed());
|
||||
|
||||
cx.update(|cx| {
|
||||
cx.quit();
|
||||
})
|
||||
.ok();
|
||||
})
|
||||
.detach();
|
||||
})
|
||||
}
|
261
crates/reqwest_client/src/reqwest_client.rs
Normal file
261
crates/reqwest_client/src/reqwest_client.rs
Normal file
|
@ -0,0 +1,261 @@
|
|||
use std::{any::type_name, borrow::Cow, io::Read, mem, pin::Pin, sync::OnceLock, task::Poll};
|
||||
|
||||
use anyhow::anyhow;
|
||||
use bytes::{BufMut, Bytes, BytesMut};
|
||||
use futures::{AsyncRead, TryStreamExt};
|
||||
use http_client::{http, ReadTimeout, RedirectPolicy};
|
||||
use reqwest::{
|
||||
header::{HeaderMap, HeaderValue},
|
||||
redirect,
|
||||
};
|
||||
use smol::future::FutureExt;
|
||||
|
||||
const DEFAULT_CAPACITY: usize = 4096;
|
||||
|
||||
pub struct ReqwestClient {
|
||||
client: reqwest::Client,
|
||||
proxy: Option<http::Uri>,
|
||||
handle: tokio::runtime::Handle,
|
||||
}
|
||||
|
||||
impl ReqwestClient {
|
||||
pub fn new() -> Self {
|
||||
reqwest::Client::new().into()
|
||||
}
|
||||
|
||||
pub fn user_agent(agent: &str) -> anyhow::Result<Self> {
|
||||
let mut map = HeaderMap::new();
|
||||
map.insert(http::header::USER_AGENT, HeaderValue::from_str(agent)?);
|
||||
let client = reqwest::Client::builder().default_headers(map).build()?;
|
||||
Ok(client.into())
|
||||
}
|
||||
|
||||
pub fn proxy_and_user_agent(proxy: Option<http::Uri>, agent: &str) -> anyhow::Result<Self> {
|
||||
let mut map = HeaderMap::new();
|
||||
map.insert(http::header::USER_AGENT, HeaderValue::from_str(agent)?);
|
||||
let mut client = reqwest::Client::builder().default_headers(map);
|
||||
if let Some(proxy) = proxy.clone() {
|
||||
client = client.proxy(reqwest::Proxy::all(proxy.to_string())?);
|
||||
}
|
||||
let client = client.build()?;
|
||||
let mut client: ReqwestClient = client.into();
|
||||
client.proxy = proxy;
|
||||
Ok(client)
|
||||
}
|
||||
}
|
||||
|
||||
static RUNTIME: OnceLock<tokio::runtime::Runtime> = OnceLock::new();
|
||||
|
||||
impl From<reqwest::Client> for ReqwestClient {
|
||||
fn from(client: reqwest::Client) -> Self {
|
||||
let handle = tokio::runtime::Handle::try_current().unwrap_or_else(|_| {
|
||||
log::info!("no tokio runtime found, creating one for Reqwest...");
|
||||
let runtime = RUNTIME.get_or_init(|| {
|
||||
tokio::runtime::Builder::new_multi_thread()
|
||||
// Since we now have two executors, let's try to keep our footprint small
|
||||
.worker_threads(1)
|
||||
.enable_all()
|
||||
.build()
|
||||
.expect("Failed to initialize HTTP client")
|
||||
});
|
||||
|
||||
runtime.handle().clone()
|
||||
});
|
||||
Self {
|
||||
client,
|
||||
handle,
|
||||
proxy: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// This struct is essentially a re-implementation of
|
||||
// https://docs.rs/tokio-util/0.7.12/tokio_util/io/struct.ReaderStream.html
|
||||
// except outside of Tokio's aegis
|
||||
struct StreamReader {
|
||||
reader: Option<Pin<Box<dyn futures::AsyncRead + Send + Sync>>>,
|
||||
buf: BytesMut,
|
||||
capacity: usize,
|
||||
}
|
||||
|
||||
impl StreamReader {
|
||||
fn new(reader: Pin<Box<dyn futures::AsyncRead + Send + Sync>>) -> Self {
|
||||
Self {
|
||||
reader: Some(reader),
|
||||
buf: BytesMut::new(),
|
||||
capacity: DEFAULT_CAPACITY,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl futures::Stream for StreamReader {
|
||||
type Item = std::io::Result<Bytes>;
|
||||
|
||||
fn poll_next(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
) -> Poll<Option<Self::Item>> {
|
||||
let mut this = self.as_mut();
|
||||
|
||||
let mut reader = match this.reader.take() {
|
||||
Some(r) => r,
|
||||
None => return Poll::Ready(None),
|
||||
};
|
||||
|
||||
if this.buf.capacity() == 0 {
|
||||
let capacity = this.capacity;
|
||||
this.buf.reserve(capacity);
|
||||
}
|
||||
|
||||
match poll_read_buf(&mut reader, cx, &mut this.buf) {
|
||||
Poll::Pending => Poll::Pending,
|
||||
Poll::Ready(Err(err)) => {
|
||||
self.reader = None;
|
||||
|
||||
Poll::Ready(Some(Err(err)))
|
||||
}
|
||||
Poll::Ready(Ok(0)) => {
|
||||
self.reader = None;
|
||||
Poll::Ready(None)
|
||||
}
|
||||
Poll::Ready(Ok(_)) => {
|
||||
let chunk = this.buf.split();
|
||||
self.reader = Some(reader);
|
||||
Poll::Ready(Some(Ok(chunk.freeze())))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Implementation from https://docs.rs/tokio-util/0.7.12/src/tokio_util/util/poll_buf.rs.html
|
||||
/// Specialized for this use case
|
||||
pub fn poll_read_buf(
|
||||
io: &mut Pin<Box<dyn futures::AsyncRead + Send + Sync>>,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
buf: &mut BytesMut,
|
||||
) -> Poll<std::io::Result<usize>> {
|
||||
if !buf.has_remaining_mut() {
|
||||
return Poll::Ready(Ok(0));
|
||||
}
|
||||
|
||||
let n = {
|
||||
let dst = buf.chunk_mut();
|
||||
|
||||
// Safety: `chunk_mut()` returns a `&mut UninitSlice`, and `UninitSlice` is a
|
||||
// transparent wrapper around `[MaybeUninit<u8>]`.
|
||||
let dst = unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>]) };
|
||||
let mut buf = tokio::io::ReadBuf::uninit(dst);
|
||||
let ptr = buf.filled().as_ptr();
|
||||
let unfilled_portion = buf.initialize_unfilled();
|
||||
// SAFETY: Pin projection
|
||||
let io_pin = unsafe { Pin::new_unchecked(io) };
|
||||
std::task::ready!(io_pin.poll_read(cx, unfilled_portion)?);
|
||||
|
||||
// Ensure the pointer does not change from under us
|
||||
assert_eq!(ptr, buf.filled().as_ptr());
|
||||
buf.filled().len()
|
||||
};
|
||||
|
||||
// Safety: This is guaranteed to be the number of initialized (and read)
|
||||
// bytes due to the invariants provided by `ReadBuf::filled`.
|
||||
unsafe {
|
||||
buf.advance_mut(n);
|
||||
}
|
||||
|
||||
Poll::Ready(Ok(n))
|
||||
}
|
||||
|
||||
struct SyncReader {
|
||||
cursor: Option<std::io::Cursor<Cow<'static, [u8]>>>,
|
||||
}
|
||||
|
||||
impl SyncReader {
|
||||
fn new(cursor: std::io::Cursor<Cow<'static, [u8]>>) -> Self {
|
||||
Self {
|
||||
cursor: Some(cursor),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl futures::stream::Stream for SyncReader {
|
||||
type Item = Result<Bytes, std::io::Error>;
|
||||
|
||||
fn poll_next(
|
||||
mut self: std::pin::Pin<&mut Self>,
|
||||
_cx: &mut std::task::Context<'_>,
|
||||
) -> std::task::Poll<Option<Self::Item>> {
|
||||
let Some(mut cursor) = self.cursor.take() else {
|
||||
return Poll::Ready(None);
|
||||
};
|
||||
|
||||
let mut buf = Vec::new();
|
||||
match cursor.read_to_end(&mut buf) {
|
||||
Ok(_) => {
|
||||
return Poll::Ready(Some(Ok(Bytes::from(buf))));
|
||||
}
|
||||
Err(e) => return Poll::Ready(Some(Err(e))),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl http_client::HttpClient for ReqwestClient {
|
||||
fn proxy(&self) -> Option<&http::Uri> {
|
||||
self.proxy.as_ref()
|
||||
}
|
||||
|
||||
fn type_name(&self) -> &'static str {
|
||||
type_name::<Self>()
|
||||
}
|
||||
|
||||
fn send(
|
||||
&self,
|
||||
req: http::Request<http_client::AsyncBody>,
|
||||
) -> futures::future::BoxFuture<
|
||||
'static,
|
||||
Result<http_client::Response<http_client::AsyncBody>, anyhow::Error>,
|
||||
> {
|
||||
let (parts, body) = req.into_parts();
|
||||
|
||||
let mut request = self.client.request(parts.method, parts.uri.to_string());
|
||||
request = request.headers(parts.headers);
|
||||
if let Some(redirect_policy) = parts.extensions.get::<RedirectPolicy>() {
|
||||
request = request.redirect_policy(match redirect_policy {
|
||||
RedirectPolicy::NoFollow => redirect::Policy::none(),
|
||||
RedirectPolicy::FollowLimit(limit) => redirect::Policy::limited(*limit as usize),
|
||||
RedirectPolicy::FollowAll => redirect::Policy::limited(100),
|
||||
});
|
||||
}
|
||||
if let Some(ReadTimeout(timeout)) = parts.extensions.get::<ReadTimeout>() {
|
||||
request = request.timeout(*timeout);
|
||||
}
|
||||
let request = request.body(match body.0 {
|
||||
http_client::Inner::Empty => reqwest::Body::default(),
|
||||
http_client::Inner::SyncReader(cursor) => {
|
||||
reqwest::Body::wrap_stream(SyncReader::new(cursor))
|
||||
}
|
||||
http_client::Inner::AsyncReader(stream) => {
|
||||
reqwest::Body::wrap_stream(StreamReader::new(stream))
|
||||
}
|
||||
});
|
||||
|
||||
let handle = self.handle.clone();
|
||||
async move {
|
||||
let mut response = handle.spawn(async { request.send().await }).await??;
|
||||
|
||||
let headers = mem::take(response.headers_mut());
|
||||
let mut builder = http::Response::builder()
|
||||
.status(response.status().as_u16())
|
||||
.version(response.version());
|
||||
*builder.headers_mut().unwrap() = headers;
|
||||
|
||||
let bytes = response
|
||||
.bytes_stream()
|
||||
.map_err(|e| futures::io::Error::new(futures::io::ErrorKind::Other, e))
|
||||
.into_async_read();
|
||||
let body = http_client::AsyncBody::from_reader(bytes);
|
||||
|
||||
builder.body(body).map_err(|e| anyhow!(e))
|
||||
}
|
||||
.boxed()
|
||||
}
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue