Simulate request/responses and room broadcasts with random network delay

Co-Authored-By: Nathan Sobo <nathan@zed.dev>
This commit is contained in:
Antonio Scandurra 2023-07-18 18:33:08 +02:00
parent 5267c6d2cb
commit 00b0189660
7 changed files with 220 additions and 60 deletions

6
Cargo.lock generated
View file

@ -1735,6 +1735,7 @@ name = "crdb"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-broadcast",
"collections", "collections",
"futures 0.3.28", "futures 0.3.28",
"gpui", "gpui",
@ -1742,6 +1743,8 @@ dependencies = [
"parking_lot 0.11.2", "parking_lot 0.11.2",
"rand 0.8.5", "rand 0.8.5",
"rope", "rope",
"serde",
"serde_json",
"smallvec", "smallvec",
"sum_tree", "sum_tree",
"uuid 1.3.2", "uuid 1.3.2",
@ -6492,6 +6495,9 @@ name = "smallvec"
version = "1.10.0" version = "1.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a507befe795404456341dfab10cef66ead4c041f62b8b11bbb92bffe5d0953e0" checksum = "a507befe795404456341dfab10cef66ead4c041f62b8b11bbb92bffe5d0953e0"
dependencies = [
"serde",
]
[[package]] [[package]]
name = "smol" name = "smol"

View file

@ -93,7 +93,7 @@ schemars = { version = "0.8" }
serde = { version = "1.0", features = ["derive", "rc"] } serde = { version = "1.0", features = ["derive", "rc"] }
serde_derive = { version = "1.0", features = ["deserialize_in_place"] } serde_derive = { version = "1.0", features = ["deserialize_in_place"] }
serde_json = { version = "1.0", features = ["preserve_order", "raw_value"] } serde_json = { version = "1.0", features = ["preserve_order", "raw_value"] }
smallvec = { version = "1.6", features = ["union"] } smallvec = { version = "1.6", features = ["serde", "union"] }
smol = { version = "1.2" } smol = { version = "1.2" }
tempdir = { version = "0.3.7" } tempdir = { version = "0.3.7" }
thiserror = { version = "1.0.29" } thiserror = { version = "1.0.29" }

View file

@ -16,9 +16,12 @@ anyhow.workspace = true
futures.workspace = true futures.workspace = true
lazy_static.workspace = true lazy_static.workspace = true
parking_lot.workspace = true parking_lot.workspace = true
serde.workspace = true
serde_json.workspace = true
smallvec.workspace = true smallvec.workspace = true
uuid = { version = "1.3", features = ["v4", "fast-rng"] } uuid = { version = "1.3", features = ["v4", "fast-rng", "serde"] }
[dev-dependencies] [dev-dependencies]
gpui = { path = "../gpui", features = ["test-support"] } gpui = { path = "../gpui", features = ["test-support"] }
async-broadcast = "0.4"
rand.workspace = true rand.workspace = true

View file

@ -11,6 +11,7 @@ use futures::future::BoxFuture;
use operations::{CreateBranch, Operation}; use operations::{CreateBranch, Operation};
use parking_lot::Mutex; use parking_lot::Mutex;
use rope::Rope; use rope::Rope;
use serde::{Deserialize, Serialize};
use smallvec::{smallvec, SmallVec}; use smallvec::{smallvec, SmallVec};
use std::{ use std::{
cmp::{self, Ordering}, cmp::{self, Ordering},
@ -23,7 +24,9 @@ use std::{
use sum_tree::{Bias, SumTree, TreeMap}; use sum_tree::{Bias, SumTree, TreeMap};
use uuid::Uuid; use uuid::Uuid;
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)] #[derive(
Copy, Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize,
)]
pub struct RepoId(Uuid); pub struct RepoId(Uuid);
type RevisionId = SmallVec<[OperationId; 2]>; type RevisionId = SmallVec<[OperationId; 2]>;
@ -34,13 +37,13 @@ impl RepoId {
} }
} }
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord)] #[derive(Copy, Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct ReplicaId(u32); pub struct ReplicaId(u32);
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord)] #[derive(Copy, Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
struct OperationCount(usize); struct OperationCount(usize);
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord)] #[derive(Copy, Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct OperationId { pub struct OperationId {
replica_id: ReplicaId, replica_id: ReplicaId,
operation_count: OperationCount, operation_count: OperationCount,
@ -69,10 +72,10 @@ impl sum_tree::Summary for OperationId {
} }
} }
#[derive(Clone)] #[derive(Clone, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize)]
pub struct RoomName(Arc<str>); pub struct RoomName(Arc<str>);
#[derive(Clone)] #[derive(Clone, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize)]
pub struct RoomToken(Arc<str>); pub struct RoomToken(Arc<str>);
pub trait Request: Message { pub trait Request: Message {
@ -80,9 +83,25 @@ pub trait Request: Message {
} }
pub trait Message: 'static + Send + Sync { pub trait Message: 'static + Send + Sync {
fn from_bytes(bytes: Vec<u8>) -> Result<Self, Vec<u8>>
where
Self: Sized;
fn to_bytes(&self) -> Vec<u8>; fn to_bytes(&self) -> Vec<u8>;
} }
impl<T> Message for T
where
T: 'static + Send + Sync + Serialize + for<'a> Deserialize<'a>,
{
fn from_bytes(bytes: Vec<u8>) -> Result<Self, Vec<u8>> {
serde_json::from_slice(&bytes).map_err(|_| bytes)
}
fn to_bytes(&self) -> Vec<u8> {
serde_json::to_vec(self).unwrap()
}
}
pub trait ServerNetwork: 'static + Send + Sync { pub trait ServerNetwork: 'static + Send + Sync {
fn on_request<H, F, R>(&self, handle_request: H) fn on_request<H, F, R>(&self, handle_request: H)
where where
@ -92,21 +111,28 @@ pub trait ServerNetwork: 'static + Send + Sync {
} }
pub trait ClientNetwork: 'static + Send + Sync { pub trait ClientNetwork: 'static + Send + Sync {
type Room: ClientRoom;
fn request<R: Request>(&self, request: R) -> BoxFuture<Result<R::Response>>; fn request<R: Request>(&self, request: R) -> BoxFuture<Result<R::Response>>;
fn broadcast<M: Message>(&self, credentials: RoomCredentials, message: M); fn room(&self, credentials: RoomCredentials) -> Self::Room;
fn on_message<M, F>(&self, credentials: RoomCredentials, handle_message: F) }
pub trait ClientRoom: 'static + Send + Sync {
fn connect(&mut self) -> BoxFuture<Result<()>>;
fn broadcast<M: Message>(&self, message: M);
fn on_message<M, F>(&self, handle_message: F)
where where
M: Message, M: Message,
F: 'static + Send + Sync + Fn(M); F: 'static + Send + Sync + Fn(M);
} }
struct Client<N> { struct Client<N: ClientNetwork> {
db: Db, db: Db,
network: Arc<N>, network: Arc<N>,
repo_room_credentials: Arc<Mutex<collections::HashMap<RepoId, RoomCredentials>>>, repo_rooms: Arc<Mutex<collections::HashMap<RepoId, N::Room>>>,
} }
#[derive(Clone)] #[derive(Clone, Serialize, Deserialize)]
pub struct RoomCredentials { pub struct RoomCredentials {
name: RoomName, name: RoomName,
token: RoomToken, token: RoomToken,
@ -117,7 +143,7 @@ impl<N: ClientNetwork> Clone for Client<N> {
Self { Self {
db: self.db.clone(), db: self.db.clone(),
network: self.network.clone(), network: self.network.clone(),
repo_room_credentials: Default::default(), repo_rooms: Default::default(),
} }
} }
} }
@ -127,7 +153,7 @@ impl<N: ClientNetwork> Client<N> {
let mut this = Self { let mut this = Self {
db: Db::new(), db: Db::new(),
network: Arc::new(network), network: Arc::new(network),
repo_room_credentials: Default::default(), repo_rooms: Default::default(),
}; };
this.db.on_local_operation({ this.db.on_local_operation({
let this = this.clone(); let this = this.clone();
@ -164,22 +190,22 @@ impl<N: ClientNetwork> Client<N> {
.network .network
.request(messages::PublishRepo { id, name }) .request(messages::PublishRepo { id, name })
.await?; .await?;
this.repo_room_credentials let room = this.network.room(response.credentials);
.lock() room.on_message({
.insert(id, response.credentials.clone());
this.network.on_message(response.credentials, {
let this = this.clone(); let this = this.clone();
move |operation: Operation| { move |operation: Operation| {
this.handle_remote_operation(id, operation); this.handle_remote_operation(id, operation);
} }
}); });
this.repo_rooms.lock().insert(id, room);
Ok(()) Ok(())
} }
} }
fn handle_local_operation(&self, repo_id: RepoId, operation: Operation) { fn handle_local_operation(&self, repo_id: RepoId, operation: Operation) {
if let Some(credentials) = self.repo_room_credentials.lock().get(&repo_id) { if let Some(room) = self.repo_rooms.lock().get(&repo_id) {
self.network.broadcast(credentials.clone(), operation); room.broadcast(operation);
} }
} }
@ -817,18 +843,46 @@ struct Anchor {
bias: Bias, bias: Bias,
} }
#[derive(Clone, Debug)] #[derive(Clone, Debug, Serialize, Deserialize)]
pub struct AnchorRange { pub struct AnchorRange {
document_id: OperationId, document_id: OperationId,
revision_id: RevisionId, revision_id: RevisionId,
start_insertion_id: OperationId, start_insertion_id: OperationId,
start_offset_in_insertion: usize, start_offset_in_insertion: usize,
#[serde(with = "bias_serialization")]
start_bias: Bias, start_bias: Bias,
end_insertion_id: OperationId, end_insertion_id: OperationId,
end_offset_in_insertion: usize, end_offset_in_insertion: usize,
#[serde(with = "bias_serialization")]
end_bias: Bias, end_bias: Bias,
} }
mod bias_serialization {
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use sum_tree::Bias;
pub fn serialize<S>(field: &Bias, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
match field {
Bias::Left => "left".serialize(serializer),
Bias::Right => "right".serialize(serializer),
}
}
pub fn deserialize<'de, D>(deserializer: D) -> Result<Bias, D::Error>
where
D: Deserializer<'de>,
{
match String::deserialize(deserializer)?.as_str() {
"left" => Ok(Bias::Left),
"right" => Ok(Bias::Right),
_ => Err(serde::de::Error::custom("invalid bias")),
}
}
}
struct RopeBuilder<'a> { struct RopeBuilder<'a> {
old_visible_cursor: rope::Cursor<'a>, old_visible_cursor: rope::Cursor<'a>,
old_hidden_cursor: rope::Cursor<'a>, old_hidden_cursor: rope::Cursor<'a>,
@ -939,12 +993,14 @@ struct Revision {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use gpui::executor::Deterministic;
use super::*; use super::*;
use crate::test::TestNetwork; use crate::test::TestNetwork;
#[gpui::test] #[gpui::test]
async fn test_repo() { async fn test_repo(deterministic: Arc<Deterministic>) {
let network = TestNetwork::default(); let network = TestNetwork::new(deterministic.build_background());
let server = Server::new(network.server()); let server = Server::new(network.server());
let client_a = Client::new(network.client()); let client_a = Client::new(network.client());

View file

@ -1,27 +1,18 @@
use crate::{Message, RepoId, Request, RoomCredentials}; use crate::{Message, RepoId, Request, RoomCredentials};
use serde::{Deserialize, Serialize};
use std::sync::Arc; use std::sync::Arc;
#[derive(Clone, Serialize, Deserialize)]
pub struct PublishRepo { pub struct PublishRepo {
pub id: RepoId, pub id: RepoId,
pub name: Arc<str>, pub name: Arc<str>,
} }
impl Message for PublishRepo {
fn to_bytes(&self) -> Vec<u8> {
todo!()
}
}
impl Request for PublishRepo { impl Request for PublishRepo {
type Response = PublishRepoResponse; type Response = PublishRepoResponse;
} }
#[derive(Clone, Serialize, Deserialize)]
pub struct PublishRepoResponse { pub struct PublishRepoResponse {
pub credentials: RoomCredentials, pub credentials: RoomCredentials,
} }
impl Message for PublishRepoResponse {
fn to_bytes(&self) -> Vec<u8> {
todo!()
}
}

View file

@ -1,8 +1,9 @@
use crate::{AnchorRange, Message, OperationId, RevisionId}; use crate::{AnchorRange, Message, OperationId, RevisionId};
use serde::{Deserialize, Serialize};
use smallvec::SmallVec; use smallvec::SmallVec;
use std::sync::Arc; use std::sync::Arc;
#[derive(Clone, Debug)] #[derive(Clone, Debug, Serialize, Deserialize)]
pub enum Operation { pub enum Operation {
CreateDocument(CreateDocument), CreateDocument(CreateDocument),
Edit(Edit), Edit(Edit),
@ -27,26 +28,20 @@ impl Operation {
} }
} }
impl Message for Operation { #[derive(Clone, Debug, Serialize, Deserialize)]
fn to_bytes(&self) -> Vec<u8> {
todo!()
}
}
#[derive(Clone, Debug)]
pub struct CreateBranch { pub struct CreateBranch {
pub id: OperationId, pub id: OperationId,
pub parent: RevisionId, pub parent: RevisionId,
pub name: Arc<str>, pub name: Arc<str>,
} }
#[derive(Clone, Debug)] #[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CreateDocument { pub struct CreateDocument {
pub id: OperationId, pub id: OperationId,
pub parent: RevisionId, pub parent: RevisionId,
} }
#[derive(Clone, Debug)] #[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Edit { pub struct Edit {
pub id: OperationId, pub id: OperationId,
pub parent: RevisionId, pub parent: RevisionId,

View file

@ -1,6 +1,10 @@
use crate::{ClientNetwork, Message, RoomCredentials, RoomName, RoomToken, ServerNetwork}; use crate::{
ClientNetwork, ClientRoom, Message, RoomCredentials, RoomName, RoomToken, ServerNetwork,
};
use anyhow::Result; use anyhow::Result;
use futures::{future::BoxFuture, FutureExt}; use collections::HashMap;
use futures::{channel::mpsc, future::BoxFuture, FutureExt, StreamExt};
use gpui::executor::Background;
use parking_lot::Mutex; use parking_lot::Mutex;
use std::{ use std::{
any::{type_name, Any, TypeId}, any::{type_name, Any, TypeId},
@ -8,10 +12,17 @@ use std::{
sync::Arc, sync::Arc,
}; };
#[derive(Default)]
pub struct TestNetwork(Arc<Mutex<NetworkState>>); pub struct TestNetwork(Arc<Mutex<NetworkState>>);
impl TestNetwork { impl TestNetwork {
pub fn new(executor: Arc<Background>) -> Self {
Self(Arc::new(Mutex::new(NetworkState {
executor,
request_handlers: Default::default(),
rooms: Default::default(),
})))
}
pub fn server(&self) -> TestServer { pub fn server(&self) -> TestServer {
TestServer(self.0.clone()) TestServer(self.0.clone())
} }
@ -21,8 +32,8 @@ impl TestNetwork {
} }
} }
#[derive(Default)]
struct NetworkState { struct NetworkState {
executor: Arc<Background>,
request_handlers: BTreeMap< request_handlers: BTreeMap<
TypeId, TypeId,
Box<dyn Send + Fn(Box<dyn Any>) -> BoxFuture<'static, Result<Box<dyn Any>>>>, Box<dyn Send + Fn(Box<dyn Any>) -> BoxFuture<'static, Result<Box<dyn Any>>>>,
@ -31,7 +42,7 @@ struct NetworkState {
} }
pub struct Room { pub struct Room {
inboxes: BTreeMap<RoomToken, Vec<Box<dyn Message>>>, inboxes: BTreeMap<RoomToken, mpsc::UnboundedSender<Vec<u8>>>,
} }
pub struct TestServer(Arc<Mutex<NetworkState>>); pub struct TestServer(Arc<Mutex<NetworkState>>);
@ -62,13 +73,15 @@ impl ServerNetwork for TestServer {
pub struct TestClient(Arc<Mutex<NetworkState>>); pub struct TestClient(Arc<Mutex<NetworkState>>);
impl ClientNetwork for TestClient { impl ClientNetwork for TestClient {
type Room = TestClientRoom;
fn request<R: crate::Request>( fn request<R: crate::Request>(
&self, &self,
request: R, request: R,
) -> futures::future::BoxFuture<anyhow::Result<R::Response>> { ) -> futures::future::BoxFuture<anyhow::Result<R::Response>> {
let request = self let network = self.0.lock();
.0 let executor = network.executor.clone();
.lock() let request = network
.request_handlers .request_handlers
.get(&TypeId::of::<R>()) .get(&TypeId::of::<R>())
.expect(&format!( .expect(&format!(
@ -76,22 +89,118 @@ impl ClientNetwork for TestClient {
type_name::<R>() type_name::<R>()
))(Box::new(request)); ))(Box::new(request));
async move { async move {
request executor.simulate_random_delay().await;
let response = request
.await .await
.map(|response| *response.downcast::<R::Response>().unwrap()) .map(|response| *response.downcast::<R::Response>().unwrap());
executor.simulate_random_delay().await;
response
} }
.boxed() .boxed()
} }
fn broadcast<M: Message>(&self, credentials: RoomCredentials, message: M) { fn room(&self, credentials: RoomCredentials) -> Self::Room {
todo!() TestClientRoom {
outbox: Default::default(),
credentials,
message_handlers: Default::default(),
network: self.0.clone(),
}
}
}
pub struct TestClientRoom {
outbox: Option<mpsc::UnboundedSender<Vec<u8>>>,
credentials: RoomCredentials,
message_handlers:
Arc<Mutex<HashMap<TypeId, Box<dyn Send + Sync + Fn(Vec<u8>) -> Result<(), Vec<u8>>>>>>,
network: Arc<Mutex<NetworkState>>,
}
impl ClientRoom for TestClientRoom {
fn connect(&mut self) -> BoxFuture<Result<()>> {
assert!(
self.outbox.is_none(),
"client should not connect more than once"
);
let (inbox_tx, mut inbox_rx) = mpsc::unbounded();
let existing_inbox = self
.network
.lock()
.rooms
.get_mut(&self.credentials.name)
.expect("room should exist")
.inboxes
.insert(self.credentials.token.clone(), inbox_tx);
assert!(
existing_inbox.is_none(),
"client should not connect twice with the same token"
);
let message_handlers = self.message_handlers.clone();
self.network
.lock()
.executor
.spawn(async move {
while let Some(mut message) = inbox_rx.next().await {
for handler in message_handlers.lock().values() {
match handler(message) {
Ok(()) => break,
Err(unhandled_message) => message = unhandled_message,
}
}
}
})
.detach();
// Send outbound messages to other clients in the room.
let (outbox_tx, mut outbox_rx) = mpsc::unbounded();
self.outbox = Some(outbox_tx);
let executor = self.network.lock().executor.clone();
let network = self.network.clone();
let credentials = self.credentials.clone();
self.network
.lock()
.executor
.spawn(async move {
while let Some(message) = outbox_rx.next().await {
let inboxes = network
.lock()
.rooms
.get(&credentials.name)
.map(|room| room.inboxes.clone());
if let Some(inboxes) = inboxes {
for (inbox_token, inbox) in inboxes {
executor.simulate_random_delay().await;
if inbox_token != credentials.token {
let _ = inbox.unbounded_send(message.clone());
}
}
}
}
})
.detach();
async { Ok(()) }.boxed()
} }
fn on_message<M, F>(&self, credentials: RoomCredentials, handle_message: F) fn broadcast<M: Message>(&self, message: M) {
let tx = self.outbox.as_ref().expect("must be connected");
tx.unbounded_send(message.to_bytes())
.expect("channel must be open");
}
fn on_message<M, F>(&self, handle_message: F)
where where
M: Message, M: Message,
F: 'static + Send + Sync + Fn(M), F: 'static + Send + Sync + Fn(M),
{ {
todo!() self.message_handlers.lock().insert(
TypeId::of::<M>(),
Box::new(move |bytes| {
handle_message(M::from_bytes(bytes)?);
Ok(())
}),
);
} }
} }