Sketch in a bi-directional sync (not yet tested)

Antonio Scandurra, 2023-07-19 17:41:29 +02:00 (committed by Nathan Sobo)
parent 27b06c1d09
commit 9e03e9d6df
3 changed files with 231 additions and 69 deletions
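
The client-side flow sketched in the diff below: publishing or cloning a repo creates a Checkout, which runs one SyncRepo round-trip and then keeps draining a channel of local operations, publishing them to the server in batches of CHUNK_SIZE. The following is a minimal, self-contained sketch of just that batching pattern, not the commit's actual code; it assumes only the futures crate, Operation is a placeholder type, and the println! stands in for the real PublishOperations request.

use futures::{channel::mpsc, StreamExt};

const CHUNK_SIZE: usize = 64;

// Stand-in for the real CRDT operation type.
#[derive(Debug)]
struct Operation(u64);

fn main() {
    let (tx, rx) = mpsc::unbounded();
    for i in 0..200u64 {
        tx.unbounded_send(Operation(i)).unwrap();
    }
    // Dropping the sender closes the channel so the loop below terminates.
    drop(tx);

    futures::executor::block_on(async move {
        // ready_chunks yields whatever is already queued, up to CHUNK_SIZE items at a time,
        // which is how the checkout task batches local operations into publish requests.
        let mut chunks = rx.ready_chunks(CHUNK_SIZE);
        while let Some(batch) = chunks.next().await {
            println!("would publish {} operations", batch.len());
        }
    });
}

Note that ready_chunks never waits to fill a batch; it yields whatever is already queued, so a single local edit is still published promptly while bursts are coalesced into larger requests.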

View file

@@ -8,7 +8,7 @@ mod test;
 use anyhow::{anyhow, Result};
 use collections::{btree_map, BTreeMap, Bound, HashMap};
 use dense_id::DenseId;
-use futures::{future::BoxFuture, FutureExt};
+use futures::{channel::mpsc, future::BoxFuture, FutureExt, StreamExt};
 use messages::{MessageEnvelope, Operation, RequestEnvelope};
 use operations::CreateBranch;
 use parking_lot::{Mutex, RwLock};
@@ -28,6 +28,8 @@ use sum_tree::{Bias, SumTree, TreeMap};
 use util::ResultExt;
 use uuid::Uuid;
 
+const CHUNK_SIZE: usize = 64;
+
 #[derive(
     Copy, Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize,
 )]
@@ -139,46 +141,88 @@ pub trait ClientRoom: 'static + Send + Sync {
     fn handle_messages(&self, handle_message: impl 'static + Send + Fn(Vec<u8>));
 }
 
-struct Client<N: ClientNetwork> {
+pub trait Executor: 'static + Send + Sync {
+    fn spawn<F>(&self, future: F)
+    where
+        F: 'static + Send + Future<Output = ()>;
+}
+
+struct Client<E, N: ClientNetwork> {
     db: Db,
     network: Arc<N>,
-    repo_rooms: Arc<Mutex<HashMap<RepoId, RepoRoom<N>>>>,
+    checkouts: Arc<Mutex<HashMap<RepoId, Checkout<E, N>>>>,
+    executor: Arc<E>,
 }
 
-struct RepoRoom<N: ClientNetwork> {
-    network_room: N::Room,
+struct Checkout<E, N: ClientNetwork> {
+    repo: Repo,
+    network_room: Arc<N::Room>,
+    operations_tx: mpsc::UnboundedSender<Operation>,
     message_handlers:
-        Arc<RwLock<HashMap<TypeId, Box<dyn Send + Sync + Fn(Client<N>, RepoId, Box<dyn Any>)>>>>,
+        Arc<RwLock<HashMap<TypeId, Box<dyn Send + Sync + Fn(Client<E, N>, RepoId, Box<dyn Any>)>>>>,
 }
 
-impl<N: ClientNetwork> RepoRoom<N> {
-    fn new(client: Client<N>, repo_id: RepoId, network_room: N::Room) -> Self {
+impl<E, N: ClientNetwork> Clone for Checkout<E, N> {
+    fn clone(&self) -> Self {
+        Self {
+            repo: self.repo.clone(),
+            network_room: self.network_room.clone(),
+            operations_tx: self.operations_tx.clone(),
+            message_handlers: self.message_handlers.clone(),
+        }
+    }
+}
+
+impl<E: Executor, N: ClientNetwork> Checkout<E, N> {
+    fn new(client: Client<E, N>, repo: Repo, network_room: N::Room) -> Self {
+        let (operations_tx, operations_rx) = mpsc::unbounded();
         let this = Self {
-            network_room,
+            repo: repo.clone(),
+            network_room: Arc::new(network_room),
+            operations_tx,
             message_handlers: Default::default(),
         };
         {
             let handlers = this.message_handlers.clone();
+            let client = client.clone();
             this.network_room.handle_messages(move |message| {
                 if let Some(envelope) =
                     serde_bare::from_slice::<MessageEnvelope>(&message).log_err()
                 {
                     let message = envelope.unwrap();
                     if let Some(handler) = handlers.read().get(&message.as_ref().type_id()) {
-                        handler(client.clone(), repo_id, message);
+                        handler(client.clone(), repo.id, message);
                     }
                 };
             });
         }
+        client.executor.spawn({
+            let this = this.clone();
+            let client = client.clone();
+            async move {
+                this.sync(&client).await.expect("network is infallible");
+
+                let mut operations_rx = operations_rx.ready_chunks(CHUNK_SIZE);
+                while let Some(operations) = operations_rx.next().await {
+                    client
+                        .request(messages::PublishOperations {
+                            repo_id: this.repo.id,
+                            operations,
+                        })
+                        .await
+                        .expect("network is infallible");
+                }
+            }
+        });
 
         this
     }
 
     fn handle_messages<M: Message, H>(&self, handle_message: H)
     where
         M: Message,
-        H: 'static + Fn(Client<N>, RepoId, M) + Send + Sync,
+        H: 'static + Fn(Client<E, N>, RepoId, M) + Send + Sync,
     {
         self.message_handlers.write().insert(
             TypeId::of::<M>(),
@@ -188,9 +232,38 @@ impl<N: ClientNetwork> RepoRoom<N> {
         );
     }
 
-    fn broadcast<M: Message>(&self, message: M) {
+    fn broadcast<M: Message>(&self, message: &M) {
         self.network_room.broadcast(message.to_bytes());
     }
+
+    fn broadcast_operation(&self, operation: Operation) {
+        self.broadcast(&operation);
+        self.operations_tx.unbounded_send(operation).unwrap();
+    }
+
+    async fn sync(&self, client: &Client<E, N>) -> Result<()> {
+        let response = client
+            .request(messages::SyncRepo {
+                id: self.repo.id,
+                max_operation_ids: self.repo.read(|repo| (&repo.max_operation_ids).into()),
+            })
+            .await?;
+
+        let operations = self
+            .repo
+            .operations_since(&(&response.max_operation_ids).into());
+        for chunk in operations.chunks(CHUNK_SIZE) {
+            client
+                .request(messages::PublishOperations {
+                    repo_id: self.repo.id,
+                    operations: chunk.to_vec(),
+                })
+                .await?;
+        }
+
+        Ok(())
+    }
 }
 
 #[derive(Clone, Serialize, Deserialize)]
@@ -199,22 +272,24 @@ pub struct RoomCredentials
     token: RoomToken,
 }
 
-impl<N: ClientNetwork> Clone for Client<N> {
+impl<E, N: ClientNetwork> Clone for Client<E, N> {
     fn clone(&self) -> Self {
         Self {
             db: self.db.clone(),
             network: self.network.clone(),
-            repo_rooms: self.repo_rooms.clone(),
+            checkouts: self.checkouts.clone(),
+            executor: self.executor.clone(),
         }
     }
 }
 
-impl<N: ClientNetwork> Client<N> {
-    pub fn new(network: N) -> Self {
+impl<E: Executor, N: ClientNetwork> Client<E, N> {
+    pub fn new(executor: E, network: N) -> Self {
         let mut this = Self {
             db: Db::new(),
             network: Arc::new(network),
-            repo_rooms: Default::default(),
+            checkouts: Default::default(),
+            executor: Arc::new(executor),
         };
         this.db.on_local_operation({
             let this = this.clone();
@@ -240,23 +315,25 @@ impl<N: ClientNetwork> Client<N> {
         async move {
             let response = this.request(messages::CloneRepo { name }).await?;
             let repo_id = response.repo_id;
-            let room = RepoRoom::new(
-                this.clone(),
-                repo_id,
-                this.network.room(response.credentials),
-            );
-            room.handle_messages(Self::handle_remote_operation);
-            this.repo_rooms.lock().insert(repo_id, room);
+            let repo = Repo {
+                id: repo_id,
+                db: this.db.clone(),
+            };
 
             this.db
                 .snapshot
                 .lock()
                 .repos
                 .insert(repo_id, Default::default());
 
-            Ok(Repo {
-                id: repo_id,
-                db: this.db.clone(),
-            })
+            let checkout = Checkout::new(
+                this.clone(),
+                repo.clone(),
+                this.network.room(response.credentials),
+            );
+            checkout.handle_messages(Self::handle_remote_operation);
+            this.checkouts.lock().insert(repo_id, checkout);
+
+            Ok(repo)
         }
     }
@@ -266,21 +343,27 @@
         name: impl Into<Arc<str>>,
     ) -> impl Future<Output = Result<()>> {
         let this = self.clone();
-        let id = repo.id;
         let name = name.into();
+        let repo = repo.clone();
         async move {
-            let response = this.request(messages::PublishRepo { id, name }).await?;
-            let room = RepoRoom::new(this.clone(), id, this.network.room(response.credentials));
-            room.handle_messages(Self::handle_remote_operation);
-            this.repo_rooms.lock().insert(id, room);
+            let response = this
+                .request(messages::PublishRepo { id: repo.id, name })
+                .await?;
+            let checkout = Checkout::new(
+                this.clone(),
+                repo.clone(),
+                this.network.room(response.credentials),
+            );
+            checkout.handle_messages(Self::handle_remote_operation);
+            this.checkouts.lock().insert(repo.id, checkout);
             Ok(())
         }
     }
 
     fn handle_local_operation(&self, repo_id: RepoId, operation: Operation) {
-        if let Some(room) = self.repo_rooms.lock().get(&repo_id) {
-            room.broadcast(operation);
+        if let Some(checkout) = self.checkouts.lock().get(&repo_id) {
+            checkout.broadcast_operation(operation);
         }
     }
@@ -454,35 +537,11 @@ impl<N: ServerNetwork> Server<N> {
             .get(&request.id)
             .ok_or_else(|| anyhow!("repo not found"))?
             .clone();
-        let mut response = messages::SyncRepoResponse {
-            operations: Default::default(),
-        };
-        for (replica_id, end_op_count) in repo.max_operation_ids.iter() {
-            let end_op = OperationId {
-                replica_id: *replica_id,
-                operation_count: *end_op_count,
-            };
-            if let Some(start_op_count) = request.max_operation_ids.get(&replica_id) {
-                let start_op = OperationId {
-                    replica_id: *replica_id,
-                    operation_count: *start_op_count,
-                };
-                response.operations.extend(
-                    repo.operations
-                        .range((Bound::Excluded(&start_op), Bound::Included(&end_op)))
-                        .map(|(_, op)| op.clone()),
-                );
-            } else {
-                let start_op = OperationId::new(*replica_id);
-                response.operations.extend(
-                    repo.operations
-                        .range((Bound::Included(&start_op), Bound::Included(&end_op)))
-                        .map(|(_, op)| op.clone()),
-                );
-            }
-        }
-        Ok(response)
+        Ok(messages::SyncRepoResponse {
+            operations: repo.operations_since(&(&request.max_operation_ids).into()),
+            max_operation_ids: (&repo.max_operation_ids).into(),
+        })
     }
 }
@@ -506,6 +565,17 @@ impl Db {
     ) {
         self.local_operation_created = Some(Arc::new(operation_created));
     }
+
+    fn repo(&self, id: RepoId) -> Option<Repo> {
+        self.snapshot
+            .lock()
+            .repos
+            .contains_key(&id)
+            .then_some(Repo {
+                id,
+                db: self.clone(),
+            })
+    }
 }
 
 #[derive(Clone)]
@@ -523,6 +593,10 @@ impl Repo {
         }
     }
 
+    fn operations_since(&self, version: &TreeMap<ReplicaId, OperationCount>) -> Vec<Operation> {
+        self.read(|repo| repo.operations_since(version))
+    }
+
     fn read<F, T>(&self, f: F) -> T
     where
        F: FnOnce(&RepoSnapshot) -> T,
@@ -549,8 +623,8 @@
                 repo.max_operation_ids.insert(replica_id, count);
             }
 
-            if let Some(operation_created) = self.db.local_operation_created.as_ref() {
-                operation_created(self.id, operation);
+            if let Some(local_operation_created) = self.db.local_operation_created.as_ref() {
+                local_operation_created(self.id, operation);
             }
             result
         })
@@ -1209,6 +1283,35 @@ impl RepoSnapshot {
     fn apply_operation(&mut self, operation: Operation) {
         todo!()
     }
+
+    fn operations_since(&self, version: &TreeMap<ReplicaId, OperationCount>) -> Vec<Operation> {
+        let mut new_operations = Vec::new();
+        for (replica_id, end_op_count) in self.max_operation_ids.iter() {
+            let end_op = OperationId {
+                replica_id: *replica_id,
+                operation_count: *end_op_count,
+            };
+            if let Some(start_op_count) = version.get(&replica_id) {
+                let start_op = OperationId {
+                    replica_id: *replica_id,
+                    operation_count: *start_op_count,
+                };
+                new_operations.extend(
+                    self.operations
+                        .range((Bound::Excluded(&start_op), Bound::Included(&end_op)))
+                        .map(|(_, op)| op.clone()),
+                );
+            } else {
+                let start_op = OperationId::new(*replica_id);
+                new_operations.extend(
+                    self.operations
+                        .range((Bound::Included(&start_op), Bound::Included(&end_op)))
+                        .map(|(_, op)| op.clone()),
+                );
+            }
+        }
+        new_operations
+    }
 }
 
 #[derive(Clone, Debug)]
@@ -1228,7 +1331,7 @@ struct Revision {
 
 #[cfg(test)]
 mod tests {
-    use gpui::executor::Deterministic;
+    use gpui::executor::{Background, Deterministic};
 
     use super::*;
     use crate::test::TestNetwork;
@@ -1238,7 +1341,7 @@ mod tests {
         let network = TestNetwork::new(deterministic.build_background());
         let server = Server::new(network.server());
-        let client_a = Client::new(network.client("client-a"));
+        let client_a = Client::new(deterministic.build_background(), network.client("client-a"));
 
         let repo_a = client_a.create_repo();
         let branch_a = repo_a.create_empty_branch("main");
@@ -1252,7 +1355,16 @@
         assert_eq!(doc2.text().to_string(), "def");
 
         client_a.publish_repo(&repo_a, "repo-1").await.unwrap();
-        let db_b = Client::new(network.client("client-b"));
+        let db_b = Client::new(deterministic.build_background(), network.client("client-b"));
         let repo_b = db_b.clone_repo("repo-1").await.unwrap();
     }
+
+    impl Executor for Arc<Background> {
+        fn spawn<F>(&self, future: F)
+        where
+            F: 'static + Send + Future<Output = ()>,
+        {
+            todo!()
+        }
+    }
 }
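
For reference, the operations_since logic that both the client and the server now share (see the RepoSnapshot hunk above) is a version-vector diff: for each replica, collect every operation whose count lies beyond the requester's recorded maximum. Below is a self-contained sketch of the same idea over std's BTreeMap; OperationId, Operation, and the flat operation store are simplified stand-ins for the crate's SumTree-backed types.

use std::collections::BTreeMap;
use std::ops::Bound;

type ReplicaId = u32;
type OperationCount = u64;

// Simplified stand-ins for the crate's OperationId / Operation types.
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
struct OperationId {
    replica_id: ReplicaId,
    operation_count: OperationCount,
}

#[derive(Clone, Debug)]
struct Operation(OperationId);

/// Everything in `operations` that is newer than `version`, replica by replica.
fn operations_since(
    max_operation_ids: &BTreeMap<ReplicaId, OperationCount>,
    operations: &BTreeMap<OperationId, Operation>,
    version: &BTreeMap<ReplicaId, OperationCount>,
) -> Vec<Operation> {
    let mut new_operations = Vec::new();
    for (&replica_id, &end_count) in max_operation_ids {
        let end_op = OperationId { replica_id, operation_count: end_count };
        // Start strictly after the requester's max count for this replica,
        // or from the beginning if the requester has never seen it.
        let start = match version.get(&replica_id) {
            Some(&count) => Bound::Excluded(OperationId { replica_id, operation_count: count }),
            None => Bound::Included(OperationId { replica_id, operation_count: 0 }),
        };
        new_operations.extend(
            operations
                .range((start, Bound::Included(end_op)))
                .map(|(_, op)| op.clone()),
        );
    }
    new_operations
}

fn main() {
    let mut operations = BTreeMap::new();
    let mut max_operation_ids = BTreeMap::new();
    for count in 1..=5 {
        let id = OperationId { replica_id: 0, operation_count: count };
        operations.insert(id, Operation(id));
        max_operation_ids.insert(0, count);
    }

    // The requester has already seen operations 1..=3 from replica 0.
    let version = BTreeMap::from([(0, 3)]);
    let missing = operations_since(&max_operation_ids, &operations, &version);
    assert_eq!(missing.len(), 2); // operations 4 and 5
}

Both directions of the sync run this same computation: the server replies with the operations the client is missing plus its own version vector, and the client then publishes whatever the server has not seen, which is what makes the sync bi-directional.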

View file

@@ -11,6 +11,7 @@ pub enum RequestEnvelope {
     PublishRepo(PublishRepo),
     CloneRepo(CloneRepo),
     SyncRepo(SyncRepo),
+    PublishOperations(PublishOperations),
 }
 
 impl RequestEnvelope {
@@ -19,6 +20,7 @@ impl RequestEnvelope {
             RequestEnvelope::PublishRepo(request) => Box::new(request),
             RequestEnvelope::CloneRepo(request) => Box::new(request),
             RequestEnvelope::SyncRepo(request) => Box::new(request),
+            RequestEnvelope::PublishOperations(request) => Box::new(request),
         }
     }
 }
@@ -91,6 +93,23 @@ impl Into<RequestEnvelope> for SyncRepo {
 #[derive(Clone, Serialize, Deserialize)]
 pub struct SyncRepoResponse {
     pub operations: Vec<Operation>,
+    pub max_operation_ids: BTreeMap<ReplicaId, OperationCount>,
+}
+
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub struct PublishOperations {
+    pub repo_id: RepoId,
+    pub operations: Vec<Operation>,
+}
+
+impl Request for PublishOperations {
+    type Response = ();
+}
+
+impl Into<RequestEnvelope> for PublishOperations {
+    fn into(self) -> RequestEnvelope {
+        RequestEnvelope::PublishOperations(self)
+    }
 }
 
 #[derive(Clone, Debug, Serialize, Deserialize)]
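
The request and message types above are serialized to bytes for the wire; the client's room message handler in the first file decodes incoming payloads with serde_bare::from_slice. Below is a minimal round-trip sketch assuming the serde (with derive) and serde_bare crates; PublishOperationsStub and its fields are illustrative placeholders rather than the real message definitions.

use serde::{Deserialize, Serialize};

// Stand-in for a wire message such as PublishOperations; the real types above carry
// RepoId and Operation values instead of these placeholder fields.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
struct PublishOperationsStub {
    repo_id: u64,
    operations: Vec<String>,
}

fn main() {
    let message = PublishOperationsStub {
        repo_id: 7,
        operations: vec!["op-1".into(), "op-2".into()],
    };

    // Encode with BARE; the room message handler decodes with serde_bare::from_slice,
    // so both ends agree on the wire format.
    let bytes = serde_bare::to_vec(&message).unwrap();
    let decoded: PublishOperationsStub = serde_bare::from_slice(&bytes).unwrap();
    assert_eq!(decoded, message);
}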

View file

@@ -1,5 +1,6 @@
 use std::{
     cmp::Ordering,
+    collections::BTreeMap,
     fmt::Debug,
     ops::{Bound, RangeBounds},
 };
@@ -29,7 +30,11 @@ pub struct TreeSet<K>(TreeMap<K, ()>)
 where
     K: Clone + Debug + Default + Ord;
 
-impl<K: Clone + Debug + Default + Ord, V: Clone + Debug> TreeMap<K, V> {
+impl<K, V> TreeMap<K, V>
+where
+    K: Clone + Debug + Default + Ord,
+    V: Clone + Debug,
+{
     pub fn from_ordered_entries(entries: impl IntoIterator<Item = (K, V)>) -> Self {
         let tree = SumTree::from_iter(
             entries
@@ -58,6 +63,10 @@ impl<K: Clone + Debug + Default + Ord, V: Clone + Debug> TreeMap<K, V> {
         }
     }
 
+    pub fn contains_key<'a>(&self, key: &'a K) -> bool {
+        self.get(key).is_some()
+    }
+
     pub fn insert(&mut self, key: K, value: V) {
         self.0.insert_or_replace(MapEntry { key, value }, &());
     }
@@ -192,6 +201,28 @@ impl<K: Clone + Debug + Default + Ord, V: Clone + Debug> TreeMap<K, V> {
     }
 }
 
+impl<K, V> Into<BTreeMap<K, V>> for &TreeMap<K, V>
+where
+    K: Clone + Debug + Default + Ord,
+    V: Clone + Debug,
+{
+    fn into(self) -> BTreeMap<K, V> {
+        self.iter()
+            .map(|(replica_id, count)| (replica_id.clone(), count.clone()))
+            .collect()
+    }
+}
+
+impl<K, V> From<&BTreeMap<K, V>> for TreeMap<K, V>
+where
+    K: Clone + Debug + Default + Ord,
+    V: Clone + Debug,
+{
+    fn from(value: &BTreeMap<K, V>) -> Self {
+        TreeMap::from_ordered_entries(value.into_iter().map(|(k, v)| (k.clone(), v.clone())))
+    }
+}
+
 #[derive(Debug)]
 struct MapSeekTargetAdaptor<'a, T>(&'a T);