ZIm/crates/aws_http_client/src/aws_http_client.rs
tiagoq 56b99f49fd
bedrock: Fix remaining streaming delays (#33931)
Closes #26030

*Note: This is my first contribution to Zed*

This addresses a second streaming bottleneck in Bedrock that remained
after the initial fix in #28281 (released in preview 194).

The issue is in the mechanism used to convert Zed's internal `AsyncBody`
into the `SdkBody` expected by the Bedrock language provider. We are
using a non-streaming converter that buffers responses.

**How the fix works:**
The AWS SDK provides streaming-compatible converters to create `SdkBody`
instances, but these require the input body to implement the `Body`
trait from the `http-body` crate.

This PR enables streaming by implementing the required trait and
switching to the streaming-compatible converter.

**Changes (2 commits):**

 * 1st Commit - **Implement http-body Body trait for AsyncBody:**
   - Add `http-body = 1.0` dependency (already an indirect dependency)
   - Implement the `Body` trait for our existing `AsyncBody` type
   - Uses `poll_frame` to read data chunks asynchronously, preserving
     streaming behavior

 * 2nd Commit - **Use streaming-compatible AWS SDK converter:**
   - Create `SdkBody` using `SdkBody::from_body_1_x()` with the new `Body`
     trait implementation
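
The shape of a `poll_frame`-style body can be sketched with only the standard library. This is an illustration under stated assumptions, not the code from this PR: `FrameBody` is a hypothetical, reduced stand-in for the `Body` trait from `http-body`, `ReaderBody` is a hypothetical type that wraps a blocking reader (the real `AsyncBody` wraps asynchronous readers), and `collect_frames` is a hypothetical helper that drives the body to completion.

```rust
use std::io::{self, Read};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for `http_body::Body`, reduced to a single method
/// so that the sketch compiles without external crates.
pub trait FrameBody {
    fn poll_frame(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<io::Result<Vec<u8>>>>;
}

/// Hypothetical body that yields data in chunks of at most `chunk_size` bytes.
pub struct ReaderBody<R> {
    reader: R,
    chunk_size: usize,
}

impl<R> ReaderBody<R> {
    pub fn new(reader: R, chunk_size: usize) -> Self {
        Self { reader, chunk_size }
    }
}

impl<R: Read + Unpin> FrameBody for ReaderBody<R> {
    fn poll_frame(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
    ) -> Poll<Option<io::Result<Vec<u8>>>> {
        let this = self.get_mut();
        let mut buffer = vec![0u8; this.chunk_size];
        match this.reader.read(&mut buffer) {
            // A read of zero bytes marks the end of the body.
            Ok(0) => Poll::Ready(None),
            // Hand each chunk to the caller as soon as it is available,
            // instead of accumulating the whole response first.
            Ok(n) => {
                buffer.truncate(n);
                Poll::Ready(Some(Ok(buffer)))
            }
            Err(err) => Poll::Ready(Some(Err(err))),
        }
    }
}

/// Waker that does nothing; sufficient because this sketch never suspends.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical helper: polls the body until it ends and returns the frames
/// in the order they were produced.
pub fn collect_frames<B: FrameBody + Unpin>(mut body: B) -> io::Result<Vec<Vec<u8>>> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut frames = Vec::new();
    loop {
        match Pin::new(&mut body).poll_frame(&mut cx) {
            Poll::Ready(Some(Ok(frame))) => frames.push(frame),
            Poll::Ready(Some(Err(err))) => return Err(err),
            Poll::Ready(None) => return Ok(frames),
            // `ReaderBody` never returns `Pending`; a real executor would
            // park the task here until the waker fires.
            Poll::Pending => std::thread::yield_now(),
        }
    }
}
```

Wrapping `std::io::Cursor::new(b"hello world".to_vec())` in `ReaderBody::new(_, 4)` and passing it to `collect_frames` yields three frames of 4, 4, and 3 bytes. The point is that the consumer sees each frame as it is read, which is what a streaming converter needs from the body it wraps.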

**Details/FAQ:**

**Q: Why add another dependency?**
A: We tried to avoid adding a dependency, but the AWS SDK requires the
`Body` trait and `http-body` is where it's defined. The crate is already
an indirect dependency, making this a reasonable solution.

**Q: Why modify the shared `http_client` crate instead of just
`aws_bedrock_client`?**
A: We considered implementing the `Body` trait on a wrapper in
`aws_bedrock_client`, but since `AsyncBody` already uses `http` crate
types, extending support to the companion `http-body` crate seems
reasonable and may benefit other integrations.

**Q: How was this bottleneck discovered?**
A: After @5herlocked's initial streaming fix in #28281, I tested preview
194 and noticed streaming still had issues. I found a way to reproduce
the problem and chatted with @5herlocked about it. He immediately
pinpointed the exact location where the issue was occurring; his
diagnosis made this fix possible.

**Q: How does this relate to the previous fix?**
A: #28281 fixed buffering issues higher in the stack, but unfortunately
there was another bottleneck lower down, in the `aws_http_client` crate.
This PR addresses that separate buffering issue.

**Q: Does this use zero-copy or one-copy?**
A: The `Body` implementation includes one copy. Someone more
knowledgeable might be able to achieve a zero-copy approach, but we
opted for a conservative approach. The performance impact should not be
perceptible in typical usage.
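
To make "one copy" concrete, here is a std-only sketch of one common pattern that term can describe. It is an assumption about the general technique, not a reproduction of this PR's code, and `read_chunk_one_copy` is a hypothetical helper name: data is read into a scratch buffer, and the filled prefix is then copied into a freshly allocated, owned chunk.

```rust
use std::io::{self, Read};

/// Hypothetical helper: reads up to `scratch.len()` bytes and returns them as
/// an owned chunk, or `None` once the reader is exhausted.
///
/// The `to_vec()` call is the single extra copy: the bytes already sit in
/// `scratch` and are duplicated into a new allocation that the caller owns.
pub fn read_chunk_one_copy<R: Read>(
    reader: &mut R,
    scratch: &mut [u8],
) -> io::Result<Option<Vec<u8>>> {
    let n = reader.read(scratch)?;
    if n == 0 {
        // End of input (also returned if `scratch` is empty).
        return Ok(None);
    }
    Ok(Some(scratch[..n].to_vec()))
}
```

With a 4-byte scratch buffer and the input `abcdef`, successive calls return `abcd`, then `ef`, then `None`. The copy costs at most one chunk's worth of bytes per read, which is consistent with the expectation above that it is not noticeable in typical usage.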

**Testing:**
Confirmed that Bedrock streaming now works without buffering delays in a
local build.


Release Notes:

- Improved Bedrock streaming by eliminating response buffering delays

---------

Co-authored-by: Marshall Bowers <git@maxdeviant.com>
2025-07-22 11:55:24 -04:00


use std::fmt;
use std::sync::Arc;

use aws_smithy_runtime_api::client::http::{
    HttpClient as AwsClient, HttpConnector as AwsConnector,
    HttpConnectorFuture as AwsConnectorFuture, HttpConnectorFuture, HttpConnectorSettings,
    SharedHttpConnector,
};
use aws_smithy_runtime_api::client::orchestrator::{HttpRequest as AwsHttpRequest, HttpResponse};
use aws_smithy_runtime_api::client::result::ConnectorError;
use aws_smithy_runtime_api::client::runtime_components::RuntimeComponents;
use aws_smithy_runtime_api::http::{Headers, StatusCode};
use aws_smithy_types::body::SdkBody;
use http_client::AsyncBody;
use http_client::{HttpClient, Request};

struct AwsHttpConnector {
    client: Arc<dyn HttpClient>,
}

impl std::fmt::Debug for AwsHttpConnector {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("AwsHttpConnector").finish()
    }
}

impl AwsConnector for AwsHttpConnector {
    fn call(&self, request: AwsHttpRequest) -> AwsConnectorFuture {
        let req = match request.try_into_http1x() {
            Ok(req) => req,
            Err(err) => {
                return HttpConnectorFuture::ready(Err(ConnectorError::other(err.into(), None)));
            }
        };

        let (parts, body) = req.into_parts();

        let response = self
            .client
            .send(Request::from_parts(parts, convert_to_async_body(body)));

        HttpConnectorFuture::new(async move {
            let response = match response.await {
                Ok(response) => response,
                Err(err) => return Err(ConnectorError::other(err.into(), None)),
            };
            let (parts, body) = response.into_parts();

            let mut response = HttpResponse::new(
                StatusCode::try_from(parts.status.as_u16()).unwrap(),
                convert_to_sdk_body(body),
            );

            let headers = match Headers::try_from(parts.headers) {
                Ok(headers) => headers,
                Err(err) => return Err(ConnectorError::other(err.into(), None)),
            };

            *response.headers_mut() = headers;

            Ok(response)
        })
    }
}

#[derive(Clone)]
pub struct AwsHttpClient {
    client: Arc<dyn HttpClient>,
}

impl std::fmt::Debug for AwsHttpClient {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("AwsHttpClient").finish()
    }
}

impl AwsHttpClient {
    pub fn new(client: Arc<dyn HttpClient>) -> Self {
        Self { client }
    }
}

impl AwsClient for AwsHttpClient {
    fn http_connector(
        &self,
        _settings: &HttpConnectorSettings,
        _components: &RuntimeComponents,
    ) -> SharedHttpConnector {
        SharedHttpConnector::new(AwsHttpConnector {
            client: self.client.clone(),
        })
    }
}

pub fn convert_to_sdk_body(body: AsyncBody) -> SdkBody {
    SdkBody::from_body_1_x(body)
}

pub fn convert_to_async_body(body: SdkBody) -> AsyncBody {
    match body.bytes() {
        Some(bytes) => AsyncBody::from((*bytes).to_vec()),
        None => AsyncBody::empty(),
    }
}