This commit is contained in:
Antonio Scandurra 2023-07-18 15:11:35 +02:00
parent 6205ac27a5
commit 5267c6d2cb
3 changed files with 85 additions and 28 deletions

View file

@ -4,7 +4,8 @@ mod operations;
#[cfg(test)] #[cfg(test)]
mod test; mod test;
use anyhow::Result; use anyhow::{anyhow, Result};
use collections::{hash_map, HashMap};
use dense_id::DenseId; use dense_id::DenseId;
use futures::future::BoxFuture; use futures::future::BoxFuture;
use operations::{CreateBranch, Operation}; use operations::{CreateBranch, Operation};
@ -74,15 +75,15 @@ pub struct RoomName(Arc<str>);
#[derive(Clone)] #[derive(Clone)]
pub struct RoomToken(Arc<str>); pub struct RoomToken(Arc<str>);
pub trait Request: 'static { pub trait Request: Message {
type Response: 'static; type Response: Message;
} }
pub trait Message: 'static + Send + Sync { pub trait Message: 'static + Send + Sync {
fn to_bytes(&self) -> Vec<u8>; fn to_bytes(&self) -> Vec<u8>;
} }
pub trait ServerNetwork: Send + Sync { pub trait ServerNetwork: 'static + Send + Sync {
fn on_request<H, F, R>(&self, handle_request: H) fn on_request<H, F, R>(&self, handle_request: H)
where where
H: 'static + Send + Sync + Fn(R) -> F, H: 'static + Send + Sync + Fn(R) -> F,
@ -90,9 +91,13 @@ pub trait ServerNetwork: Send + Sync {
R: Request; R: Request;
} }
pub trait ClientNetwork: Send + Sync { pub trait ClientNetwork: 'static + Send + Sync {
fn request<R: Request>(&self, request: R) -> BoxFuture<Result<R::Response>>; fn request<R: Request>(&self, request: R) -> BoxFuture<Result<R::Response>>;
fn broadcast<M: Message>(&self, room: RoomName, token: RoomToken, message: M); fn broadcast<M: Message>(&self, credentials: RoomCredentials, message: M);
fn on_message<M, F>(&self, credentials: RoomCredentials, handle_message: F)
where
M: Message,
F: 'static + Send + Sync + Fn(M);
} }
struct Client<N> { struct Client<N> {
@ -101,7 +106,8 @@ struct Client<N> {
repo_room_credentials: Arc<Mutex<collections::HashMap<RepoId, RoomCredentials>>>, repo_room_credentials: Arc<Mutex<collections::HashMap<RepoId, RoomCredentials>>>,
} }
struct RoomCredentials { #[derive(Clone)]
pub struct RoomCredentials {
name: RoomName, name: RoomName,
token: RoomToken, token: RoomToken,
} }
@ -116,7 +122,7 @@ impl<N: ClientNetwork> Clone for Client<N> {
} }
} }
impl<N: 'static + ClientNetwork> Client<N> { impl<N: ClientNetwork> Client<N> {
pub fn new(network: N) -> Self { pub fn new(network: N) -> Self {
let mut this = Self { let mut this = Self {
db: Db::new(), db: Db::new(),
@ -154,50 +160,77 @@ impl<N: 'static + ClientNetwork> Client<N> {
let id = repo.id; let id = repo.id;
let name = name.into(); let name = name.into();
async move { async move {
this.network let response = this
.network
.request(messages::PublishRepo { id, name }) .request(messages::PublishRepo { id, name })
.await?; .await?;
this.repo_room_credentials
.lock()
.insert(id, response.credentials.clone());
this.network.on_message(response.credentials, {
let this = this.clone();
move |operation: Operation| {
this.handle_remote_operation(id, operation);
}
});
Ok(()) Ok(())
} }
} }
fn handle_local_operation(&self, repo_id: RepoId, operation: Operation) { fn handle_local_operation(&self, repo_id: RepoId, operation: Operation) {
if let Some(credentials) = self.repo_room_credentials.lock().get(&repo_id) { if let Some(credentials) = self.repo_room_credentials.lock().get(&repo_id) {
self.network.broadcast( self.network.broadcast(credentials.clone(), operation);
credentials.name.clone(),
credentials.token.clone(),
operation,
);
} }
} }
fn handle_remote_operation(&self, repo_id: RepoId, operation: Operation) {}
} }
#[derive(Clone)] #[derive(Clone)]
struct Server { struct Server {
db: Db, db: Db,
repo_ids_by_name: Arc<Mutex<HashMap<Arc<str>, RepoId>>>,
} }
impl Server { impl Server {
fn new(network: impl ServerNetwork) -> Self { fn new(network: impl ServerNetwork) -> Self {
let this = Self { db: Db::new() }; let this = Self {
this.on_request(network); db: Db::new(),
repo_ids_by_name: Default::default(),
};
this.on_request(network, Self::handle_publish_repo);
this this
} }
fn on_request(&self, network: impl ServerNetwork) { fn on_request<F, Fut, R, S>(&self, network: S, handle_request: F)
where
F: 'static + Send + Sync + Fn(Self, R) -> Fut,
Fut: 'static + Send + Sync + Future<Output = Result<R::Response>>,
R: Request,
S: ServerNetwork,
{
network.on_request({ network.on_request({
let this = self.clone(); let this = self.clone();
move |request: messages::PublishRepo| { move |request| handle_request(this.clone(), request)
let this = this.clone();
async move { this.handle_publish_repo(request).await }
}
}); });
} }
async fn handle_publish_repo(&self, request: messages::PublishRepo) -> Result<()> { async fn handle_publish_repo(
self,
request: messages::PublishRepo,
) -> Result<messages::PublishRepoResponse> {
// TODO: handle repositories that had already been published.
match self.repo_ids_by_name.lock().entry(request.name) {
hash_map::Entry::Occupied(_) => Err(anyhow!("repo name taken")),
hash_map::Entry::Vacant(entry) => {
let mut db = self.db.snapshot.lock();
db.repos.insert(request.id, Default::default());
entry.insert(request.id);
todo!() todo!()
} }
} }
}
}
#[derive(Clone)] #[derive(Clone)]
pub struct Db { pub struct Db {

View file

@ -1,4 +1,4 @@
use crate::{RepoId, Request}; use crate::{Message, RepoId, Request, RoomCredentials};
use std::sync::Arc; use std::sync::Arc;
pub struct PublishRepo { pub struct PublishRepo {
@ -6,6 +6,22 @@ pub struct PublishRepo {
pub name: Arc<str>, pub name: Arc<str>,
} }
impl Request for PublishRepo { impl Message for PublishRepo {
type Response = (); fn to_bytes(&self) -> Vec<u8> {
todo!()
}
}
impl Request for PublishRepo {
type Response = PublishRepoResponse;
}
pub struct PublishRepoResponse {
pub credentials: RoomCredentials,
}
impl Message for PublishRepoResponse {
fn to_bytes(&self) -> Vec<u8> {
todo!()
}
} }

View file

@ -1,4 +1,4 @@
use crate::{ClientNetwork, Message, RoomName, RoomToken, ServerNetwork}; use crate::{ClientNetwork, Message, RoomCredentials, RoomName, RoomToken, ServerNetwork};
use anyhow::Result; use anyhow::Result;
use futures::{future::BoxFuture, FutureExt}; use futures::{future::BoxFuture, FutureExt};
use parking_lot::Mutex; use parking_lot::Mutex;
@ -83,7 +83,15 @@ impl ClientNetwork for TestClient {
.boxed() .boxed()
} }
fn broadcast<M: Message>(&self, room: RoomName, token: RoomToken, message: M) { fn broadcast<M: Message>(&self, credentials: RoomCredentials, message: M) {
todo!()
}
fn on_message<M, F>(&self, credentials: RoomCredentials, handle_message: F)
where
M: Message,
F: 'static + Send + Sync + Fn(M),
{
todo!() todo!()
} }
} }