Get RPC2 tests passing
Co-authored-by: Conrad <conrad@zed.dev> Co-authored-by: Kyle <kyle@zed.dev>
This commit is contained in:
parent
71ad3e1b20
commit
065d26f5b2
17 changed files with 210 additions and 178 deletions
|
@ -1,7 +1,7 @@
|
|||
[package]
|
||||
description = "Shared logic for communication between the Zed app and the zed.dev server"
|
||||
edition = "2021"
|
||||
name = "rpc"
|
||||
name = "rpc2"
|
||||
version = "0.1.0"
|
||||
publish = false
|
||||
|
||||
|
@ -10,12 +10,12 @@ path = "src/rpc.rs"
|
|||
doctest = false
|
||||
|
||||
[features]
|
||||
test-support = ["collections/test-support", "gpui/test-support"]
|
||||
test-support = ["collections/test-support", "gpui2/test-support"]
|
||||
|
||||
[dependencies]
|
||||
clock = { path = "../clock" }
|
||||
collections = { path = "../collections" }
|
||||
gpui = { path = "../gpui", optional = true }
|
||||
gpui2 = { path = "../gpui2", optional = true }
|
||||
util = { path = "../util" }
|
||||
anyhow.workspace = true
|
||||
async-lock = "2.4"
|
||||
|
@ -37,7 +37,7 @@ prost-build = "0.9"
|
|||
|
||||
[dev-dependencies]
|
||||
collections = { path = "../collections", features = ["test-support"] }
|
||||
gpui = { path = "../gpui", features = ["test-support"] }
|
||||
gpui2 = { path = "../gpui2", features = ["test-support"] }
|
||||
smol.workspace = true
|
||||
tempdir.workspace = true
|
||||
ctor.workspace = true
|
||||
|
|
|
@ -34,7 +34,7 @@ impl Connection {
|
|||
|
||||
#[cfg(any(test, feature = "test-support"))]
|
||||
pub fn in_memory(
|
||||
executor: std::sync::Arc<gpui::executor::Background>,
|
||||
executor: gpui2::Executor,
|
||||
) -> (Self, Self, std::sync::Arc<std::sync::atomic::AtomicBool>) {
|
||||
use std::sync::{
|
||||
atomic::{AtomicBool, Ordering::SeqCst},
|
||||
|
@ -53,7 +53,7 @@ impl Connection {
|
|||
#[allow(clippy::type_complexity)]
|
||||
fn channel(
|
||||
killed: Arc<AtomicBool>,
|
||||
executor: Arc<gpui::executor::Background>,
|
||||
executor: gpui2::Executor,
|
||||
) -> (
|
||||
Box<dyn Send + Unpin + futures::Sink<WebSocketMessage, Error = anyhow::Error>>,
|
||||
Box<dyn Send + Unpin + futures::Stream<Item = Result<WebSocketMessage, anyhow::Error>>>,
|
||||
|
@ -66,14 +66,12 @@ impl Connection {
|
|||
|
||||
let tx = tx.sink_map_err(|error| anyhow!(error)).with({
|
||||
let killed = killed.clone();
|
||||
let executor = Arc::downgrade(&executor);
|
||||
let executor = executor.clone();
|
||||
move |msg| {
|
||||
let killed = killed.clone();
|
||||
let executor = executor.clone();
|
||||
Box::pin(async move {
|
||||
if let Some(executor) = executor.upgrade() {
|
||||
executor.simulate_random_delay().await;
|
||||
}
|
||||
executor.simulate_random_delay().await;
|
||||
|
||||
// Writes to a half-open TCP connection will error.
|
||||
if killed.load(SeqCst) {
|
||||
|
@ -87,14 +85,12 @@ impl Connection {
|
|||
|
||||
let rx = rx.then({
|
||||
let killed = killed;
|
||||
let executor = Arc::downgrade(&executor);
|
||||
let executor = executor.clone();
|
||||
move |msg| {
|
||||
let killed = killed.clone();
|
||||
let executor = executor.clone();
|
||||
Box::pin(async move {
|
||||
if let Some(executor) = executor.upgrade() {
|
||||
executor.simulate_random_delay().await;
|
||||
}
|
||||
executor.simulate_random_delay().await;
|
||||
|
||||
// Reads from a half-open TCP connection will hang.
|
||||
if killed.load(SeqCst) {
|
||||
|
|
|
@ -342,7 +342,7 @@ impl Peer {
|
|||
pub fn add_test_connection(
|
||||
self: &Arc<Self>,
|
||||
connection: Connection,
|
||||
executor: Arc<gpui::executor::Background>,
|
||||
executor: gpui2::Executor,
|
||||
) -> (
|
||||
ConnectionId,
|
||||
impl Future<Output = anyhow::Result<()>> + Send,
|
||||
|
@ -557,7 +557,7 @@ mod tests {
|
|||
use super::*;
|
||||
use crate::TypedEnvelope;
|
||||
use async_tungstenite::tungstenite::Message as WebSocketMessage;
|
||||
use gpui::TestAppContext;
|
||||
use gpui2::TestAppContext;
|
||||
|
||||
#[ctor::ctor]
|
||||
fn init_logger() {
|
||||
|
@ -566,9 +566,9 @@ mod tests {
|
|||
}
|
||||
}
|
||||
|
||||
#[gpui::test(iterations = 50)]
|
||||
#[gpui2::test(iterations = 50)]
|
||||
async fn test_request_response(cx: &mut TestAppContext) {
|
||||
let executor = cx.foreground();
|
||||
let executor = cx.executor();
|
||||
|
||||
// create 2 clients connected to 1 server
|
||||
let server = Peer::new(0);
|
||||
|
@ -576,18 +576,18 @@ mod tests {
|
|||
let client2 = Peer::new(0);
|
||||
|
||||
let (client1_to_server_conn, server_to_client_1_conn, _kill) =
|
||||
Connection::in_memory(cx.background());
|
||||
Connection::in_memory(cx.executor().clone());
|
||||
let (client1_conn_id, io_task1, client1_incoming) =
|
||||
client1.add_test_connection(client1_to_server_conn, cx.background());
|
||||
client1.add_test_connection(client1_to_server_conn, cx.executor().clone());
|
||||
let (_, io_task2, server_incoming1) =
|
||||
server.add_test_connection(server_to_client_1_conn, cx.background());
|
||||
server.add_test_connection(server_to_client_1_conn, cx.executor().clone());
|
||||
|
||||
let (client2_to_server_conn, server_to_client_2_conn, _kill) =
|
||||
Connection::in_memory(cx.background());
|
||||
Connection::in_memory(cx.executor().clone());
|
||||
let (client2_conn_id, io_task3, client2_incoming) =
|
||||
client2.add_test_connection(client2_to_server_conn, cx.background());
|
||||
client2.add_test_connection(client2_to_server_conn, cx.executor().clone());
|
||||
let (_, io_task4, server_incoming2) =
|
||||
server.add_test_connection(server_to_client_2_conn, cx.background());
|
||||
server.add_test_connection(server_to_client_2_conn, cx.executor().clone());
|
||||
|
||||
executor.spawn(io_task1).detach();
|
||||
executor.spawn(io_task2).detach();
|
||||
|
@ -662,27 +662,27 @@ mod tests {
|
|||
}
|
||||
}
|
||||
|
||||
#[gpui::test(iterations = 50)]
|
||||
#[gpui2::test(iterations = 50)]
|
||||
async fn test_order_of_response_and_incoming(cx: &mut TestAppContext) {
|
||||
let executor = cx.foreground();
|
||||
let executor = cx.executor();
|
||||
let server = Peer::new(0);
|
||||
let client = Peer::new(0);
|
||||
|
||||
let (client_to_server_conn, server_to_client_conn, _kill) =
|
||||
Connection::in_memory(cx.background());
|
||||
Connection::in_memory(executor.clone());
|
||||
let (client_to_server_conn_id, io_task1, mut client_incoming) =
|
||||
client.add_test_connection(client_to_server_conn, cx.background());
|
||||
client.add_test_connection(client_to_server_conn, executor.clone());
|
||||
|
||||
let (server_to_client_conn_id, io_task2, mut server_incoming) =
|
||||
server.add_test_connection(server_to_client_conn, cx.background());
|
||||
server.add_test_connection(server_to_client_conn, executor.clone());
|
||||
|
||||
executor.spawn(io_task1).detach();
|
||||
executor.spawn(io_task2).detach();
|
||||
|
||||
executor
|
||||
.spawn(async move {
|
||||
let request = server_incoming
|
||||
.next()
|
||||
.await
|
||||
let future = server_incoming.next().await;
|
||||
let request = future
|
||||
.unwrap()
|
||||
.into_any()
|
||||
.downcast::<TypedEnvelope<proto::Ping>>()
|
||||
|
@ -760,18 +760,18 @@ mod tests {
|
|||
);
|
||||
}
|
||||
|
||||
#[gpui::test(iterations = 50)]
|
||||
#[gpui2::test(iterations = 50)]
|
||||
async fn test_dropping_request_before_completion(cx: &mut TestAppContext) {
|
||||
let executor = cx.foreground();
|
||||
let executor = cx.executor().clone();
|
||||
let server = Peer::new(0);
|
||||
let client = Peer::new(0);
|
||||
|
||||
let (client_to_server_conn, server_to_client_conn, _kill) =
|
||||
Connection::in_memory(cx.background());
|
||||
Connection::in_memory(cx.executor().clone());
|
||||
let (client_to_server_conn_id, io_task1, mut client_incoming) =
|
||||
client.add_test_connection(client_to_server_conn, cx.background());
|
||||
client.add_test_connection(client_to_server_conn, cx.executor().clone());
|
||||
let (server_to_client_conn_id, io_task2, mut server_incoming) =
|
||||
server.add_test_connection(server_to_client_conn, cx.background());
|
||||
server.add_test_connection(server_to_client_conn, cx.executor().clone());
|
||||
|
||||
executor.spawn(io_task1).detach();
|
||||
executor.spawn(io_task2).detach();
|
||||
|
@ -858,7 +858,7 @@ mod tests {
|
|||
.detach();
|
||||
|
||||
// Allow the request to make some progress before dropping it.
|
||||
cx.background().simulate_random_delay().await;
|
||||
cx.executor().simulate_random_delay().await;
|
||||
drop(request1_task);
|
||||
|
||||
request2_task.await;
|
||||
|
@ -872,15 +872,15 @@ mod tests {
|
|||
);
|
||||
}
|
||||
|
||||
#[gpui::test(iterations = 50)]
|
||||
#[gpui2::test(iterations = 50)]
|
||||
async fn test_disconnect(cx: &mut TestAppContext) {
|
||||
let executor = cx.foreground();
|
||||
let executor = cx.executor();
|
||||
|
||||
let (client_conn, mut server_conn, _kill) = Connection::in_memory(cx.background());
|
||||
let (client_conn, mut server_conn, _kill) = Connection::in_memory(executor.clone());
|
||||
|
||||
let client = Peer::new(0);
|
||||
let (connection_id, io_handler, mut incoming) =
|
||||
client.add_test_connection(client_conn, cx.background());
|
||||
client.add_test_connection(client_conn, executor.clone());
|
||||
|
||||
let (io_ended_tx, io_ended_rx) = oneshot::channel();
|
||||
executor
|
||||
|
@ -908,14 +908,14 @@ mod tests {
|
|||
.is_err());
|
||||
}
|
||||
|
||||
#[gpui::test(iterations = 50)]
|
||||
#[gpui2::test(iterations = 50)]
|
||||
async fn test_io_error(cx: &mut TestAppContext) {
|
||||
let executor = cx.foreground();
|
||||
let (client_conn, mut server_conn, _kill) = Connection::in_memory(cx.background());
|
||||
let executor = cx.executor();
|
||||
let (client_conn, mut server_conn, _kill) = Connection::in_memory(executor.clone());
|
||||
|
||||
let client = Peer::new(0);
|
||||
let (connection_id, io_handler, mut incoming) =
|
||||
client.add_test_connection(client_conn, cx.background());
|
||||
client.add_test_connection(client_conn, executor.clone());
|
||||
executor.spawn(io_handler).detach();
|
||||
executor
|
||||
.spawn(async move { incoming.next().await })
|
||||
|
|
|
@ -616,7 +616,7 @@ pub fn split_worktree_update(
|
|||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[gpui::test]
|
||||
#[gpui2::test]
|
||||
async fn test_buffer_size() {
|
||||
let (tx, rx) = futures::channel::mpsc::unbounded();
|
||||
let mut sink = MessageStream::new(tx.sink_map_err(|_| anyhow!("")));
|
||||
|
@ -648,7 +648,7 @@ mod tests {
|
|||
assert!(stream.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
|
||||
}
|
||||
|
||||
#[gpui::test]
|
||||
#[gpui2::test]
|
||||
fn test_converting_peer_id_from_and_to_u64() {
|
||||
let peer_id = PeerId {
|
||||
owner_id: 10,
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue