This commit is contained in:
Antonio Scandurra 2023-07-18 19:10:20 +02:00
parent 00b0189660
commit be7d4d6ea9
4 changed files with 84 additions and 40 deletions

View file

@ -15,7 +15,7 @@ use serde::{Deserialize, Serialize};
use smallvec::{smallvec, SmallVec};
use std::{
cmp::{self, Ordering},
fmt::Debug,
fmt::{self, Debug, Display},
future::Future,
ops::Range,
path::Path,
@ -29,14 +29,20 @@ use uuid::Uuid;
)]
pub struct RepoId(Uuid);
type RevisionId = SmallVec<[OperationId; 2]>;
impl RepoId {
fn new() -> Self {
RepoId(Uuid::new_v4())
}
}
impl Display for RepoId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.0.as_hyphenated())
}
}
type RevisionId = SmallVec<[OperationId; 2]>;
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct ReplicaId(u32);
@ -78,6 +84,11 @@ pub struct RoomName(Arc<str>);
#[derive(Clone, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize)]
pub struct RoomToken(Arc<str>);
#[derive(Clone)]
pub struct User {
login: Arc<str>,
}
pub trait Request: Message {
type Response: Message;
}
@ -105,9 +116,12 @@ where
pub trait ServerNetwork: 'static + Send + Sync {
fn on_request<H, F, R>(&self, handle_request: H)
where
H: 'static + Send + Sync + Fn(R) -> F,
F: 'static + Send + Sync + futures::Future<Output = Result<R::Response>>,
H: 'static + Send + Fn(User, R) -> F,
F: 'static + Send + futures::Future<Output = Result<R::Response>>,
R: Request;
fn create_room(&self, room: &RoomName) -> BoxFuture<Result<()>>;
fn grant_room_access(&self, room: &RoomName, user: &str) -> RoomToken;
}
pub trait ClientNetwork: 'static + Send + Sync {
@ -212,49 +226,67 @@ impl<N: ClientNetwork> Client<N> {
fn handle_remote_operation(&self, repo_id: RepoId, operation: Operation) {}
}
#[derive(Clone)]
struct Server {
struct Server<N> {
db: Db,
network: Arc<N>,
repo_ids_by_name: Arc<Mutex<HashMap<Arc<str>, RepoId>>>,
}
impl Server {
fn new(network: impl ServerNetwork) -> Self {
impl<N: ServerNetwork> Clone for Server<N> {
fn clone(&self) -> Self {
Self {
db: self.db.clone(),
network: self.network.clone(),
repo_ids_by_name: Default::default(),
}
}
}
impl<N: ServerNetwork> Server<N> {
fn new(network: N) -> Self {
let this = Self {
db: Db::new(),
network: Arc::new(network),
repo_ids_by_name: Default::default(),
};
this.on_request(network, Self::handle_publish_repo);
this.on_request(Self::handle_publish_repo);
this
}
fn on_request<F, Fut, R, S>(&self, network: S, handle_request: F)
fn on_request<F, Fut, R>(&self, handle_request: F)
where
F: 'static + Send + Sync + Fn(Self, R) -> Fut,
Fut: 'static + Send + Sync + Future<Output = Result<R::Response>>,
F: 'static + Send + Fn(Self, User, R) -> Fut,
Fut: 'static + Send + Future<Output = Result<R::Response>>,
R: Request,
S: ServerNetwork,
{
network.on_request({
self.network.on_request({
let this = self.clone();
move |request| handle_request(this.clone(), request)
move |user, request| handle_request(this.clone(), user, request)
});
}
async fn handle_publish_repo(
self,
user: User,
request: messages::PublishRepo,
) -> Result<messages::PublishRepoResponse> {
// TODO: handle repositories that had already been published.
match self.repo_ids_by_name.lock().entry(request.name) {
hash_map::Entry::Occupied(_) => Err(anyhow!("repo name taken")),
match self.repo_ids_by_name.lock().entry(request.name.clone()) {
hash_map::Entry::Occupied(_) => return Err(anyhow!("repo name taken")),
hash_map::Entry::Vacant(entry) => {
let mut db = self.db.snapshot.lock();
db.repos.insert(request.id, Default::default());
entry.insert(request.id);
todo!()
}
}
let room = RoomName(request.id.to_string().into());
self.network.create_room(&room).await?;
let token = self.network.grant_room_access(&room, user.login.as_ref());
Ok(messages::PublishRepoResponse {
credentials: RoomCredentials { name: room, token },
})
}
}
@ -941,17 +973,13 @@ struct RepoSnapshot {
branches: TreeMap<OperationId, BranchSnapshot>,
operations: TreeMap<OperationId, Operation>,
revisions: TreeMap<RevisionId, Revision>,
name: Option<Arc<str>>,
}
impl RepoSnapshot {
fn new(replica_id: ReplicaId) -> Self {
Self {
last_operation_id: OperationId::new(replica_id),
branches: Default::default(),
operations: Default::default(),
revisions: Default::default(),
name: None,
..Default::default()
}
}
@ -1003,7 +1031,7 @@ mod tests {
let network = TestNetwork::new(deterministic.build_background());
let server = Server::new(network.server());
let client_a = Client::new(network.client());
let client_a = Client::new(network.client("client-a"));
let repo_a = client_a.create_repo();
let branch_a = repo_a.create_empty_branch("main");
@ -1017,7 +1045,7 @@ mod tests {
assert_eq!(doc2.text().to_string(), "def");
client_a.publish_repo(&repo_a, "repo-1").await.unwrap();
let db_b = Client::new(network.client());
let db_b = Client::new(network.client("client-b"));
let repo_b = db_b.clone_repo("repo-1").await.unwrap();
}
}

View file

@ -1,4 +1,4 @@
use crate::{Message, RepoId, Request, RoomCredentials};
use crate::{RepoId, Request, RoomCredentials};
use serde::{Deserialize, Serialize};
use std::sync::Arc;

View file

@ -1,4 +1,4 @@
use crate::{AnchorRange, Message, OperationId, RevisionId};
use crate::{AnchorRange, OperationId, RevisionId};
use serde::{Deserialize, Serialize};
use smallvec::SmallVec;
use std::sync::Arc;

View file

@ -1,5 +1,5 @@
use crate::{
ClientNetwork, ClientRoom, Message, RoomCredentials, RoomName, RoomToken, ServerNetwork,
ClientNetwork, ClientRoom, Message, RoomCredentials, RoomName, RoomToken, ServerNetwork, User,
};
use anyhow::Result;
use collections::HashMap;
@ -27,8 +27,13 @@ impl TestNetwork {
TestServer(self.0.clone())
}
pub fn client(&self) -> TestClient {
TestClient(self.0.clone())
pub fn client(&self, login: impl Into<Arc<str>>) -> TestClient {
TestClient {
user: User {
login: login.into(),
},
network: self.0.clone(),
}
}
}
@ -36,7 +41,7 @@ struct NetworkState {
executor: Arc<Background>,
request_handlers: BTreeMap<
TypeId,
Box<dyn Send + Fn(Box<dyn Any>) -> BoxFuture<'static, Result<Box<dyn Any>>>>,
Box<dyn Send + Fn(User, Box<dyn Any>) -> BoxFuture<'static, Result<Box<dyn Any>>>>,
>,
rooms: BTreeMap<RoomName, Room>,
}
@ -50,15 +55,15 @@ pub struct TestServer(Arc<Mutex<NetworkState>>);
impl ServerNetwork for TestServer {
fn on_request<H, F, R>(&self, handle_request: H)
where
H: 'static + Send + Sync + Fn(R) -> F,
F: 'static + Send + Sync + futures::Future<Output = Result<R::Response>>,
H: 'static + Send + Fn(User, R) -> F,
F: 'static + Send + futures::Future<Output = Result<R::Response>>,
R: crate::Request,
{
self.0.lock().request_handlers.insert(
TypeId::of::<R>(),
Box::new(move |request| {
Box::new(move |user, request| {
let request = request.downcast::<R>().unwrap();
let response = handle_request(*request);
let response = handle_request(user, *request);
async move {
response
.await
@ -68,9 +73,20 @@ impl ServerNetwork for TestServer {
}),
);
}
fn create_room(&self, room: &RoomName) -> BoxFuture<Result<()>> {
todo!()
}
fn grant_room_access(&self, room: &RoomName, user: &str) -> RoomToken {
todo!()
}
}
pub struct TestClient(Arc<Mutex<NetworkState>>);
pub struct TestClient {
user: User,
network: Arc<Mutex<NetworkState>>,
}
impl ClientNetwork for TestClient {
type Room = TestClientRoom;
@ -79,7 +95,7 @@ impl ClientNetwork for TestClient {
&self,
request: R,
) -> futures::future::BoxFuture<anyhow::Result<R::Response>> {
let network = self.0.lock();
let network = self.network.lock();
let executor = network.executor.clone();
let request = network
.request_handlers
@ -87,7 +103,7 @@ impl ClientNetwork for TestClient {
.expect(&format!(
"handler for request {} not found",
type_name::<R>()
))(Box::new(request));
))(self.user.clone(), Box::new(request));
async move {
executor.simulate_random_delay().await;
let response = request
@ -104,7 +120,7 @@ impl ClientNetwork for TestClient {
outbox: Default::default(),
credentials,
message_handlers: Default::default(),
network: self.0.clone(),
network: self.network.clone(),
}
}
}