Co-Authored-By: Nathan Sobo <nathan@zed.dev>
This commit is contained in:
Antonio Scandurra 2023-06-30 18:03:21 +02:00
parent cdeabcab4e
commit f4d71b2b24
6 changed files with 277 additions and 29 deletions

3
Cargo.lock generated
View file

@ -1734,6 +1734,9 @@ dependencies = [
name = "crdb"
version = "0.1.0"
dependencies = [
"anyhow",
"collections",
"futures 0.3.28",
"gpui",
"lazy_static",
"parking_lot 0.11.2",

View file

@ -10,7 +10,10 @@ doctest = false
[dependencies]
rope = { path = "../rope" }
sum_tree = { path = "../sum_tree" }
collections = { path = "../collections" }
anyhow.workspace = true
futures.workspace = true
lazy_static.workspace = true
parking_lot.workspace = true
smallvec.workspace = true

View file

@ -1,7 +1,12 @@
mod dense_id;
mod messages;
mod operations;
#[cfg(test)]
mod test;
use anyhow::Result;
use dense_id::DenseId;
use futures::future::BoxFuture;
use operations::{CreateBranch, Operation};
use parking_lot::Mutex;
use rope::Rope;
@ -9,6 +14,7 @@ use smallvec::{smallvec, SmallVec};
use std::{
cmp::{self, Ordering},
fmt::Debug,
future::Future,
ops::Range,
path::Path,
sync::Arc,
@ -16,8 +22,8 @@ use std::{
use sum_tree::{Bias, SumTree, TreeMap};
use uuid::Uuid;
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord)]
struct RepoId(Uuid);
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct RepoId(Uuid);
type RevisionId = SmallVec<[OperationId; 2]>;
@ -62,22 +68,146 @@ impl sum_tree::Summary for OperationId {
}
}
#[derive(Clone, Default)]
pub struct Db {
snapshot: Arc<Mutex<DbSnapshot>>,
#[derive(Clone)]
pub struct RoomName(Arc<str>);
#[derive(Clone)]
pub struct RoomToken(Arc<str>);
pub trait Request: 'static {
type Response: 'static;
}
impl Db {
pub trait Message {
fn to_bytes(&self) -> Vec<u8>;
}
pub trait ServerNetwork {
fn on_request<H, F, R>(&self, handle_request: H)
where
H: 'static + Fn(R) -> F,
F: 'static + Send + Sync + futures::Future<Output = Result<R::Response>>,
R: Request;
}
pub trait ClientNetwork {
fn request<R: Request>(&self, request: R) -> BoxFuture<Result<R::Response>>;
fn broadcast<M: Message>(&self, room: RoomName, token: RoomToken, message: M);
}
struct Client<N> {
db: Db,
network: Arc<N>,
repo_room_credentials: Arc<Mutex<collections::HashMap<RepoId, RoomCredentials>>>,
}
struct RoomCredentials {
name: RoomName,
token: RoomToken,
}
impl<N: ClientNetwork> Clone for Client<N> {
fn clone(&self) -> Self {
Self {
db: self.db.clone(),
network: self.network.clone(),
repo_room_credentials: Default::default(),
}
}
}
impl<N: 'static + ClientNetwork> Client<N> {
pub fn new(network: N) -> Self {
let mut this = Self {
db: Db::new(),
network: Arc::new(network),
repo_room_credentials: Default::default(),
};
this.db.on_local_operation({
let this = this.clone();
move |repo_id, operation| this.handle_local_operation(repo_id, operation)
});
this
}
pub fn create_repo(&self) -> Repo {
let id = RepoId::new();
let snapshot = RepoSnapshot::default();
let repo = Repo {
id,
db: self.clone(),
db: self.db.clone(),
};
self.snapshot.lock().repos.insert(id, snapshot);
self.db.snapshot.lock().repos.insert(id, snapshot);
repo
}
pub fn clone_repo(&self, name: impl Into<Arc<str>>) -> impl Future<Output = Result<Repo>> {
async move { todo!() }
}
pub fn publish_repo(
&self,
repo: &Repo,
name: impl Into<Arc<str>>,
) -> impl Future<Output = Result<()>> {
let this = self.clone();
let id = repo.id;
let name = name.into();
async move {
this.network
.request(messages::PublishRepo { id, name })
.await?;
Ok(())
}
}
fn handle_local_operation(&self, repo_id: RepoId, operation: Operation) {
if let Some(credentials) = self.repo_room_credentials.lock().get(&repo_id) {
self.network.broadcast(
credentials.name.clone(),
credentials.token.clone(),
operation,
);
}
}
}
#[derive(Clone)]
struct Server {
db: Db,
}
impl Server {
async fn new(network: impl ServerNetwork) -> Self {
let this = Self { db: Db::new() };
// network.on_request({
// let this = this.clone();
// move |request| {
// let this = this.clone();
// async move { todo!() }
// }
// });
this
}
}
#[derive(Clone)]
pub struct Db {
snapshot: Arc<Mutex<DbSnapshot>>,
local_operation_created: Option<Arc<dyn Fn(RepoId, Operation)>>,
}
impl Db {
fn new() -> Self {
Self {
snapshot: Default::default(),
local_operation_created: None,
}
}
fn on_local_operation(&mut self, operation_created: impl 'static + Fn(RepoId, Operation)) {
self.local_operation_created = Some(Arc::new(operation_created));
}
}
#[derive(Clone)]
@ -106,13 +236,20 @@ impl Repo {
fn update<F, T>(&self, f: F) -> T
where
F: FnOnce(&mut RepoSnapshot) -> T,
F: FnOnce(&mut RepoSnapshot) -> (Operation, T),
{
self.db
.snapshot
.lock()
.repos
.update(&self.id, f)
.update(&self.id, |repo| {
let (operation, result) = f(repo);
repo.operations.insert(operation.id(), operation.clone());
if let Some(operation_created) = self.db.local_operation_created.as_ref() {
operation_created(self.id, operation);
}
result
})
.expect("repo must exist")
}
}
@ -185,8 +322,7 @@ impl Branch {
repo.branches
.update(&self.id, |branch| branch.head = smallvec![operation_id]);
repo.revisions.insert(smallvec![operation_id], revision);
repo.operations.insert(operation.id(), operation);
result
(operation, result)
})
}
@ -701,31 +837,29 @@ impl<'a> RopeBuilder<'a> {
}
}
pub trait OperationSender: Debug {
fn send(&self, operation: Operation);
}
#[derive(Clone, Debug, Default)]
struct RepoSnapshot {
last_operation_id: OperationId,
branches: TreeMap<OperationId, BranchSnapshot>,
operations: TreeMap<OperationId, Operation>,
revisions: TreeMap<RevisionId, Revision>,
name: Option<Arc<str>>,
}
impl RepoSnapshot {
fn new(replica_id: ReplicaId, sender: Arc<dyn OperationSender>) -> Self {
fn new(replica_id: ReplicaId) -> Self {
Self {
last_operation_id: OperationId::new(replica_id),
branches: Default::default(),
operations: Default::default(),
revisions: Default::default(),
name: None,
}
}
fn create_empty_branch(&mut self, name: impl Into<Arc<str>>) -> OperationId {
let branch_id = self.last_operation_id.tick();
fn create_empty_branch(&mut self, name: impl Into<Arc<str>>) -> (Operation, OperationId) {
let name = name.into();
let branch_id = self.last_operation_id.tick();
self.branches.insert(
branch_id,
BranchSnapshot {
@ -740,8 +874,7 @@ impl RepoSnapshot {
name,
parent: Default::default(),
});
self.operations.insert(branch_id, operation.clone());
branch_id
(operation, branch_id)
}
}
@ -763,20 +896,28 @@ struct Revision {
#[cfg(test)]
mod tests {
use super::*;
use crate::test::TestNetwork;
#[test]
fn test_repo() {
let db = Db::default();
let repo = db.create_repo();
let branch = repo.create_empty_branch("main");
#[gpui::test]
async fn test_repo() {
let network = TestNetwork::default();
let server = Server::new(network.server());
let doc1 = branch.create_document();
let client_a = Client::new(network.client());
let repo_a = client_a.create_repo();
let branch_a = repo_a.create_empty_branch("main");
let doc1 = branch_a.create_document();
doc1.edit([(0..0, "abc")]);
let doc2 = branch.create_document();
let doc2 = branch_a.create_document();
doc2.edit([(0..0, "def")]);
assert_eq!(doc1.text().to_string(), "abc");
assert_eq!(doc2.text().to_string(), "def");
client_a.publish_repo(&repo_a, "repo-1").await.unwrap();
let db_b = Client::new(network.client());
let repo_b = db_b.clone_repo("repo-1");
}
}

View file

@ -0,0 +1,11 @@
use crate::{RepoId, Request};
use std::sync::Arc;
pub struct PublishRepo {
pub id: RepoId,
pub name: Arc<str>,
}
impl Request for PublishRepo {
type Response = ();
}

View file

@ -1,4 +1,4 @@
use crate::{AnchorRange, OperationId, RevisionId};
use crate::{AnchorRange, Message, OperationId, RevisionId};
use smallvec::SmallVec;
use std::sync::Arc;
@ -27,6 +27,12 @@ impl Operation {
}
}
impl Message for Operation {
fn to_bytes(&self) -> Vec<u8> {
serde_
}
}
#[derive(Clone, Debug)]
pub struct CreateBranch {
pub id: OperationId,

84
crates/crdb/src/test.rs Normal file
View file

@ -0,0 +1,84 @@
use crate::{ClientNetwork, Message, RoomName, RoomToken, ServerNetwork};
use anyhow::Result;
use futures::{future::BoxFuture, FutureExt};
use parking_lot::Mutex;
use std::{
any::{Any, TypeId},
collections::BTreeMap,
sync::Arc,
};
#[derive(Default)]
pub struct TestNetwork(Arc<Mutex<NetworkState>>);
impl TestNetwork {
pub fn server(&self) -> TestServer {
TestServer(self.0.clone())
}
pub fn client(&self) -> TestClient {
TestClient(self.0.clone())
}
}
#[derive(Default)]
struct NetworkState {
request_handlers:
BTreeMap<TypeId, Box<dyn Fn(Box<dyn Any>) -> BoxFuture<'static, Result<Box<dyn Any>>>>>,
rooms: BTreeMap<RoomName, Room>,
}
pub struct Room {
inboxes: BTreeMap<RoomToken, Vec<Box<dyn Message>>>,
}
pub struct TestServer(Arc<Mutex<NetworkState>>);
impl ServerNetwork for TestServer {
fn on_request<H, F, R>(&self, handle_request: H)
where
H: 'static + Fn(R) -> F,
F: 'static + Send + Sync + futures::Future<Output = Result<R::Response>>,
R: crate::Request,
{
self.0.lock().request_handlers.insert(
TypeId::of::<R>(),
Box::new(move |request| {
let request = request.downcast::<R>().unwrap();
let response = handle_request(*request);
async move {
response
.await
.map(|response| Box::new(response) as Box<dyn Any>)
}
.boxed()
}),
);
}
}
pub struct TestClient(Arc<Mutex<NetworkState>>);
impl ClientNetwork for TestClient {
fn request<R: crate::Request>(
&self,
request: R,
) -> futures::future::BoxFuture<anyhow::Result<R::Response>> {
let request = self
.0
.lock()
.request_handlers
.get(&TypeId::of::<R>())
.unwrap()(Box::new(request));
async move {
request
.await
.map(|response| *response.downcast::<R::Response>().unwrap())
}
.boxed()
}
fn broadcast<M: Message>(&self, room: RoomName, token: RoomToken, message: M) {
todo!()
}
}