Implement clone_repo and start handling synchronization requests

Antonio Scandurra 2023-07-19 16:04:44 +02:00
parent e6b7bbee25
commit 27b06c1d09
3 changed files with 215 additions and 15 deletions

View file

@@ -6,7 +6,7 @@ mod sync;
mod test;
use anyhow::{anyhow, Result};
use collections::{btree_map, BTreeMap, HashMap};
use collections::{btree_map, BTreeMap, Bound, HashMap};
use dense_id::DenseId;
use futures::{future::BoxFuture, FutureExt};
use messages::{MessageEnvelope, Operation, RequestEnvelope};
@@ -51,12 +51,12 @@ type RevisionId = SmallVec<[OperationId; 2]>;
pub struct ReplicaId(u32);
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
struct OperationCount(usize);
pub struct OperationCount(usize);
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct OperationId {
replica_id: ReplicaId,
operation_count: OperationCount,
pub replica_id: ReplicaId,
pub operation_count: OperationCount,
}
impl OperationId {
@@ -204,7 +204,7 @@ impl<N: ClientNetwork> Clone for Client<N> {
Self {
db: self.db.clone(),
network: self.network.clone(),
repo_rooms: Default::default(),
repo_rooms: self.repo_rooms.clone(),
}
}
}
@@ -235,7 +235,29 @@ impl<N: ClientNetwork> Client<N> {
}
pub fn clone_repo(&self, name: impl Into<Arc<str>>) -> impl Future<Output = Result<Repo>> {
async move { todo!() }
let this = self.clone();
let name = name.into();
async move {
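// Ask the server for the repo's id, our replica id, and the credentials needed to join its room.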
let response = this.request(messages::CloneRepo { name }).await?;
let repo_id = response.repo_id;
let room = RepoRoom::new(
this.clone(),
repo_id,
this.network.room(response.credentials),
);
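// Forward operations broadcast in the room to handle_remote_operation.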
room.handle_messages(Self::handle_remote_operation);
this.repo_rooms.lock().insert(repo_id, room);
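// Register an empty local snapshot for the cloned repo; its operations arrive separately.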
this.db
.snapshot
.lock()
.repos
.insert(repo_id, Default::default());
Ok(Repo {
id: repo_id,
db: this.db.clone(),
})
}
}
pub fn publish_repo(
@@ -262,7 +284,18 @@ impl<N: ClientNetwork> Client<N> {
}
}
fn handle_remote_operation(self, repo_id: RepoId, operation: Operation) {}
fn handle_remote_operation(self, repo_id: RepoId, operation: Operation) {
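// Apply an operation broadcast by another replica to our snapshot of the repo.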
let update = self
.db
.snapshot
.lock()
.repos
.update(&repo_id, |repo| repo.apply_operation(operation));
assert!(
update.is_some(),
"received an operation for an unknown repo"
);
}
fn request<R: Request>(&self, request: R) -> BoxFuture<Result<R::Response>> {
let envelope: RequestEnvelope = request.into();
@@ -285,6 +318,7 @@ struct Server<N> {
>,
>,
repo_ids_by_name: Arc<Mutex<BTreeMap<Arc<str>, RepoId>>>,
next_replica_ids_by_repo_id: Arc<Mutex<BTreeMap<RepoId, ReplicaId>>>,
}
impl<N: ServerNetwork> Clone for Server<N> {
@@ -292,8 +326,9 @@ impl<N: ServerNetwork> Clone for Server<N> {
Self {
db: self.db.clone(),
network: self.network.clone(),
repo_ids_by_name: Default::default(),
request_handlers: Default::default(),
repo_ids_by_name: self.repo_ids_by_name.clone(),
request_handlers: self.request_handlers.clone(),
next_replica_ids_by_repo_id: self.next_replica_ids_by_repo_id.clone(),
}
}
}
@@ -304,11 +339,14 @@ impl<N: ServerNetwork> Server<N> {
let this = Self {
db: Db::new(),
network: network.clone(),
repo_ids_by_name: Default::default(),
request_handlers: Default::default(),
repo_ids_by_name: Default::default(),
next_replica_ids_by_repo_id: Default::default(),
};
this.handle_requests(Self::handle_publish_repo);
this.handle_requests(Self::handle_clone_repo);
this.handle_requests(Self::handle_sync_repo);
let request_handlers = this.request_handlers.clone();
network.handle_requests(move |user, request_bytes| {
@@ -364,6 +402,9 @@ impl<N: ServerNetwork> Server<N> {
entry.insert(request.id);
}
}
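// Clones of this repo will be assigned replica ids starting at 1.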
self.next_replica_ids_by_repo_id
.lock()
.insert(request.id, ReplicaId(1));
let name = RoomName(request.id.to_string().into());
self.network.create_room(&name).await?;
@@ -373,6 +414,76 @@ impl<N: ServerNetwork> Server<N> {
credentials: RoomCredentials { name, token },
})
}
async fn handle_clone_repo(
self,
user: User,
request: messages::CloneRepo,
) -> Result<messages::CloneRepoResponse> {
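// Resolve the repo name to the id registered by a previous publish_repo.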
let repo_id = *self
.repo_ids_by_name
.lock()
.get(&request.name)
.ok_or_else(|| anyhow!("repo not found"))?;
let name = RoomName(repo_id.to_string().into());
let token = self.network.grant_room_access(&name, user.login.as_ref());
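// Assign the next replica id for this repo and advance the counter.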
let replica_id = {
let mut next_replica_ids = self.next_replica_ids_by_repo_id.lock();
let next_replica_id = next_replica_ids.get_mut(&repo_id).unwrap();
let replica_id = *next_replica_id;
next_replica_id.0 += 1;
replica_id
};
Ok(messages::CloneRepoResponse {
repo_id,
replica_id,
credentials: RoomCredentials { name, token },
})
}
async fn handle_sync_repo(
self,
_user: User,
request: messages::SyncRepo,
) -> Result<messages::SyncRepoResponse> {
let repo = self
.db
.snapshot
.lock()
.repos
.get(&request.id)
.ok_or_else(|| anyhow!("repo not found"))?
.clone();
let mut response = messages::SyncRepoResponse {
operations: Default::default(),
};
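// For every replica the server knows about, return the operations the client is missing:
// those past the client's reported max operation count, up to the server's max.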
for (replica_id, end_op_count) in repo.max_operation_ids.iter() {
let end_op = OperationId {
replica_id: *replica_id,
operation_count: *end_op_count,
};
if let Some(start_op_count) = request.max_operation_ids.get(replica_id) {
let start_op = OperationId {
replica_id: *replica_id,
operation_count: *start_op_count,
};
response.operations.extend(
repo.operations
.range((Bound::Excluded(&start_op), Bound::Included(&end_op)))
.map(|(_, op)| op.clone()),
);
} else {
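// The client has no operations from this replica yet, so send everything from its first operation.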
let start_op = OperationId::new(*replica_id);
response.operations.extend(
repo.operations
.range((Bound::Included(&start_op), Bound::Included(&end_op)))
.map(|(_, op)| op.clone()),
);
}
}
Ok(response)
}
}
#[derive(Clone)]
@@ -432,6 +543,12 @@ impl Repo {
.update(&self.id, |repo| {
let (operation, result) = f(repo);
repo.operations.insert(operation.id(), operation.clone());
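// Record the highest operation count seen for this replica so sync requests can compute deltas.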
let replica_id = operation.id().replica_id;
let count = operation.id().operation_count;
if repo.max_operation_ids.get(&replica_id).copied() < Some(count) {
repo.max_operation_ids.insert(replica_id, count);
}
if let Some(operation_created) = self.db.local_operation_created.as_ref() {
operation_created(self.id, operation);
}
@@ -1058,6 +1175,7 @@ struct RepoSnapshot {
branches: TreeMap<OperationId, BranchSnapshot>,
operations: TreeMap<OperationId, Operation>,
revisions: TreeMap<RevisionId, Revision>,
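// Highest operation count observed per replica; used to answer sync requests.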
max_operation_ids: TreeMap<ReplicaId, OperationCount>,
}
impl RepoSnapshot {
@@ -1087,6 +1205,10 @@ impl RepoSnapshot {
});
(operation, branch_id)
}
fn apply_operation(&mut self, operation: Operation) {
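// Integrating remote operations into the snapshot is not implemented yet.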
todo!()
}
}
#[derive(Clone, Debug)]

View file

@@ -1,20 +1,25 @@
use crate::{
operations::{CreateBranch, CreateDocument, Edit},
OperationId, RepoId, Request, RevisionId, RoomCredentials,
OperationCount, OperationId, ReplicaId, RepoId, Request, RevisionId, RoomCredentials,
};
use collections::BTreeMap;
use serde::{Deserialize, Serialize};
use std::{any::Any, sync::Arc};
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum RequestEnvelope {
PublishRepo(PublishRepo),
CloneRepo(CloneRepo),
SyncRepo(SyncRepo),
}
impl RequestEnvelope {
pub fn unwrap(self) -> Box<dyn Any> {
Box::new(match self {
RequestEnvelope::PublishRepo(request) => request,
})
match self {
RequestEnvelope::PublishRepo(request) => Box::new(request),
RequestEnvelope::CloneRepo(request) => Box::new(request),
RequestEnvelope::SyncRepo(request) => Box::new(request),
}
}
}
@@ -45,6 +50,49 @@ pub struct PublishRepoResponse {
pub credentials: RoomCredentials,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CloneRepo {
pub name: Arc<str>,
}
impl Request for CloneRepo {
type Response = CloneRepoResponse;
}
impl Into<RequestEnvelope> for CloneRepo {
fn into(self) -> RequestEnvelope {
RequestEnvelope::CloneRepo(self)
}
}
#[derive(Clone, Serialize, Deserialize)]
pub struct CloneRepoResponse {
pub repo_id: RepoId,
pub replica_id: ReplicaId,
pub credentials: RoomCredentials,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SyncRepo {
pub id: RepoId,
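// Per-replica operation counts the requesting client already has; the server replies with anything newer.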
pub max_operation_ids: BTreeMap<ReplicaId, OperationCount>,
}
impl Request for SyncRepo {
type Response = SyncRepoResponse;
}
impl Into<RequestEnvelope> for SyncRepo {
fn into(self) -> RequestEnvelope {
RequestEnvelope::SyncRepo(self)
}
}
#[derive(Clone, Serialize, Deserialize)]
pub struct SyncRepoResponse {
pub operations: Vec<Operation>,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum MessageEnvelope {
Operation(Operation),

View file

@@ -1,4 +1,8 @@
use std::{cmp::Ordering, fmt::Debug};
use std::{
cmp::Ordering,
fmt::Debug,
ops::{Bound, RangeBounds},
};
use crate::{Bias, Dimension, Edit, Item, KeyedItem, SeekTarget, SumTree, Summary};
@@ -93,6 +97,32 @@ impl<K: Clone + Debug + Default + Ord, V: Clone + Debug> TreeMap<K, V> {
cursor.item().map(|item| (&item.key, &item.value))
}
pub fn range<'a, R>(&self, range: R) -> impl Iterator<Item = (&K, &V)>
where
K: 'a,
R: RangeBounds<&'a K>,
{
let mut cursor = self.0.cursor::<MapKeyRef<'_, K>>();
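// Seek to the start bound: Bias::Left stops at an entry equal to the start key (inclusive),
// while Bias::Right skips past it (exclusive).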
match range.start_bound() {
Bound::Included(start) => {
let start = MapKeyRef(Some(*start));
cursor.seek(&start, Bias::Left, &());
}
Bound::Excluded(start) => {
let start = MapKeyRef(Some(*start));
cursor.seek(&start, Bias::Right, &());
}
Bound::Unbounded => cursor.next(&()),
}
cursor
.map(|entry| (&entry.key, &entry.value))
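// Stop yielding entries once the end bound is passed.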
.take_while(move |(key, _)| match range.end_bound() {
Bound::Included(end) => key <= end,
Bound::Excluded(end) => key < end,
Bound::Unbounded => true,
})
}
pub fn iter_from<'a>(&'a self, from: &'a K) -> impl Iterator<Item = (&K, &V)> + '_ {
let mut cursor = self.0.cursor::<MapKeyRef<'_, K>>();
let from_key = MapKeyRef(Some(from));