From ff5035ea3761026deadc595e483fd1bd8057230c Mon Sep 17 00:00:00 2001
From: Max Brunsfeld
Date: Mon, 21 Aug 2023 12:00:43 -0700
Subject: [PATCH] Start work on storing channel buffers

---
 Cargo.lock                                    |   2 +
 Cargo.toml                                    |   1 +
 crates/collab/Cargo.toml                      |   2 +
 .../20221109000000_test_schema.sql            |  27 +-
 .../20230819154600_add_channel_buffers.sql    |  25 ++
 crates/collab/src/db.rs                       |   2 +
 crates/collab/src/db/ids.rs                   |   1 +
 crates/collab/src/db/queries.rs               |   4 +
 crates/collab/src/db/queries/buffer_tests.rs  |  41 +++
 crates/collab/src/db/queries/buffers.rs       | 271 ++++++++++++++++++
 crates/collab/src/db/tables.rs                |   3 +
 crates/collab/src/db/tables/buffer.rs         |  32 +++
 .../collab/src/db/tables/buffer_operation.rs  |  37 +++
 .../collab/src/db/tables/buffer_snapshot.rs   |  30 ++
 crates/collab/src/db/test_db.rs               |   8 +-
 crates/rpc/Cargo.toml                         |   2 +-
 16 files changed, 484 insertions(+), 4 deletions(-)
 create mode 100644 crates/collab/migrations/20230819154600_add_channel_buffers.sql
 create mode 100644 crates/collab/src/db/queries/buffer_tests.rs
 create mode 100644 crates/collab/src/db/queries/buffers.rs
 create mode 100644 crates/collab/src/db/tables/buffer.rs
 create mode 100644 crates/collab/src/db/tables/buffer_operation.rs
 create mode 100644 crates/collab/src/db/tables/buffer_snapshot.rs

diff --git a/Cargo.lock b/Cargo.lock
index 101a495b6e..b10d8730fb 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1444,6 +1444,7 @@ dependencies = [
  "pretty_assertions",
  "project",
  "prometheus",
+ "prost 0.8.0",
  "rand 0.8.5",
  "reqwest",
  "rpc",
@@ -1456,6 +1457,7 @@ dependencies = [
  "settings",
  "sha-1 0.9.8",
  "sqlx",
+ "text",
  "theme",
  "time 0.3.24",
  "tokio",
diff --git a/Cargo.toml b/Cargo.toml
index cd15a72366..a35b3eea23 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -96,6 +96,7 @@ log = { version = "0.4.16", features = ["kv_unstable_serde"] }
 ordered-float = { version = "2.1.1" }
 parking_lot = { version = "0.11.1" }
 postage = { version = "0.5", features = ["futures-traits"] }
+prost = { version = "0.8" }
 rand = { version = "0.8.5" }
 refineable = { path = "./crates/refineable" }
 regex = { version = "1.5" }
diff --git a/crates/collab/Cargo.toml b/crates/collab/Cargo.toml
index b8d0c26960..49d17bdc63 100644
--- a/crates/collab/Cargo.toml
+++ b/crates/collab/Cargo.toml
@@ -16,6 +16,7 @@ required-features = ["seed-support"]
 [dependencies]
 collections = { path = "../collections" }
 live_kit_server = { path = "../live_kit_server" }
+text = { path = "../text" }
 rpc = { path = "../rpc" }
 util = { path = "../util" }
 
@@ -35,6 +36,7 @@ log.workspace = true
 nanoid = "0.4"
 parking_lot.workspace = true
 prometheus = "0.13"
+prost.workspace = true
 rand.workspace = true
 reqwest = { version = "0.11", features = ["json"], optional = true }
 scrypt = "0.7"
diff --git a/crates/collab/migrations.sqlite/20221109000000_test_schema.sql b/crates/collab/migrations.sqlite/20221109000000_test_schema.sql
index 3dceaecef4..1e4663a6f6 100644
--- a/crates/collab/migrations.sqlite/20221109000000_test_schema.sql
+++ b/crates/collab/migrations.sqlite/20221109000000_test_schema.sql
@@ -189,7 +189,8 @@ CREATE INDEX "index_followers_on_room_id" ON "followers" ("room_id");
 CREATE TABLE "channels" (
     "id" INTEGER PRIMARY KEY AUTOINCREMENT,
     "name" VARCHAR NOT NULL,
-    "created_at" TIMESTAMP NOT NULL DEFAULT now
+    "created_at" TIMESTAMP NOT NULL DEFAULT now,
+    "main_buffer_id" INTEGER REFERENCES buffers (id)
 );
 
 CREATE TABLE "channel_paths" (
@@ -208,3 +209,27 @@ CREATE TABLE "channel_members" (
 );
 
 CREATE UNIQUE INDEX "index_channel_members_on_channel_id_and_user_id" ON "channel_members" ("channel_id", "user_id");
+
+CREATE TABLE "buffers" (
+    "id" INTEGER PRIMARY KEY AUTOINCREMENT,
+    "epoch" INTEGER NOT NULL DEFAULT 0
+);
+
+CREATE TABLE "buffer_operations" (
+    "buffer_id" INTEGER NOT NULL REFERENCES buffers (id) ON DELETE CASCADE,
+    "epoch" INTEGER NOT NULL,
+    "replica_id" INTEGER NOT NULL,
+    "lamport_timestamp" INTEGER NOT NULL,
+    "local_timestamp" INTEGER NOT NULL,
+    "version" BLOB NOT NULL,
+    "is_undo" BOOLEAN NOT NULL,
+    "value" BLOB NOT NULL,
+    PRIMARY KEY(buffer_id, epoch, lamport_timestamp, replica_id)
+);
+
+CREATE TABLE "buffer_snapshots" (
+    "buffer_id" INTEGER NOT NULL REFERENCES buffers (id) ON DELETE CASCADE,
+    "epoch" INTEGER NOT NULL,
+    "text" TEXT NOT NULL,
+    PRIMARY KEY(buffer_id, epoch)
+);
diff --git a/crates/collab/migrations/20230819154600_add_channel_buffers.sql b/crates/collab/migrations/20230819154600_add_channel_buffers.sql
new file mode 100644
index 0000000000..a4d936fd74
--- /dev/null
+++ b/crates/collab/migrations/20230819154600_add_channel_buffers.sql
@@ -0,0 +1,25 @@
+CREATE TABLE "buffers" (
+    "id" SERIAL PRIMARY KEY,
+    "epoch" INTEGER NOT NULL DEFAULT 0
+);
+
+CREATE TABLE "buffer_operations" (
+    "buffer_id" INTEGER NOT NULL REFERENCES buffers (id) ON DELETE CASCADE,
+    "epoch" INTEGER NOT NULL,
+    "replica_id" INTEGER NOT NULL,
+    "local_timestamp" INTEGER NOT NULL,
+    "lamport_timestamp" INTEGER NOT NULL,
+    "version" BYTEA NOT NULL,
+    "is_undo" BOOLEAN NOT NULL,
+    "value" BYTEA NOT NULL,
+    PRIMARY KEY(buffer_id, epoch, lamport_timestamp, replica_id)
+);
+
+CREATE TABLE "buffer_snapshots" (
+    "buffer_id" INTEGER NOT NULL REFERENCES buffers (id) ON DELETE CASCADE,
+    "epoch" INTEGER NOT NULL,
+    "text" TEXT NOT NULL,
+    PRIMARY KEY(buffer_id, epoch)
+);
+
+ALTER TABLE "channels" ADD COLUMN "main_buffer_id" INTEGER REFERENCES buffers (id);
diff --git a/crates/collab/src/db.rs b/crates/collab/src/db.rs
index d322b03589..19915777dc 100644
--- a/crates/collab/src/db.rs
+++ b/crates/collab/src/db.rs
@@ -52,6 +52,8 @@ pub struct Database {
     runtime: Option<tokio::runtime::Runtime>,
 }
 
+// The `Database` type has so many methods that its impl blocks are split into
+// separate files in the `queries` folder.
 impl Database {
     pub async fn new(options: ConnectOptions, executor: Executor) -> Result<Self> {
         Ok(Self {
diff --git a/crates/collab/src/db/ids.rs b/crates/collab/src/db/ids.rs
index 514c973dad..54f9463cca 100644
--- a/crates/collab/src/db/ids.rs
+++ b/crates/collab/src/db/ids.rs
@@ -110,6 +110,7 @@ fn value_to_integer(v: Value) -> Result {
     }
 }
 
+id_type!(BufferId);
 id_type!(AccessTokenId);
 id_type!(ChannelId);
 id_type!(ChannelMemberId);
diff --git a/crates/collab/src/db/queries.rs b/crates/collab/src/db/queries.rs
index f67bde30b8..c4a1d57eb4 100644
--- a/crates/collab/src/db/queries.rs
+++ b/crates/collab/src/db/queries.rs
@@ -1,6 +1,7 @@
 use super::*;
 
 pub mod access_tokens;
+pub mod buffers;
 pub mod channels;
 pub mod contacts;
 pub mod projects;
@@ -8,3 +9,6 @@ pub mod rooms;
 pub mod servers;
 pub mod signups;
 pub mod users;
+
+#[cfg(test)]
+pub mod buffer_tests;
diff --git a/crates/collab/src/db/queries/buffer_tests.rs b/crates/collab/src/db/queries/buffer_tests.rs
new file mode 100644
index 0000000000..f0e78e1fe4
--- /dev/null
+++ b/crates/collab/src/db/queries/buffer_tests.rs
@@ -0,0 +1,41 @@
+use super::*;
+use crate::test_both_dbs;
+use language::proto;
+use text::Buffer;
+
+test_both_dbs!(test_buffers, test_buffers_postgres, test_buffers_sqlite);
+
+async fn test_buffers(db: &Arc<Database>) {
+    let buffer_id = db.create_buffer().await.unwrap();
+
+    let mut buffer = Buffer::new(0, 0, "".to_string());
+    let mut operations = Vec::new();
+    operations.push(buffer.edit([(0..0, "hello world")]));
+    operations.push(buffer.edit([(5..5, ", cruel")]));
+    operations.push(buffer.edit([(0..5, "goodbye")]));
+    operations.push(buffer.undo().unwrap().1);
+    assert_eq!(buffer.text(), "hello, cruel world");
+
+    let operations = operations
+        .into_iter()
+        .map(|op| proto::serialize_operation(&language::Operation::Buffer(op)))
+        .collect::<Vec<_>>();
+
+    db.update_buffer(buffer_id, &operations).await.unwrap();
+
+    let buffer_data = db.get_buffer(buffer_id).await.unwrap();
+
+    let mut buffer_2 = Buffer::new(0, 0, buffer_data.base_text);
+    buffer_2
+        .apply_ops(buffer_data.operations.into_iter().map(|operation| {
+            let operation = proto::deserialize_operation(operation).unwrap();
+            if let language::Operation::Buffer(operation) = operation {
+                operation
+            } else {
+                unreachable!()
+            }
+        }))
+        .unwrap();
+
+    assert_eq!(buffer_2.text(), "hello, cruel world");
+}
diff --git a/crates/collab/src/db/queries/buffers.rs b/crates/collab/src/db/queries/buffers.rs
new file mode 100644
index 0000000000..f5ff2e3367
--- /dev/null
+++ b/crates/collab/src/db/queries/buffers.rs
@@ -0,0 +1,271 @@
+use super::*;
+use prost::Message;
+
+pub struct Buffer {
+    pub base_text: String,
+    pub operations: Vec<proto::Operation>,
+}
+
+impl Database {
+    pub async fn create_buffer(&self) -> Result<BufferId> {
+        self.transaction(|tx| async move {
+            let buffer = buffer::ActiveModel::new().insert(&*tx).await?;
+            Ok(buffer.id)
+        })
+        .await
+    }
+
+    pub async fn update_buffer(
+        &self,
+        buffer_id: BufferId,
+        operations: &[proto::Operation],
+    ) -> Result<()> {
+        self.transaction(|tx| async move {
+            let buffer = buffer::Entity::find_by_id(buffer_id)
+                .one(&*tx)
+                .await?
+                .ok_or_else(|| anyhow!("no such buffer"))?;
+            buffer_operation::Entity::insert_many(operations.iter().filter_map(|operation| {
+                match operation.variant.as_ref()? {
+                    proto::operation::Variant::Edit(operation) => {
+                        let value =
+                            serialize_edit_operation(&operation.ranges, &operation.new_text);
+                        let version = serialize_version(&operation.version);
+                        Some(buffer_operation::ActiveModel {
+                            buffer_id: ActiveValue::Set(buffer_id),
+                            epoch: ActiveValue::Set(buffer.epoch),
+                            replica_id: ActiveValue::Set(operation.replica_id as i32),
+                            lamport_timestamp: ActiveValue::Set(operation.lamport_timestamp as i32),
+                            local_timestamp: ActiveValue::Set(operation.local_timestamp as i32),
+                            is_undo: ActiveValue::Set(false),
+                            version: ActiveValue::Set(version),
+                            value: ActiveValue::Set(value),
+                        })
+                    }
+                    proto::operation::Variant::Undo(operation) => {
+                        let value = serialize_undo_operation(&operation.counts);
+                        let version = serialize_version(&operation.version);
+                        Some(buffer_operation::ActiveModel {
+                            buffer_id: ActiveValue::Set(buffer_id),
+                            epoch: ActiveValue::Set(buffer.epoch),
+                            replica_id: ActiveValue::Set(operation.replica_id as i32),
+                            lamport_timestamp: ActiveValue::Set(operation.lamport_timestamp as i32),
+                            local_timestamp: ActiveValue::Set(operation.local_timestamp as i32),
+                            is_undo: ActiveValue::Set(true),
+                            version: ActiveValue::Set(version),
+                            value: ActiveValue::Set(value),
+                        })
+                    }
+                    proto::operation::Variant::UpdateSelections(_) => None,
+                    proto::operation::Variant::UpdateDiagnostics(_) => None,
+                    proto::operation::Variant::UpdateCompletionTriggers(_) => None,
+                }
+            }))
+            .exec(&*tx)
+            .await?;
+
+            Ok(())
+        })
+        .await
+    }
+
+    pub async fn get_buffer(&self, id: BufferId) -> Result<Buffer> {
+        self.transaction(|tx| async move {
+            let buffer = buffer::Entity::find_by_id(id)
+                .one(&*tx)
+                .await?
+                .ok_or_else(|| anyhow!("no such buffer"))?;
+
+            let base_text = if buffer.epoch > 0 {
+                buffer_snapshot::Entity::find()
+                    .filter(
+                        buffer_snapshot::Column::BufferId
+                            .eq(id)
+                            .and(buffer_snapshot::Column::Epoch.eq(buffer.epoch)),
+                    )
+                    .one(&*tx)
+                    .await?
+                    .ok_or_else(|| anyhow!("no such snapshot"))?
+                    .text
+            } else {
+                String::new()
+            };
+
+            let mut rows = buffer_operation::Entity::find()
+                .filter(
+                    buffer_operation::Column::BufferId
+                        .eq(id)
+                        .and(buffer_operation::Column::Epoch.eq(buffer.epoch)),
+                )
+                .stream(&*tx)
+                .await?;
+            let mut operations = Vec::new();
+            while let Some(row) = rows.next().await {
+                let row = row?;
+                let version = deserialize_version(&row.version)?;
+                let operation = if row.is_undo {
+                    let counts = deserialize_undo_operation(&row.value)?;
+                    proto::operation::Variant::Undo(proto::operation::Undo {
+                        replica_id: row.replica_id as u32,
+                        local_timestamp: row.local_timestamp as u32,
+                        lamport_timestamp: row.lamport_timestamp as u32,
+                        version,
+                        counts,
+                    })
+                } else {
+                    let (ranges, new_text) = deserialize_edit_operation(&row.value)?;
+                    proto::operation::Variant::Edit(proto::operation::Edit {
+                        replica_id: row.replica_id as u32,
+                        local_timestamp: row.local_timestamp as u32,
+                        lamport_timestamp: row.lamport_timestamp as u32,
+                        version,
+                        ranges,
+                        new_text,
+                    })
+                };
+                operations.push(proto::Operation {
+                    variant: Some(operation),
+                })
+            }
+
+            Ok(Buffer {
+                base_text,
+                operations,
+            })
+        })
+        .await
+    }
+}
+
+mod storage {
+    #![allow(non_snake_case)]
+
+    use prost::Message;
+
+    pub const VERSION: usize = 1;
+
+    #[derive(Message)]
+    pub struct VectorClock {
+        #[prost(message, repeated, tag = "1")]
+        pub entries: Vec<VectorClockEntry>,
+    }
+
+    #[derive(Message)]
+    pub struct VectorClockEntry {
+        #[prost(uint32, tag = "1")]
+        pub replica_id: u32,
+        #[prost(uint32, tag = "2")]
+        pub timestamp: u32,
+    }
+
+    #[derive(Message)]
+    pub struct TextEdit {
+        #[prost(message, repeated, tag = "1")]
+        pub ranges: Vec<Range>,
+        #[prost(string, repeated, tag = "2")]
+        pub texts: Vec<String>,
+    }
+
+    #[derive(Message)]
+    pub struct Range {
+        #[prost(uint64, tag = "1")]
+        pub start: u64,
+        #[prost(uint64, tag = "2")]
+        pub end: u64,
+    }
+
+    #[derive(Message)]
+    pub struct Undo {
+        #[prost(message, repeated, tag = "1")]
+        pub entries: Vec<UndoCount>,
+    }
+
+    #[derive(Message)]
+    pub struct UndoCount {
+        #[prost(uint32, tag = "1")]
+        pub replica_id: u32,
+        #[prost(uint32, tag = "2")]
+        pub local_timestamp: u32,
+        #[prost(uint32, tag = "3")]
+        pub count: u32,
+    }
+}
+
+fn serialize_version(version: &Vec<proto::VectorClockEntry>) -> Vec<u8> {
+    storage::VectorClock {
+        entries: version
+            .iter()
+            .map(|entry| storage::VectorClockEntry {
+                replica_id: entry.replica_id,
+                timestamp: entry.timestamp,
+            })
+            .collect(),
+    }
+    .encode_to_vec()
+}
+
+fn deserialize_version(bytes: &[u8]) -> Result<Vec<proto::VectorClockEntry>> {
+    let clock = storage::VectorClock::decode(bytes).map_err(|error| anyhow!("{}", error))?;
+    Ok(clock
+        .entries
+        .into_iter()
+        .map(|entry| proto::VectorClockEntry {
+            replica_id: entry.replica_id,
+            timestamp: entry.timestamp,
+        })
+        .collect())
+}
+
+fn serialize_edit_operation(ranges: &[proto::Range], texts: &[String]) -> Vec<u8> {
+    storage::TextEdit {
+        ranges: ranges
+            .iter()
+            .map(|range| storage::Range {
+                start: range.start,
+                end: range.end,
+            })
+            .collect(),
+        texts: texts.to_vec(),
+    }
+    .encode_to_vec()
+}
+
+fn deserialize_edit_operation(bytes: &[u8]) -> Result<(Vec<proto::Range>, Vec<String>)> {
+    let edit = storage::TextEdit::decode(bytes).map_err(|error| anyhow!("{}", error))?;
+    let ranges = edit
+        .ranges
+        .into_iter()
+        .map(|range| proto::Range {
+            start: range.start,
+            end: range.end,
+        })
+        .collect();
+    Ok((ranges, edit.texts))
+}
+
+fn serialize_undo_operation(counts: &Vec<proto::UndoCount>) -> Vec<u8> {
+    storage::Undo {
+        entries: counts
+            .iter()
+            .map(|entry| storage::UndoCount {
+                replica_id: entry.replica_id,
+                local_timestamp: entry.local_timestamp,
+                count: entry.count,
+            })
+            .collect(),
+    }
+    .encode_to_vec()
+}
+
+fn deserialize_undo_operation(bytes: &[u8]) -> Result<Vec<proto::UndoCount>> {
+    let undo = storage::Undo::decode(bytes).map_err(|error| anyhow!("{}", error))?;
+    Ok(undo
+        .entries
+        .iter()
+        .map(|entry| proto::UndoCount {
+            replica_id: entry.replica_id,
+            local_timestamp: entry.local_timestamp,
+            count: entry.count,
+        })
+        .collect())
+}
diff --git a/crates/collab/src/db/tables.rs b/crates/collab/src/db/tables.rs
index c4c7e4f312..fbf4bff2a6 100644
--- a/crates/collab/src/db/tables.rs
+++ b/crates/collab/src/db/tables.rs
@@ -1,4 +1,7 @@
 pub mod access_token;
+pub mod buffer;
+pub mod buffer_operation;
+pub mod buffer_snapshot;
 pub mod channel;
 pub mod channel_member;
 pub mod channel_path;
diff --git a/crates/collab/src/db/tables/buffer.rs b/crates/collab/src/db/tables/buffer.rs
new file mode 100644
index 0000000000..84e62cc071
--- /dev/null
+++ b/crates/collab/src/db/tables/buffer.rs
@@ -0,0 +1,32 @@
+use crate::db::BufferId;
+use sea_orm::entity::prelude::*;
+
+#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel)]
+#[sea_orm(table_name = "buffers")]
+pub struct Model {
+    #[sea_orm(primary_key)]
+    pub id: BufferId,
+    pub epoch: i32,
+}
+
+#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
+pub enum Relation {
+    #[sea_orm(has_many = "super::buffer_operation::Entity")]
+    Operations,
+    #[sea_orm(has_many = "super::buffer_snapshot::Entity")]
+    Snapshots,
+}
+
+impl Related<super::buffer_operation::Entity> for Entity {
+    fn to() -> RelationDef {
+        Relation::Operations.def()
+    }
+}
+
+impl Related<super::buffer_snapshot::Entity> for Entity {
+    fn to() -> RelationDef {
+        Relation::Snapshots.def()
+    }
+}
+
+impl ActiveModelBehavior for ActiveModel {}
diff --git a/crates/collab/src/db/tables/buffer_operation.rs b/crates/collab/src/db/tables/buffer_operation.rs
new file mode 100644
index 0000000000..59626c1e77
--- /dev/null
+++ b/crates/collab/src/db/tables/buffer_operation.rs
@@ -0,0 +1,37 @@
+use crate::db::BufferId;
+use sea_orm::entity::prelude::*;
+
+#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel)]
+#[sea_orm(table_name = "buffer_operations")]
+pub struct Model {
+    #[sea_orm(primary_key)]
+    pub buffer_id: BufferId,
+    #[sea_orm(primary_key)]
+    pub epoch: i32,
+    #[sea_orm(primary_key)]
+    pub lamport_timestamp: i32,
+    #[sea_orm(primary_key)]
+    pub replica_id: i32,
+    pub local_timestamp: i32,
+    pub version: Vec<u8>,
+    pub is_undo: bool,
+    pub value: Vec<u8>,
+}
+
+#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
+pub enum Relation {
+    #[sea_orm(
+        belongs_to = "super::buffer::Entity",
+        from = "Column::BufferId",
+        to = "super::buffer::Column::Id"
+    )]
+    Buffer,
+}
+
+impl Related<super::buffer::Entity> for Entity {
+    fn to() -> RelationDef {
+        Relation::Buffer.def()
+    }
+}
+
+impl ActiveModelBehavior for ActiveModel {}
diff --git a/crates/collab/src/db/tables/buffer_snapshot.rs b/crates/collab/src/db/tables/buffer_snapshot.rs
new file mode 100644
index 0000000000..ca8712a053
--- /dev/null
+++ b/crates/collab/src/db/tables/buffer_snapshot.rs
@@ -0,0 +1,30 @@
+use crate::db::BufferId;
+use sea_orm::entity::prelude::*;
+
+#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel)]
+#[sea_orm(table_name = "buffer_snapshots")]
+pub struct Model {
+    #[sea_orm(primary_key)]
+    pub buffer_id: BufferId,
+    #[sea_orm(primary_key)]
+    pub epoch: i32,
+    pub text: String,
+}
+
+#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
+pub enum Relation {
+    #[sea_orm(
+        belongs_to = "super::buffer::Entity",
+        from = "Column::BufferId",
+        to = "super::buffer::Column::Id"
+    )]
+    Buffer,
+}
+
+impl Related<super::buffer::Entity> for Entity {
+    fn to() -> RelationDef {
+        Relation::Buffer.def()
+    }
+}
+
+impl ActiveModelBehavior for ActiveModel {}
diff --git a/crates/collab/src/db/test_db.rs b/crates/collab/src/db/test_db.rs
index 40013d5b03..71e352eb86 100644
--- a/crates/collab/src/db/test_db.rs
+++ b/crates/collab/src/db/test_db.rs
@@ -96,13 +96,17 @@ macro_rules! test_both_dbs {
     ($test_name:ident, $postgres_test_name:ident, $sqlite_test_name:ident) => {
         #[gpui::test]
         async fn $postgres_test_name() {
-            let test_db = TestDb::postgres(Deterministic::new(0).build_background());
+            let test_db = crate::db::test_db::TestDb::postgres(
+                gpui::executor::Deterministic::new(0).build_background(),
+            );
             $test_name(test_db.db()).await;
         }
 
         #[gpui::test]
         async fn $sqlite_test_name() {
-            let test_db = TestDb::sqlite(Deterministic::new(0).build_background());
+            let test_db = crate::db::test_db::TestDb::sqlite(
+                gpui::executor::Deterministic::new(0).build_background(),
+            );
             $test_name(test_db.db()).await;
         }
     };
diff --git a/crates/rpc/Cargo.toml b/crates/rpc/Cargo.toml
index 008fa9c316..3c307be4fb 100644
--- a/crates/rpc/Cargo.toml
+++ b/crates/rpc/Cargo.toml
@@ -23,7 +23,7 @@ async-tungstenite = "0.16"
 base64 = "0.13"
 futures.workspace = true
 parking_lot.workspace = true
-prost = "0.8"
+prost.workspace = true
 rand.workspace = true
 rsa = "0.4"
 serde.workspace = true
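
Note on the encoding (not part of the commit itself): the new `storage` module in crates/collab/src/db/queries/buffers.rs persists each operation's version vector and payload as small hand-rolled protobuf messages via prost, rather than storing the RPC protos directly. Below is a minimal, standalone round-trip sketch of that scheme. It assumes only `prost = "0.8"` (the version added to the workspace above); the message definitions mirror the patch's `storage::VectorClock` and `storage::VectorClockEntry`, and the sample replica ids and timestamps are purely illustrative.

use prost::Message;

// Same shape as `storage::VectorClockEntry` in buffers.rs.
#[derive(Message)]
struct VectorClockEntry {
    #[prost(uint32, tag = "1")]
    replica_id: u32,
    #[prost(uint32, tag = "2")]
    timestamp: u32,
}

// Same shape as `storage::VectorClock`.
#[derive(Message)]
struct VectorClock {
    #[prost(message, repeated, tag = "1")]
    entries: Vec<VectorClockEntry>,
}

fn main() {
    // Encode a two-replica version vector, the way `serialize_version` does
    // before writing it into the `version` BYTEA/BLOB column of `buffer_operations`.
    let clock = VectorClock {
        entries: vec![
            VectorClockEntry { replica_id: 0, timestamp: 4 },
            VectorClockEntry { replica_id: 1, timestamp: 7 },
        ],
    };
    let bytes = clock.encode_to_vec();

    // Decode it back, the way `deserialize_version` does when `get_buffer`
    // replays the stored operations.
    let decoded = VectorClock::decode(bytes.as_slice()).unwrap();
    assert_eq!(decoded.entries.len(), 2);
    assert_eq!(decoded.entries[1].timestamp, 7);
    println!("round-tripped {} bytes", bytes.len());
}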