Rework networking code and serialization

Tests aren't passing yet, but I need to wind down for the night.

Decide to try out `serde_bare`.

From GPT: `serde_bare` is a Rust library that provides a fast and efficient Serializer and
Deserializer for the "BARE" (Basic Ad-hoc Runtime Encoding) data format. This
format focuses on being simple, small, fast and working well with anonymous
types, making it useful for sending small ad-hoc messages between systems.

To type messages on the wire, I'm wrapping them in "envelope" enums. These envelopes
then implement an unwrap method that returns a Box<dyn Any>, and we require messages
to be Into their envelope type. It's some boilerplate, but I ultimately like leaning
on Rust more than an external schema, which adds complexity.

I also reworked network abstraction to be just in terms of bytes. Typed handlers
are moved into network-neutral code. It's still broken, but hopefully the direction
is clear.

Heads up: I turned on the `backtrace` feature for `anyhow`.
This commit is contained in:
Nathan Sobo 2023-07-18 23:40:17 -06:00
parent 8deafe90fc
commit afb0329914
7 changed files with 249 additions and 161 deletions

View file

@ -11,13 +11,14 @@ doctest = false
rope = { path = "../rope" }
sum_tree = { path = "../sum_tree" }
collections = { path = "../collections" }
util = { path = "../util" }
anyhow.workspace = true
futures.workspace = true
lazy_static.workspace = true
parking_lot.workspace = true
serde.workspace = true
serde_json.workspace = true
serde_bare = "0.5"
smallvec.workspace = true
uuid = { version = "1.3", features = ["v4", "fast-rng", "serde"] }

View file

