
Closes #26030

*Note: This is my first contribution to Zed*

This addresses a second streaming bottleneck in Bedrock that remained after the initial fix in #28281 (released in preview 194). The issue is in the mechanism used to convert Zed's internal `AsyncBody` into the `SdkBody` expected by the Bedrock language provider. Before this change, that conversion used a non-streaming converter that buffers responses.

**How the fix works:**

The AWS SDK provides streaming-compatible converters to create `SdkBody` instances, but these require the input body to implement the `Body` trait from the `http-body` crate. This PR enables streaming by implementing the required trait and switching to the streaming-compatible converter.

**Changes (2 commits):**

* 1st Commit - **Implement http-body Body trait for AsyncBody:**
  - Add `http-body = 1.0` dependency (already an indirect dependency)
  - Implement the `Body` trait for our existing `AsyncBody` type
  - Uses `poll_frame` to read data chunks asynchronously, preserving streaming behavior
* 2nd Commit - **Use streaming-compatible AWS SDK converter:**
  - Create `SdkBody` using `SdkBody::from_body_1_x()` with the new `Body` trait implementation

**Details/FAQ:**

**Q: Why add another dependency?**
A: We tried to avoid adding a dependency, but the AWS SDK requires the `Body` trait and `http-body` is where it's defined. The crate is already an indirect dependency, making this a reasonable solution.

**Q: Why modify the shared `http_client` crate instead of just `aws_bedrock_client`?**
A: We considered implementing the `Body` trait on a wrapper in `aws_bedrock_client`, but since `AsyncBody` already uses `http` crate types, extending support to the companion `http-body` crate seems reasonable and may benefit other integrations.

**Q: How was this bottleneck discovered?**
A: After @5herlocked's initial streaming fix in #28281, I tested preview 194 and noticed streaming still had issues. I found a way to reproduce the problem and chatted with @5herlocked about it. He immediately pinpointed the exact location where the issue was occurring, and his diagnosis made this fix possible.

**Q: How does this relate to the previous fix?**
A: #28281 fixed buffering issues higher in the stack, but there was another bottleneck lower down, in the aws-http-client. This PR addresses that separate buffering issue.

**Q: Does this use zero-copy or one-copy?**
A: The `Body` implementation includes one copy. Someone more knowledgeable might be able to achieve a zero-copy approach, but we opted for a conservative approach. The performance impact should not be perceptible in typical usage.

**Testing:**

Confirmed that Bedrock streaming now works without buffering delays in a local build.

Release Notes:

- Improved Bedrock streaming by eliminating response buffering delays

---------

Co-authored-by: Marshall Bowers <git@maxdeviant.com>
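
The chunk-per-poll idea described under "How the fix works" can be illustrated with a minimal, std-only sketch. It is not the actual `http-body` or `http_client` code: `ChunkBody`, `SketchBody`, `read_chunk`, `collect_chunks`, `NoopWake`, and the 8 KiB `CHUNK_SIZE` are all hypothetical names and values chosen for illustration, and a blocking `std::io::Read` stands in for the asynchronous reader so the example needs no external crates. The point it tries to show is that each poll hands back at most one chunk (with one copy into a fresh buffer) instead of collecting the whole body first.

```rust
use std::io::{self, Read};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Assumed chunk size for this sketch; not taken from the PR.
pub const CHUNK_SIZE: usize = 8 * 1024;

/// Hypothetical stand-in for the `Body` trait: one chunk per poll,
/// `None` once the body is exhausted.
pub trait ChunkBody {
    fn poll_chunk(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<io::Result<Vec<u8>>>>;
}

/// Hypothetical stand-in for a body type with empty, in-memory,
/// and reader-backed variants.
pub enum SketchBody {
    Empty,
    Bytes(io::Cursor<Vec<u8>>),
    Reader(Box<dyn Read + Send>),
}

/// Reads at most `CHUNK_SIZE` bytes into a fresh buffer (the single copy).
fn read_chunk<R: Read + ?Sized>(reader: &mut R) -> Poll<Option<io::Result<Vec<u8>>>> {
    let mut buf = vec![0u8; CHUNK_SIZE];
    match reader.read(&mut buf) {
        Ok(0) => Poll::Ready(None), // end of body
        Ok(n) => {
            buf.truncate(n);
            Poll::Ready(Some(Ok(buf)))
        }
        Err(err) => Poll::Ready(Some(Err(err))),
    }
}

impl ChunkBody for SketchBody {
    fn poll_chunk(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
    ) -> Poll<Option<io::Result<Vec<u8>>>> {
        // A blocking read is used here for simplicity; an async reader
        // would register the waker from `_cx` and could return `Pending`.
        match self.get_mut() {
            SketchBody::Empty => Poll::Ready(None),
            SketchBody::Bytes(cursor) => read_chunk(cursor),
            SketchBody::Reader(reader) => read_chunk(reader),
        }
    }
}

/// Waker that does nothing; enough to drive this sketch by hand.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls a body to completion and returns the chunks in arrival order.
pub fn collect_chunks<B: ChunkBody + Unpin>(mut body: B) -> io::Result<Vec<Vec<u8>>> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut chunks = Vec::new();
    loop {
        match Pin::new(&mut body).poll_chunk(&mut cx) {
            Poll::Ready(Some(Ok(chunk))) => chunks.push(chunk),
            Poll::Ready(Some(Err(err))) => return Err(err),
            Poll::Ready(None) => return Ok(chunks),
            // Never produced by `SketchBody`; yield instead of spinning hot.
            Poll::Pending => std::thread::yield_now(),
        }
    }
}
```

A consumer that forwards each chunk as soon as `poll_chunk` returns it sees data incrementally, which is the behavior the streaming-compatible converter relies on. `collect_chunks` gathers the chunks only so the chunk boundaries are easy to inspect.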
104 lines
3 KiB
Rust
use std::fmt;
use std::sync::Arc;

use aws_smithy_runtime_api::client::http::{
    HttpClient as AwsClient, HttpConnector as AwsConnector,
    HttpConnectorFuture as AwsConnectorFuture, HttpConnectorFuture, HttpConnectorSettings,
    SharedHttpConnector,
};
use aws_smithy_runtime_api::client::orchestrator::{HttpRequest as AwsHttpRequest, HttpResponse};
use aws_smithy_runtime_api::client::result::ConnectorError;
use aws_smithy_runtime_api::client::runtime_components::RuntimeComponents;
use aws_smithy_runtime_api::http::{Headers, StatusCode};
use aws_smithy_types::body::SdkBody;
use http_client::AsyncBody;
use http_client::{HttpClient, Request};

struct AwsHttpConnector {
    client: Arc<dyn HttpClient>,
}

impl std::fmt::Debug for AwsHttpConnector {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("AwsHttpConnector").finish()
    }
}

impl AwsConnector for AwsHttpConnector {
    fn call(&self, request: AwsHttpRequest) -> AwsConnectorFuture {
        let req = match request.try_into_http1x() {
            Ok(req) => req,
            Err(err) => {
                return HttpConnectorFuture::ready(Err(ConnectorError::other(err.into(), None)));
            }
        };

        let (parts, body) = req.into_parts();

        let response = self
            .client
            .send(Request::from_parts(parts, convert_to_async_body(body)));

        HttpConnectorFuture::new(async move {
            let response = match response.await {
                Ok(response) => response,
                Err(err) => return Err(ConnectorError::other(err.into(), None)),
            };
            let (parts, body) = response.into_parts();

            let mut response = HttpResponse::new(
                StatusCode::try_from(parts.status.as_u16()).unwrap(),
                convert_to_sdk_body(body),
            );

            let headers = match Headers::try_from(parts.headers) {
                Ok(headers) => headers,
                Err(err) => return Err(ConnectorError::other(err.into(), None)),
            };

            *response.headers_mut() = headers;

            Ok(response)
        })
    }
}

#[derive(Clone)]
pub struct AwsHttpClient {
    client: Arc<dyn HttpClient>,
}

impl std::fmt::Debug for AwsHttpClient {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("AwsHttpClient").finish()
    }
}

impl AwsHttpClient {
    pub fn new(client: Arc<dyn HttpClient>) -> Self {
        Self { client }
    }
}

impl AwsClient for AwsHttpClient {
    fn http_connector(
        &self,
        _settings: &HttpConnectorSettings,
        _components: &RuntimeComponents,
    ) -> SharedHttpConnector {
        SharedHttpConnector::new(AwsHttpConnector {
            client: self.client.clone(),
        })
    }
}

pub fn convert_to_sdk_body(body: AsyncBody) -> SdkBody {
    SdkBody::from_body_1_x(body)
}

pub fn convert_to_async_body(body: SdkBody) -> AsyncBody {
    match body.bytes() {
        Some(bytes) => AsyncBody::from((*bytes).to_vec()),
        None => AsyncBody::empty(),
    }
}