@ -6,15 +6,17 @@ mod sync;
mod test;
use anyhow::{anyhow, Result};
use collections::{hash_map, HashMap};
use collections::{btree_map, BTreeMap, HashMap};
use dense_id::DenseId;
use futures::future::BoxFuture;
use operations::{CreateBranch, Operation};
use parking_lot::Mutex;
use futures::{future::BoxFuture, FutureExt};
use messages::{MessageEnvelope, Operation, RequestEnvelope};
use operations::CreateBranch;
use parking_lot::{Mutex, RwLock};
use rope::Rope;
use serde::{Deserialize, Serialize};
use smallvec::{smallvec, SmallVec};
use std::{
any::{Any, TypeId},
cmp::{self, Ordering},
fmt::{self, Debug, Display},
future::Future,
@ -23,6 +25,7 @@ use std::{
sync::Arc,
};
use sum_tree::{Bias, SumTree, TreeMap};
use util::ResultExt;
use uuid::Uuid;
#[derive(
@ -90,12 +93,12 @@ pub struct User {
login: Arc<str>,
}
pub trait Request: Message {
pub trait Request: Message + Into<RequestEnvelope> {
type Response: Message;
}
pub trait Message: 'static + Send {
fn from_bytes(bytes: Vec<u8>) -> Result<Self, Vec<u8>>
fn from_bytes(bytes: Vec<u8>) -> Result<Self>
where
Self: Sized;
fn to_bytes(&self) -> Vec<u8>;
@ -105,22 +108,20 @@ impl<T> Message for T
where
T: 'static + Send + Serialize + for<'a> Deserialize<'a>,
{
fn from_bytes(bytes: Vec<u8>) -> Result<Self, Vec<u8>> {
serde_json::from_slice(&bytes).map_err(|_| bytes)
fn from_bytes(bytes: Vec<u8>) -> Result<Self> {
Ok(serde_bare::from_slice(&bytes)?)
}
fn to_bytes(&self) -> Vec<u8> {
serde_json::to_vec(self).unwrap()
serde_bare::to_vec(self).unwrap()
}
}
pub trait ServerNetwork: 'static + Send + Sync {
fn on_request<H, F, R>(&self, handle_request: H)
fn handle_requests<H, F>(&self, handle_request: H)
where
H: 'static + Send + Fn(User, R) -> F,
F: 'static + Send + futures::Future<Output = Result<R::Response>>,
R: Request;
H: 'static + Send + Fn(User, Vec<u8>) -> Result<F>,
F: 'static + Send + futures::Future<Output = Result<Vec<u8>>>;
fn create_room(&self, room: &RoomName) -> BoxFuture<Result<()>>;
fn grant_room_access(&self, room: &RoomName, user: &str) -> RoomToken;
}
@ -128,23 +129,68 @@ pub trait ServerNetwork: 'static + Send + Sync {
pub trait ClientNetwork: 'static + Send + Sync {
type Room: ClientRoom;
fn request<R: Request>(&self, request: R) -> BoxFuture<Result<R::Response>>;
fn request(&self, request: Vec<u8>) -> BoxFuture<Result<Vec<u8>>>;
fn room(&self, credentials: RoomCredentials) -> Self::Room;
}
pub trait ClientRoom: 'static + Send + Sync {
fn connect(&mut self) -> BoxFuture<Result<()>>;
fn broadcast<M: Message>(&self, message: M);
fn on_message<M, F>(&self, handle_message: F)
where
M: Message,
F: 'static + Send + Fn(M);
fn broadcast(&self, message: Vec<u8>);
fn handle_messages(&self, handle_message: impl 'static + Send + Fn(Vec<u8>));
}
struct Client<N: ClientNetwork> {
db: Db,
network: Arc<N>,
repo_rooms: Arc<Mutex<collections::HashMap<RepoId, N::Room>>>,
repo_rooms: Arc<Mutex<HashMap<RepoId, RepoRoom<N>>>>,
}
struct RepoRoom<N: ClientNetwork> {
network_room: N::Room,
message_handlers:
Arc<RwLock<HashMap<TypeId, Box<dyn Send + Sync + Fn(Client<N>, RepoId, Box<dyn Any>)>>>>,
}
impl<N: ClientNetwork> RepoRoom<N> {
fn new(client: Client<N>, repo_id: RepoId, network_room: N::Room) -> Self {
let this = Self {
network_room,
message_handlers: Default::default(),
};
{
let handlers = this.message_handlers.clone();
this.network_room.handle_messages(move |message| {
if let Some(envelope) =
serde_bare::from_slice::<MessageEnvelope>(&message).log_err()
{
let request = envelope.unwrap();
if let Some(handler) = handlers.read().get(&request.type_id()) {
handler(client.clone(), repo_id, request);
}
};
});
}
this
}
fn handle_messages<M: Message, H>(&self, handle_message: H)
where
M: Message,
H: 'static + Fn(Client<N>, RepoId, M) + Send + Sync,
{
self.message_handlers.write().insert(
TypeId::of::<M>(),
Box::new(move |client, repo_id, message| {
handle_message(client, repo_id, *message.downcast().unwrap())
}),
);
}
fn broadcast<M: Message>(&self, message: M) {
self.network_room.broadcast(message.to_bytes());
}
}
#[derive(Clone, Serialize, Deserialize)]
@ -201,17 +247,9 @@ impl<N: ClientNetwork> Client<N> {
let id = repo.id;
let name = name.into();
async move {
let response = this
.network
.request(messages::PublishRepo { id, name })
.await?;
let room = this.network.room(response.credentials);
room.on_message({
let this = this.clone();
move |operation: Operation| {
this.handle_remote_operation(id, operation);
}
});
let response = this.request(messages::PublishRepo { id, name }).await?;
let room = RepoRoom::new(this.clone(), id, this.network.room(response.credentials));
room.handle_messages(Self::handle_remote_operation);
this.repo_rooms.lock().insert(id, room);
Ok(())
@ -224,13 +262,29 @@ impl<N: ClientNetwork> Client<N> {
}
}
fn handle_remote_operation(&self, repo_id: RepoId, operation: Operation) {}
fn handle_remote_operation(self, repo_id: RepoId, operation: Operation) {}
fn request<R: Request>(&self, request: R) -> BoxFuture<Result<R::Response>> {
let envelope: RequestEnvelope = request.into();
let response = self.network.request(envelope.to_bytes());
async { Ok(R::Response::from_bytes(response.await?)?) }.boxed()
}
}
struct Server<N> {
db: Db,
network: Arc<N>,
repo_ids_by_name: Arc<Mutex<HashMap<Arc<str>, RepoId>>>,
request_handlers: Arc<
RwLock<
BTreeMap<
TypeId,
Box<
dyn Send + Sync + Fn(User, Box<dyn Any>) -> BoxFuture<'static, Result<Vec<u8>>>,
>,
>,
>,
>,
repo_ids_by_name: Arc<Mutex<BTreeMap<Arc<str>, RepoId>>>,
}
impl<N: ServerNetwork> Clone for Server<N> {
@ -239,31 +293,61 @@ impl<N: ServerNetwork> Clone for Server<N> {
db: self.db.clone(),
network: self.network.clone(),
repo_ids_by_name: Default::default(),
request_handlers: Default::default(),
}
}
}
impl<N: ServerNetwork> Server<N> {
fn new(network: N) -> Self {
let network = Arc::new(network);
let this = Self {
db: Db::new(),
network: Arc::new(network),
network: network.clone(),
repo_ids_by_name: Default::default(),
request_handlers: Default::default(),
};
this.on_request(Self::handle_publish_repo);
this.clone().handle_requests(Self::handle_publish_repo);
let request_handlers = this.request_handlers.clone();
network.handle_requests(move |user, request_bytes| {
let envelope = MessageEnvelope::from_bytes(request_bytes)?;
let request = envelope.unwrap();
let request_handlers = request_handlers.read();
let request_handler = request_handlers
.get(&request.type_id())
.ok_or_else(|| anyhow!("no request handler"))?;
let response = (request_handler)(user, request);
Ok(response)
});
this
}
fn on_request<F, Fut, R>(&self, handle_request: F)
fn handle_requests<F, Fut, R>(self, handle_request: F)
where
F: 'static + Send + Fn(Self, User, R) -> Fut,
F: 'static + Send + Sync + Fn(Self, User, R) -> Fut,
Fut: 'static + Send + Future<Output = Result<R::Response>>,
R: Request,
{
self.network.on_request({
let this = self.clone();
move |user, request| handle_request(this.clone(), user, request)
});
let request_handlers = self.request_handlers.clone();
request_handlers.write().insert(
TypeId::of::<R>(),
Box::new({
let this = self.clone();
move |user, request| {
let request = *request.downcast::<R>().unwrap();
let response = handle_request(this.clone(), user, request);
async move {
let response = response.await;
response.map(|response| response.to_bytes())
}
.boxed()
}
}),
);
}
async fn handle_publish_repo(
@ -273,20 +357,20 @@ impl<N: ServerNetwork> Server<N> {
) -> Result<messages::PublishRepoResponse> {
// TODO: handle repositories that had already been published.
match self.repo_ids_by_name.lock().entry(request.name.clone()) {
hash_map::Entry::Occupied(_) => return Err(anyhow!("repo name taken")),
hash_map::Entry::Vacant(entry) => {
btree_map::Entry::Occupied(_) => return Err(anyhow!("repo name taken")),
btree_map::Entry::Vacant(entry) => {
let mut db = self.db.snapshot.lock();
db.repos.insert(request.id, Default::default());
entry.insert(request.id);
}
}
let room = RoomName(request.id.to_string().into());
self.network.create_room(&room).await?;
let token = self.network.grant_room_access(&room, user.login.as_ref());
let name = RoomName(request.id.to_string().into());
self.network.create_room(&name).await?;
let token = self.network.grant_room_access(&name, user.login.as_ref());
Ok(messages::PublishRepoResponse {
credentials: RoomCredentials { name: room, token },
credentials: RoomCredentials { name, token },
})
}
}

View file

@ -1,8 +1,30 @@
use crate::{RepoId, Request, RoomCredentials};
use crate::{
operations::{CreateBranch, CreateDocument, Edit},
OperationId, RepoId, Request, RevisionId, RoomCredentials,
};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::{any::Any, sync::Arc};
#[derive(Clone, Serialize, Deserialize)]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum RequestEnvelope {
PublishRepo(PublishRepo),
}
impl RequestEnvelope {
pub fn unwrap(self) -> Box<dyn Any> {
Box::new(match self {
RequestEnvelope::PublishRepo(request) => request,
})
}
}
impl From<Operation> for MessageEnvelope {
fn from(value: Operation) -> Self {
Self::Operation(value)
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PublishRepo {
pub id: RepoId,
pub name: Arc<str>,
@ -12,7 +34,51 @@ impl Request for PublishRepo {
type Response = PublishRepoResponse;
}
impl Into<RequestEnvelope> for PublishRepo {
fn into(self) -> RequestEnvelope {
RequestEnvelope::PublishRepo(self)
}
}
#[derive(Clone, Serialize, Deserialize)]
pub struct PublishRepoResponse {
pub credentials: RoomCredentials,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum MessageEnvelope {
Operation(Operation),
}
impl MessageEnvelope {
pub fn unwrap(self) -> Box<dyn Any> {
Box::new(match self {
MessageEnvelope::Operation(message) => message,
})
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum Operation {
CreateDocument(CreateDocument),
Edit(Edit),
CreateBranch(CreateBranch),
}
impl Operation {
pub fn id(&self) -> OperationId {
match self {
Operation::CreateDocument(op) => op.id,
Operation::Edit(op) => op.id,
Operation::CreateBranch(op) => op.id,
}
}
pub fn parent(&self) -> &RevisionId {
match self {
Operation::CreateDocument(op) => &op.parent,
Operation::Edit(op) => &op.parent,
Operation::CreateBranch(op) => &op.parent,
}
}
}

View file

@ -3,31 +3,6 @@ use serde::{Deserialize, Serialize};
use smallvec::SmallVec;
use std::sync::Arc;
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum Operation {
CreateDocument(CreateDocument),
Edit(Edit),
CreateBranch(CreateBranch),
}
impl Operation {
pub fn id(&self) -> OperationId {
match self {
Operation::CreateDocument(op) => op.id,
Operation::Edit(op) => op.id,
Operation::CreateBranch(op) => op.id,
}
}
pub fn parent(&self) -> &RevisionId {
match self {
Operation::CreateDocument(op) => &op.parent,
Operation::Edit(op) => &op.parent,
Operation::CreateBranch(op) => &op.parent,
}
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CreateBranch {
pub id: OperationId,

View file

@ -1,16 +1,9 @@
use crate::{
ClientNetwork, ClientRoom, Message, RoomCredentials, RoomName, RoomToken, ServerNetwork, User,
};
use crate::{ClientNetwork, ClientRoom, RoomCredentials, RoomName, RoomToken, ServerNetwork, User};
use anyhow::{anyhow, Result};
use collections::HashMap;
use futures::{channel::mpsc, future::BoxFuture, FutureExt, StreamExt};
use gpui::executor::Background;
use parking_lot::Mutex;
use std::{
any::{type_name, Any, TypeId},
collections::BTreeMap,
sync::Arc,
};
use std::{collections::BTreeMap, sync::Arc};
pub struct TestNetwork(Arc<Mutex<NetworkState>>);
@ -18,7 +11,7 @@ impl TestNetwork {
pub fn new(executor: Arc<Background>) -> Self {
Self(Arc::new(Mutex::new(NetworkState {
executor,
request_handlers: Default::default(),
request_handler: None,
rooms: Default::default(),
})))
}
@ -39,10 +32,8 @@ impl TestNetwork {
struct NetworkState {
executor: Arc<Background>,
request_handlers: BTreeMap<
TypeId,
Box<dyn Send + Fn(User, Box<dyn Any>) -> BoxFuture<'static, Result<Box<dyn Any>>>>,
>,
request_handler:
Option<Box<dyn Send + Fn(User, Vec<u8>) -> Result<BoxFuture<'static, Result<Vec<u8>>>>>>,
rooms: BTreeMap<RoomName, Room>,
}
@ -56,27 +47,6 @@ pub struct Room {
pub struct TestServer(Arc<Mutex<NetworkState>>);
impl ServerNetwork for TestServer {
fn on_request<H, F, R>(&self, handle_request: H)
where
H: 'static + Send + Fn(User, R) -> F,
F: 'static + Send + futures::Future<Output = Result<R::Response>>,
R: crate::Request,
{
self.0.lock().request_handlers.insert(
TypeId::of::<R>(),
Box::new(move |user, request| {
let request = request.downcast::<R>().unwrap();
let response = handle_request(user, *request);
async move {
response
.await
.map(|response| Box::new(response) as Box<dyn Any>)
}
.boxed()
}),
);
}
fn create_room(&self, name: &RoomName) -> BoxFuture<Result<()>> {
let network = self.0.clone();
let room = name.clone();
@ -98,6 +68,16 @@ impl ServerNetwork for TestServer {
room.authorized_users.insert(token.clone(), user.into());
token
}
fn handle_requests<H, F>(&self, handle_request: H)
where
H: 'static + Send + Fn(User, Vec<u8>) -> Result<F>,
F: 'static + Send + futures::Future<Output = Result<Vec<u8>>>,
{
self.0.lock().request_handler = Some(Box::new(move |user, request| {
handle_request(user, request.clone()).map(FutureExt::boxed)
}));
}
}
pub struct TestClient {
@ -108,35 +88,17 @@ pub struct TestClient {
impl ClientNetwork for TestClient {
type Room = TestClientRoom;
fn request<R: crate::Request>(
&self,
request: R,
) -> futures::future::BoxFuture<anyhow::Result<R::Response>> {
let network = self.network.lock();
let executor = network.executor.clone();
let request = network
.request_handlers
.get(&TypeId::of::<R>())
.expect(&format!(
"handler for request {} not found",
type_name::<R>()
))(self.user.clone(), Box::new(request));
async move {
executor.simulate_random_delay().await;
let response = request
.await
.map(|response| *response.downcast::<R::Response>().unwrap());
executor.simulate_random_delay().await;
response
}
.boxed()
fn request(&self, request: Vec<u8>) -> BoxFuture<Result<Vec<u8>>> {
let response =
self.network.lock().request_handler.as_ref().unwrap()(self.user.clone(), request);
async move { response?.await }.boxed()
}
fn room(&self, credentials: RoomCredentials) -> Self::Room {
TestClientRoom {
outbox: Default::default(),
credentials,
message_handlers: Default::default(),
message_handler: Default::default(),
network: self.network.clone(),
}
}
@ -145,8 +107,7 @@ impl ClientNetwork for TestClient {
pub struct TestClientRoom {
outbox: Option<mpsc::UnboundedSender<Vec<u8>>>,
credentials: RoomCredentials,
message_handlers:
Arc<Mutex<HashMap<TypeId, Box<dyn Send + Fn(Vec<u8>) -> Result<(), Vec<u8>>>>>>,
message_handler: Arc<Mutex<Option<Box<dyn Send + Fn(Vec<u8>)>>>>,
network: Arc<Mutex<NetworkState>>,
}
@ -182,17 +143,14 @@ impl ClientRoom for TestClientRoom {
"client should not connect twice with the same token"
);
}
let message_handlers = self.message_handlers.clone();
let message_handler = self.message_handler.clone();
self.network
.lock()
.executor
.spawn(async move {
while let Some(mut message) = inbox_rx.next().await {
for handler in message_handlers.lock().values() {
match handler(message) {
Ok(()) => break,
Err(unhandled_message) => message = unhandled_message,
}
while let Some(message) = inbox_rx.next().await {
if let Some(handler) = message_handler.lock().as_ref() {
handler(message);
}
}
})
@ -229,23 +187,14 @@ impl ClientRoom for TestClientRoom {
async { Ok(()) }.boxed()
}
fn broadcast<M: Message>(&self, message: M) {
fn broadcast(&self, message: Vec<u8>) {
let tx = self.outbox.as_ref().expect("must be connected");
tx.unbounded_send(message.to_bytes())
.expect("channel must be open");
tx.unbounded_send(message).expect("channel must be open");
}
fn on_message<M, F>(&self, handle_message: F)
where
M: Message,
F: 'static + Send + Fn(M),
{
self.message_handlers.lock().insert(
TypeId::of::<M>(),
Box::new(move |bytes| {
handle_message(M::from_bytes(bytes)?);
Ok(())
}),
);
fn handle_messages(&self, handle_message: impl 'static + Send + Fn(Vec<u8>)) {
self.message_handler
.lock()
.replace(Box::new(handle_message));
}
}