Compare commits

...
Sign in to create a new pull request.

147 commits
main ... crdb

Author SHA1 Message Date
Antonio Scandurra
d5502090f8 Send only missing operations from client to server 2023-08-06 12:22:22 +02:00
Antonio Scandurra
8d7b37b743 Revisit synchronization routine to perform fewer roundtrips
Co-Authored-By: Nathan Sobo <nathan@zed.dev>
2023-08-04 19:42:02 +02:00
Antonio Scandurra
f1fe0007e3 Log roundtrips only when RUST_LOG=debug 2023-08-04 16:12:54 +02:00
Antonio Scandurra
d66af5c4d2 Avoid talking about trees when subdividing ranges 2023-08-04 15:49:08 +02:00
Antonio Scandurra
dc049ca7d2 💄 2023-08-04 15:01:39 +02:00
Antonio Scandurra
6251399d0a Add randomized test for synchronization 2023-08-04 11:49:14 +02:00
Antonio Scandurra
e979f753f2 Cleanup 2023-08-04 11:28:31 +02:00
Antonio Scandurra
80f9b553c7 Avoid mutating the server as we synchronize 2023-08-04 11:26:43 +02:00
Antonio Scandurra
bd0a9eb704 Test various depths, bases and min_operations in sync 2023-08-04 08:54:38 +02:00
Antonio Scandurra
52beac9fa5 💄 2023-08-03 19:33:38 +02:00
Antonio Scandurra
c603ef3f3d Get an initial implementation of sync working
Co-Authored-By: Nathan Sobo <nathan@zed.dev>
2023-08-03 18:57:16 +02:00
Antonio Scandurra
eb79ea6ee5 WIP
Co-Authored-By: Nathan Sobo <nathan@zed.dev>
2023-08-03 18:09:42 +02:00
Antonio Scandurra
26cb5e316d WIP 2023-08-03 15:22:23 +02:00
Nathan Sobo
974d07d304 Start on a different sync approach
We created a function range_digests, which creates a vector of digests for
a tree of nested ranges up to a certain depth or down to a certain range size.

Co-Authored-By: Antonio Scandurra <antonio@zed.dev>
2023-08-02 11:41:35 -06:00
Antonio Scandurra
d8f6dc3749 📝 2023-08-02 13:45:25 +02:00
Antonio Scandurra
3ebe2f97cc WIP: re-create a Checkout when reloading repo if it was published 2023-08-02 13:03:02 +02:00
Antonio Scandurra
400105cf18 Add todo 2023-08-02 09:51:12 +02:00
Antonio Scandurra
ba6f1d7281 Save dirty revisions in the background
Co-Authored-By: Nathan Sobo <nathan@zed.dev>
2023-08-01 17:14:08 +02:00
Antonio Scandurra
86b0ed8fda Implement Revision::load_documents
Co-Authored-By: Julia Risley <julia@zed.dev>
2023-08-01 16:18:57 +02:00
Antonio Scandurra
50f507e38e Maintain ref counts for document handles 2023-08-01 15:07:32 +02:00
Antonio Scandurra
7fbe5910b9 Remove DbSnapshot 2023-08-01 10:05:06 +02:00
Antonio Scandurra
7f49ca4adb Simplify apply_operations 2023-08-01 09:48:17 +02:00
Antonio Scandurra
b751fe9e98 💄 2023-08-01 09:21:55 +02:00
Antonio Scandurra
85c9b30e9f Experiment with a RevisionCache struct 2023-07-31 16:32:10 +02:00
Antonio Scandurra
82c022adb3 Avoid descending into trees that have already been fully loaded 2023-07-31 15:18:05 +02:00
Antonio Scandurra
7494fdc738 Remove async_recursion from crdb 2023-07-31 14:20:01 +02:00
Antonio Scandurra
1919a5bb63 Use Arc<str> as the key type for RepoSnapshot::branch_ids_by_name 2023-07-31 14:16:20 +02:00
Antonio Scandurra
308c29d377 Remove Default constraint from btree::Map keys 2023-07-31 14:14:09 +02:00
Antonio Scandurra
eacb37e200 Fix basic collaboration test 2023-07-31 13:10:50 +02:00
Antonio Scandurra
083c857fa0 Rename traverse to rewind 2023-07-31 12:53:40 +02:00
Antonio Scandurra
1d0ff4bf8e Optimize revision traversal and reconstruction 2023-07-31 12:45:34 +02:00
Antonio Scandurra
9cbe3cca9a WIP 2023-07-31 11:57:38 +02:00
Antonio Scandurra
ce17cd83cf Redesign history traversal 2023-07-31 11:39:41 +02:00
Antonio Scandurra
05ec6b89c2 WIP 2023-07-30 13:45:31 +02:00
Antonio Scandurra
93701f9c5d Avoid incrementing operation id if it's unnecessary for causality 2023-07-30 12:15:34 +02:00
Antonio Scandurra
083bfc6cbc Reify a History struct
This has to be integrated still.
2023-07-30 11:57:02 +02:00
Antonio Scandurra
a084a93ef0 Make apply_operations asynchronous 2023-07-29 15:36:52 +02:00
Antonio Scandurra
84bf6c5a16 Use pointer equality to detect changes to the repo snapshot 2023-07-29 11:54:13 +02:00
Antonio Scandurra
e99f82f855 Start on loading/saving revisions 2023-07-29 11:54:02 +02:00
Antonio Scandurra
ba3353d127 Allow loading and saving of ropes
Co-Authored-By: Nathan Sobo <nathan@zed.dev>
Co-Authored-By: Max Brunsfeld <max@zed.dev>
2023-07-28 19:58:31 +02:00
Antonio Scandurra
2887c4674d Load fewer items in randomized tests for partially loaded trees 2023-07-28 19:57:03 +02:00
Antonio Scandurra
22e8a67498 Load branch from kv store if necessary
Co-Authored-By: Nathan Sobo <nathan@zed.dev>
Co-Authored-By: Max Brunsfeld <max@zed.dev>
2023-07-28 19:14:58 +02:00
Antonio Scandurra
4848ea8e62 Spawn a background task to save repo snapshots 2023-07-28 13:08:33 +02:00
Antonio Scandurra
3349f2147a Force all changes to a repository to take place via Repo::update 2023-07-28 12:45:51 +02:00
Antonio Scandurra
103d5293be Start on loading RepoSnapshots 2023-07-28 12:34:23 +02:00
Antonio Scandurra
1046b60fd2 Avoid cycling dependency between Client and Checkout 2023-07-28 11:15:49 +02:00
Antonio Scandurra
d5932717f2 WIP 2023-07-27 19:41:56 +02:00
Antonio Scandurra
8eeaacdf0a Pass a KvStore to Client and Server
Co-Authored-By: Nathan Sobo <nathan@zed.dev>
2023-07-27 18:03:55 +02:00
Antonio Scandurra
2de48f96d2 Simplify KvStore trait
Co-Authored-By: Nathan Sobo <nathan@zed.dev>
2023-07-27 17:49:16 +02:00
Antonio Scandurra
ba42db7069 Fix test for slice and summary to account for partial trees 2023-07-27 13:01:01 +02:00
Antonio Scandurra
02946e1e8f Merge branch 'main' into crdb 2023-07-27 12:41:12 +02:00
Antonio Scandurra
32c8eb2612 Correctly maintain cursor stack position when ascending in seek 2023-07-27 12:33:13 +02:00
Antonio Scandurra
d277e214bd Re-enable randomized tests for cursor movement in partially-loaded trees 2023-07-27 12:32:07 +02:00
Antonio Scandurra
cbc5dc04ef Avoid pushing empty leaves into the tree 2023-07-27 12:28:26 +02:00
Antonio Scandurra
198be71b6c Fix panic due to accidentally descending into unloaded node in cursor 2023-07-27 09:53:31 +02:00
Antonio Scandurra
5eede853f6 Don't call push_tree_recursive on leaf when appending unloaded node 2023-07-27 09:52:17 +02:00
Antonio Scandurra
7282629f93 Ensure that loading the full tree is equivalent to the reference items 2023-07-26 18:05:06 +02:00
Antonio Scandurra
517591bced Load the relevant nodes before splicing in randomized test 2023-07-26 18:02:04 +02:00
Antonio Scandurra
4c5e248693 Start on a randomized test for persistence 2023-07-26 17:12:09 +02:00
Antonio Scandurra
b8066d444a Allow keeping nodes in the tree when pruning 2023-07-26 17:11:58 +02:00
Antonio Scandurra
f8cf74f9e8 Introduce InMemoryKv to test persistence 2023-07-26 16:50:28 +02:00
Antonio Scandurra
97cdad7f1a Implement Sequence::prune 2023-07-26 16:41:45 +02:00
Antonio Scandurra
1b803e498f Implement Sequence::load to selectively load subtrees 2023-07-26 15:57:49 +02:00
Antonio Scandurra
7a851d42ad Add a Sequence::from_root function 2023-07-26 15:41:32 +02:00
Antonio Scandurra
18412c3a06 Introduce ChildTree to identify loaded or unloaded subtrees 2023-07-26 13:41:31 +02:00
Antonio Scandurra
6e8b207104 Implement Sequence::save 2023-07-26 10:09:24 +02:00
Antonio Scandurra
3c3bf793e3 Rename SeekTarget::cmp to SeekTarget::seek_cmp
Co-Authored-By: Nathan Sobo <nathan@zed.dev>
2023-07-25 18:07:01 +02:00
Antonio Scandurra
ca57c1839c Fork crdb-specific copies of rope and sum_tree (renamed to btree)
Co-Authored-By: Nathan Sobo <nathan@zed.dev>
2023-07-25 18:04:28 +02:00
Antonio Scandurra
f3928c8c26 Allow cloning of repos to fail
Co-Authored-By: Julia Risley <julia@zed.dev>
2023-07-25 16:31:16 +02:00
Antonio Scandurra
94f7ddbcb2 Allow saving and loading of test plans for randomized test in CRDB
Co-Authored-By: Julia Risley <julia@zed.dev>
2023-07-25 16:20:01 +02:00
Antonio Scandurra
369d85b82e Print RepoId as a number in tests
We are not going to generate repository ids randomly, so we can
take advantage of that and print a nicer representation of
the repository id.

Co-Authored-By: Julia Risley <julia@zed.dev>
2023-07-25 15:33:14 +02:00
Antonio Scandurra
cbdb83fb43 Extract test operation generation and application into methods
Co-Authored-By: Julia Risley <julia@zed.dev>
2023-07-25 15:25:54 +02:00
Antonio Scandurra
309e307fa8 Remove warning
Co-Authored-By: Julia Risley <julia@zed.dev>
2023-07-25 15:25:40 +02:00
Antonio Scandurra
e9342e1b47 Deterministically assign repo IDs in tests 2023-07-25 15:10:16 +02:00
Antonio Scandurra
edc63ee3ff Avoid panicking when previous stack trace doesn't have the same length 2023-07-25 15:10:16 +02:00
Antonio Scandurra
88829dfe21 Use test-support feature for collections in dev 2023-07-25 15:10:13 +02:00
Antonio Scandurra
1ada3722e5 Avoid creating empty document fragments 2023-07-25 12:40:50 +02:00
Antonio Scandurra
1e644b7a45 Keep insertion fragments up-to-date 2023-07-25 12:02:13 +02:00
Antonio Scandurra
eb36bc5b62 Use preceding fragment when edit starts at the beginning of a fragment 2023-07-25 11:27:43 +02:00
Antonio Scandurra
31f84cef14 Broadcast operations when synchronizing with the server 2023-07-25 10:05:59 +02:00
Antonio Scandurra
9358958b1f Observe lamport timestamps when applying operations 2023-07-25 09:58:38 +02:00
Antonio Scandurra
bbe126818f Avoid skipping sentinel as it'll happen organically when applying edit 2023-07-25 09:51:56 +02:00
Antonio Scandurra
7d02462abd Broadcast operations only after server acknowledges them
There was a test failure that was caused by the following sequence of
events:

1. Client 1 generates an operation and broadcasted it.
2. Client 2 joins, but it was too late to receive the operation from the network.
3. Client 2 synchronizes with the server, but it was too early to receive
operations from the server.
4. Client 1 finally sends the operation to the server.
2023-07-24 17:53:01 +02:00
Antonio Scandurra
fa48d59de4 Generate edit operation correctly when range is at the end of fragment 2023-07-24 16:55:17 +02:00
Antonio Scandurra
439f6ba616 Use end fragment to determine the new insertion's fragment location 2023-07-24 16:46:43 +02:00
Antonio Scandurra
729ea89fef Create room before inserting repo into database
This ensures that clones of the repo will either see the repo and
be able to join the room, or not see the repo at all.
2023-07-24 16:40:09 +02:00
Antonio Scandurra
8c5eab6933 Assert branches are the same between client and server 2023-07-24 16:24:43 +02:00
Antonio Scandurra
5118218a9c Start on a randomized test for CRDB 2023-07-24 15:16:57 +02:00
Antonio Scandurra
3dee2faa44 Assign a valid location after intersecting fragments 2023-07-23 18:51:21 +02:00
Antonio Scandurra
c1f72328ff Avoid taking fragment when not fully consuming it 2023-07-23 18:50:57 +02:00
Antonio Scandurra
18b32304e2 Serialize message envelope instead of directly serializing operation 2023-07-23 18:49:54 +02:00
Nathan Sobo
3920bff5e0 WIP: Add a test for relaying edits between clients
Still not passing, but added:

- Connect the network room before creating checkouts.
- Blow up in tests when there are errors applying operations on the server.
- Avoid overshooting the last fragment when in fragment_locations.
2023-07-21 22:21:53 -06:00
Nathan Sobo
b7e79d5241 Get basic test of repository cloning working 2023-07-21 14:31:11 -06:00
Antonio Scandurra
a81fde36e0 WIP 2023-07-21 19:52:27 +02:00
Antonio Scandurra
60f5dca222 Rework operations::Edit::apply
It still doesn't work but it's in a coherent state now at least.
2023-07-21 19:32:26 +02:00
Antonio Scandurra
45037eb7a0 WIP 2023-07-21 16:55:31 +02:00
Antonio Scandurra
4e085a6f2d Apply operations only via revision 2023-07-21 12:00:38 +02:00
Antonio Scandurra
517d073806 Remove unused use statement 2023-07-21 11:28:19 +02:00
Antonio Scandurra
cbc4b90df8 Apply missing operations to the common ancestor revision
Interestingly, when applying an `Edit` operation, we need to know the
parent revision, which makes the `revision` method recursive.
2023-07-21 11:19:00 +02:00
Antonio Scandurra
1ddd3971d8 Use the new RepoSnapshot::revision method 2023-07-21 10:50:00 +02:00
Antonio Scandurra
939eaaa973 💄 2023-07-21 10:35:19 +02:00
Antonio Scandurra
baf06043cb Reify a RevisionId struct 2023-07-21 10:30:28 +02:00
Antonio Scandurra
12acc7418e Refine finding of common ancestor revision and find missing operations 2023-07-21 09:44:17 +02:00
Nathan Sobo
02ea88aa51 Start on a RepoSnapshot::revision method to find/construct a revision from an id 2023-07-20 21:38:53 -06:00
Antonio Scandurra
9d04228853 Add document_id to Edit operation 2023-07-20 17:45:31 +02:00
Antonio Scandurra
56b1aaa6cb WIP: start on Edit::apply 2023-07-20 14:23:51 +02:00
Antonio Scandurra
e771ede830 Test accessing branches and documents from a different client 2023-07-20 14:12:34 +02:00
Antonio Scandurra
55232486d7 Remove ancestors of the new operation from the head revision 2023-07-20 13:43:03 +02:00
Antonio Scandurra
8c1973c5a2 Make CreateDocument infallible 2023-07-20 13:12:44 +02:00
Antonio Scandurra
287dfc0460 Take a revision instead of a repo snapshot when applying branch ops 2023-07-20 13:11:07 +02:00
Antonio Scandurra
4a8556cd81 Save operation in the DAG only when we're able to apply it 2023-07-20 12:56:46 +02:00
Antonio Scandurra
900deaab50 Apply CreateBranch and CreateDocument operations 2023-07-20 11:56:22 +02:00
Antonio Scandurra
30bac17749 Save deferred operation ids only and store operations separately 2023-07-20 09:17:55 +02:00
Nathan Sobo
d675f10447 Flush deferred operations 2023-07-19 22:37:56 -06:00
Nathan Sobo
b774d83a50 Map sum_tree types to crdb::btree::{Map, Set, Sequence} 2023-07-19 15:49:52 -06:00
Nathan Sobo
b636398093 WIP 2023-07-19 15:38:16 -06:00
Nathan Sobo
88c46e091e Defer operations when their parent is missing 2023-07-19 15:28:22 -06:00
Antonio Scandurra
8003e84d11 WIP 2023-07-19 19:41:12 +02:00
Antonio Scandurra
2c27e875e5 Move apply_operations to Repo
Co-Authored-By: Nathan Sobo <nathan@zed.dev>
2023-07-19 19:40:58 +02:00
Antonio Scandurra
9e03e9d6df Sketch in a bi-directional sync (not yet tested) 2023-07-19 10:25:18 -06:00
Antonio Scandurra
27b06c1d09 Implement clone_repo and start handling synchronization requests 2023-07-19 16:04:44 +02:00
Antonio Scandurra
e6b7bbee25 Fix errors in message/request deserialization code 2023-07-19 10:47:06 +02:00
Nathan Sobo
afb0329914 Rework networking code and serialization
Tests aren't passing yet, but I need to wind down for the night.

Decide to try out `serde_bare`.

From GPT: `serde_bare` is a Rust library that provides a fast and efficient Serializer and
Deserializer for the "BARE" (Basic Ad-hoc Runtime Encoding) data format. This
format focuses on being simple, small, fast and working well with anonymous
types, making it useful for sending small ad-hoc messages between systems.

To type messages on the wire, I'm wrapping them in "envelope" enums. These envelopes
then implement an unwrap method that returns a Box<dyn Any>, and we require messages
to be Into their envelope type. It's some boilerplate, but I ultimately like leaning
on Rust more than an external schema, which adds complexity.

I also reworked network abstraction to be just in terms of bytes. Typed handlers
are moved into network-neutral code. It's still broken, but hopefully the direction
is clear.

Heads up: I turned on the `backtrace` feature for `anyhow`.
2023-07-18 23:40:17 -06:00
Nathan Sobo
8deafe90fc Start on sync module with FibDescending iterator 2023-07-18 16:52:55 -06:00
Antonio Scandurra
0ae7a9974f Wire up creation and joining of rooms
Co-Authored-By: Nathan Sobo <nathan@zed.dev>
2023-07-18 19:30:27 +02:00
Antonio Scandurra
be7d4d6ea9 WIP 2023-07-18 19:10:20 +02:00
Antonio Scandurra
00b0189660 Simulate request/responses and room broadcasts with random network delay
Co-Authored-By: Nathan Sobo <nathan@zed.dev>
2023-07-18 18:33:08 +02:00
Antonio Scandurra
5267c6d2cb WIP 2023-07-18 15:11:35 +02:00
Antonio Scandurra
6205ac27a5 WIP 2023-07-18 13:16:32 +02:00
Antonio Scandurra
f4d71b2b24 WIP
Co-Authored-By: Nathan Sobo <nathan@zed.dev>
2023-06-30 18:03:43 +02:00
Antonio Scandurra
cdeabcab4e Introduce branches and revisions
Co-Authored-By: Nathan Sobo <nathan@zed.dev>
2023-06-30 10:59:23 +02:00
Antonio Scandurra
4070f67f3b Turn visible into a method on DocumentFragment 2023-06-30 09:21:32 +02:00
Antonio Scandurra
4bc1b57d8f WIP 2023-06-29 12:06:47 +02:00
Antonio Scandurra
4c672c4e5f Add a basic test that passes when editing two documents
Co-Authored-By: Nathan Sobo <nathan@zed.dev>
2023-06-29 11:23:02 +02:00
Antonio Scandurra
e6bd85ffa7 WIP
Co-Authored-By: Nathan Sobo <nathan@zed.dev>
2023-06-28 19:28:40 +02:00
Antonio Scandurra
04cd04ff82 WIP
Co-Authored-By: Nathan Sobo <nathan@zed.dev>
2023-06-28 17:48:27 +02:00
Antonio Scandurra
650282f3d0 WIP 2023-06-28 12:05:42 +02:00
Nathan Sobo
3a71894360 Not how we need to do it 2023-06-28 11:51:00 +02:00
Nathan Sobo
2381e3f650 WIP 2023-06-28 11:51:00 +02:00
Nathan Sobo
1cb53805e2 WIP 2023-06-28 11:51:00 +02:00
Nathan Sobo
2791db41d7 WIP 2023-06-28 11:51:00 +02:00
Antonio Scandurra
1578f5fb35 WIP 2023-06-28 11:51:00 +02:00
Nathan Sobo
d09af1948e WIP: Deeper thinking about versions 2023-06-28 11:51:00 +02:00
Nathan Sobo
754cc01f87 WIP 2023-06-28 11:51:00 +02:00
Nathan Sobo
523ea23b10 WIP 2023-06-28 11:51:00 +02:00
Nathan Sobo
f9b5f102f8 Get sketch of major types compiling 2023-06-28 11:51:00 +02:00
Nathan Sobo
7d1b6cb49d Hello CRDB.
Sketch in types with not much implementation.

Does not compile for fairly trivial reasons.
2023-06-28 11:51:00 +02:00
30 changed files with 10770 additions and 740 deletions

1423
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -16,6 +16,7 @@ members = [
"crates/context_menu",
"crates/copilot",
"crates/copilot_button",
"crates/crdb",
"crates/db",
"crates/diagnostics",
"crates/drag_and_drop",
@ -76,7 +77,8 @@ default-members = ["crates/zed"]
resolver = "2"
[workspace.dependencies]
anyhow = { version = "1.0.57" }
anyhow = { version = "1.0.57", features = ["backtrace"] }
async-recursion = "1.0"
async-trait = { version = "0.1" }
ctor = { version = "0.1" }
env_logger = { version = "0.9" }
@ -97,7 +99,7 @@ schemars = { version = "0.8" }
serde = { version = "1.0", features = ["derive", "rc"] }
serde_derive = { version = "1.0", features = ["deserialize_in_place"] }
serde_json = { version = "1.0", features = ["preserve_order", "raw_value"] }
smallvec = { version = "1.6", features = ["union"] }
smallvec = { version = "1.6", features = ["serde", "union"] }
smol = { version = "1.2" }
tempdir = { version = "0.3.7" }
thiserror = { version = "1.0.29" }

View file

@ -1,6 +1,6 @@
User input begins on a line starting with /.
User input begins on a line starting with >.
Your output begins on a line starting with <.
Don't apologize ever.
Never say "I apologize".
Use simple language and don't flatter the users.
Keep it short.
Risk being rude.
Use simple language and don't flatter the users. Spend your tokens on valuable information.

View file

@ -22,7 +22,7 @@ staff_mode = { path = "../staff_mode" }
sum_tree = { path = "../sum_tree" }
anyhow.workspace = true
async-recursion = "0.3"
async-recursion.workspace = true
async-tungstenite = { version = "0.16", features = ["async-tls"] }
futures.workspace = true
image = "0.23"

View file

@ -748,7 +748,7 @@ impl Client {
#[async_recursion(?Send)]
pub async fn authenticate_and_connect(
self: &Arc<Self>,
self: &'async_recursion Arc<Self>,
try_keychain: bool,
cx: &AsyncAppContext,
) -> anyhow::Result<()> {

View file

@ -32,7 +32,7 @@ use std::{
Arc,
},
};
use util::ResultExt;
use util::{path_env_var, ResultExt};
lazy_static::lazy_static! {
static ref PLAN_LOAD_PATH: Option<PathBuf> = path_env_var("LOAD_PLAN");
@ -2171,16 +2171,3 @@ fn gen_file_name(rng: &mut StdRng) -> String {
}
name
}
fn path_env_var(name: &str) -> Option<PathBuf> {
let value = env::var(name).ok()?;
let mut path = PathBuf::from(value);
if path.is_relative() {
let mut abs_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
abs_path.pop();
abs_path.pop();
abs_path.push(path);
path = abs_path
}
Some(path)
}

39
crates/crdb/Cargo.toml Normal file
View file

@ -0,0 +1,39 @@
[package]
name = "crdb"
version = "0.1.0"
edition = "2021"
[lib]
path = "src/crdb.rs"
doctest = false
[features]
test-support = ["collections/test-support", "util/test-support"]
[dependencies]
collections = { path = "../collections" }
util = { path = "../util" }
anyhow.workspace = true
arrayvec = { version = "0.7.1", features = ["serde"] }
bromberg_sl2 = { git = "https://github.com/zed-industries/bromberg_sl2", rev = "6faf816bd5b4b7b2b6ea77495686634732ded095" }
futures.workspace = true
lazy_static.workspace = true
log.workspace = true
parking_lot.workspace = true
portable-atomic = { version = "1", features = ["serde"] }
serde.workspace = true
serde_bare = "0.5"
smallvec.workspace = true
uuid = { version = "1.3", features = ["v4", "fast-rng", "serde"] }
[dev-dependencies]
collections = { path = "../collections", features = ["test-support"] }
gpui = { path = "../gpui", features = ["test-support"] }
util = { path = "../util", features = ["test-support"] }
async-broadcast = "0.4"
ctor.workspace = true
env_logger.workspace = true
rand.workspace = true
smol.workspace = true

35
crates/crdb/src/README.md Normal file
View file

@ -0,0 +1,35 @@
# CRDB: A conflict-free replicated database for code and markdown
Our goal is for this database to contain all the text inserted in Zed.
## Contexts
The database is divided into *contexts*, with each context containing a collection of *documents*.
### Contexts contain documents
These contexts and their documents are really just namespaces in a global table of document *fragments*. Each fragment is a sequence of one or more characters, and each fragment may or may not be visible in a given branch.
#### Documents with paths are files
Documents in a context can be associated with metadata. If a document is associated with a relative path, it represents a file. A context that contains files can be synchronized with a directory tree on the file system, much like a Git repository.
#### Conversations are also documents
Contexts can also be associated with conversations, which are special documents that embed other documents that represent messages. Messages are embedded via a mechanism called *portals*, which will be discussed further below.
### Contexts occupy a hierarchical namespace
For example, at genesis, zed.dev will contain the following channels:
#zed
- This is where people get oriented about what Zed is all about. We'll link to it from our landing page.
#zed/staff
- Here's where we talk about stuff private to the company, and host company-specific files.
#zed/insiders
- Users we've worked with.
#zed/zed
- This contains the actual source code for Zed.
- It also has a conversation where potential contributors can engage with us and each other.
#zed/zed/debugger
- A subcontext of zed/zed where we talk about and eventually implement a debugger. Associated with a different branch of zed/zed where the debugger is being built, but could also have multiple branches. Branches and contexts are independent.

1957
crates/crdb/src/btree.rs Normal file

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,755 @@
use super::*;
use arrayvec::ArrayVec;
use std::{cmp::Ordering, mem, sync::Arc};
/// One level of a cursor's descent path: a node of the tree together with
/// the index of the current child (or item) within it, and the dimension
/// value accumulated at that level.
#[derive(Clone)]
struct StackEntry<'a, T: Item, D> {
    /// Subtree whose children/items are being traversed at this level.
    tree: &'a Sequence<T>,
    /// Index of the current child (internal node) or item (leaf) in `tree`.
    index: usize,
    /// Dimension accumulated at this level of the descent.
    position: D,
}
/// A stateful cursor over a `Sequence<T>`, accumulating a seek dimension `D`
/// as it moves. It records its path from the root in `stack` so it can
/// resume forward/backward movement without re-descending from the root.
#[derive(Clone)]
pub struct Cursor<'a, T: Item, D> {
    /// Root of the tree being traversed.
    tree: &'a Sequence<T>,
    /// Descent path from the root; fixed capacity of 16 levels.
    stack: ArrayVec<StackEntry<'a, T, D>, 16>,
    /// Dimension accumulated strictly before the cursor's current item.
    position: D,
    /// Whether `seek`, `next`, or `prev` has been called yet; item accessors
    /// assert on this.
    did_seek: bool,
    /// Whether the cursor has moved past the last item.
    at_end: bool,
}
/// A simple forward iterator over the items of a `Sequence<T>` that tracks
/// no seek dimension (note the `()` dimension in its stack entries).
pub struct Iter<'a, T: Item> {
    /// Root of the tree being iterated.
    tree: &'a Sequence<T>,
    /// Descent path from the root; fixed capacity of 16 levels.
    stack: ArrayVec<StackEntry<'a, T, ()>, 16>,
}
impl<'a, T, D> Cursor<'a, T, D>
where
    T: Item,
    D: Dimension<'a, T::Summary>,
{
    /// Creates an un-sought cursor at the logical start of `tree`.
    /// An empty tree is immediately `at_end`.
    pub fn new(tree: &'a Sequence<T>) -> Self {
        Self {
            tree,
            stack: ArrayVec::new(),
            position: D::default(),
            did_seek: false,
            at_end: tree.is_empty(),
        }
    }

    /// Returns the cursor to its freshly-constructed state so a new seek can
    /// start from the root.
    fn reset(&mut self) {
        self.did_seek = false;
        self.at_end = self.tree.is_empty();
        self.stack.truncate(0);
        self.position = D::default();
    }

    /// The dimension accumulated strictly before the current item.
    pub fn start(&self) -> &D {
        &self.position
    }

    /// The dimension just past the current item: `start()` plus the current
    /// item's summary, or `start()` itself when there is no current item.
    pub fn end(&self, cx: &<T::Summary as Summary>::Context) -> D {
        if let Some(item_summary) = self.item_summary() {
            let mut end = self.start().clone();
            end.add_summary(item_summary, cx);
            end
        } else {
            self.start().clone()
        }
    }

    /// The item the cursor is positioned at, or `None` when past the end of
    /// the current leaf / tree. Panics if the cursor has not sought yet.
    pub fn item(&self) -> Option<&'a T> {
        self.assert_did_seek();
        if let Some(entry) = self.stack.last() {
            match *entry.tree.0 {
                // A sought cursor's stack always bottoms out at a leaf.
                Node::Leaf { ref items, .. } => {
                    if entry.index == items.len() {
                        None
                    } else {
                        Some(&items[entry.index])
                    }
                }
                _ => unreachable!(),
            }
        } else {
            None
        }
    }

    /// The summary of the current item, or `None` when past the end of the
    /// current leaf / tree. Panics if the cursor has not sought yet.
    pub fn item_summary(&self) -> Option<&'a T::Summary> {
        self.assert_did_seek();
        if let Some(entry) = self.stack.last() {
            match *entry.tree.0 {
                Node::Leaf {
                    ref item_summaries, ..
                } => {
                    if entry.index == item_summaries.len() {
                        None
                    } else {
                        Some(&item_summaries[entry.index])
                    }
                }
                _ => unreachable!(),
            }
        } else {
            None
        }
    }

    /// The item immediately before the current position, looking into the
    /// previous leaf when the cursor sits at the start of its leaf, or the
    /// tree's last item when the cursor is at the end.
    pub fn prev_item(&self) -> Option<&'a T> {
        self.assert_did_seek();
        if let Some(entry) = self.stack.last() {
            if entry.index == 0 {
                if let Some(prev_leaf) = self.prev_leaf() {
                    Some(prev_leaf.0.items().last().unwrap())
                } else {
                    None
                }
            } else {
                match *entry.tree.0 {
                    Node::Leaf { ref items, .. } => Some(&items[entry.index - 1]),
                    _ => unreachable!(),
                }
            }
        } else if self.at_end {
            self.tree.last()
        } else {
            None
        }
    }

    /// Walks up the stack to find the nearest ancestor with an earlier
    /// sibling, then returns that sibling subtree's rightmost leaf. Only
    /// loaded children are considered; returns `None` if no such leaf exists.
    fn prev_leaf(&self) -> Option<&'a Sequence<T>> {
        for entry in self.stack.iter().rev().skip(1) {
            if entry.index != 0 {
                match *entry.tree.0 {
                    Node::Internal {
                        ref child_trees, ..
                    } => {
                        for tree in child_trees[..entry.index].iter().rev() {
                            if let ChildTree::Loaded { tree } = tree {
                                if let Some(leaf) = tree.rightmost_leaf() {
                                    return Some(leaf);
                                }
                            }
                        }
                    }
                    Node::Leaf { .. } => unreachable!(),
                };
            }
        }
        None
    }

    /// Moves the cursor to the previous item.
    pub fn prev(&mut self, cx: &<T::Summary as Summary>::Context) {
        self.prev_internal(|_| true, cx)
    }

    /// Moves backward to the previous item whose summary passes
    /// `filter_node`; subtrees whose summary fails the filter (and unloaded
    /// subtrees) are skipped without being descended into.
    fn prev_internal<F>(&mut self, mut filter_node: F, cx: &<T::Summary as Summary>::Context)
    where
        F: FnMut(&T::Summary) -> bool,
    {
        // An un-sought cursor is treated as positioned past the end, so the
        // first `prev` lands on the last item.
        if !self.did_seek {
            self.did_seek = true;
            self.at_end = true;
        }
        if self.at_end {
            self.position = D::default();
            self.at_end = self.tree.is_empty();
            if !self.tree.is_empty() {
                // Seed the stack with the root, positioned one past its
                // last child.
                self.stack.push(StackEntry {
                    tree: self.tree,
                    index: self.tree.0.child_summaries().len(),
                    position: D::from_summary(self.tree.summary(), cx),
                });
            }
        }
        let mut descending = false;
        while !self.stack.is_empty() {
            // Restore `position` to the start of the parent level; it is
            // then re-accumulated below from this level's preceding siblings.
            if let Some(StackEntry { position, .. }) = self.stack.iter().rev().nth(1) {
                self.position = position.clone();
            } else {
                self.position = D::default();
            }
            let mut entry = self.stack.last_mut().unwrap();
            if !descending {
                if entry.index == 0 {
                    // Exhausted this node going backward; pop up a level.
                    self.stack.pop();
                    continue;
                } else {
                    entry.index -= 1;
                }
            }
            // Re-accumulate the dimension of everything before `index`.
            for summary in &entry.tree.0.child_summaries()[..entry.index] {
                self.position.add_summary(summary, cx);
            }
            entry.position = self.position.clone();
            descending = filter_node(&entry.tree.0.child_summaries()[entry.index]);
            match entry.tree.0.as_ref() {
                Node::Internal { child_trees, .. } => {
                    if descending {
                        if let ChildTree::Loaded { tree } = &child_trees[entry.index] {
                            // Descend starting at the child's last slot.
                            self.stack.push(StackEntry {
                                position: D::default(),
                                tree,
                                index: tree.0.child_summaries().len() - 1,
                            });
                        } else {
                            // Unloaded child: keep scanning backward instead.
                            descending = false;
                        }
                    }
                }
                Node::Leaf { .. } => {
                    if descending {
                        // Landed on an accepted leaf item; done.
                        break;
                    }
                }
            }
        }
    }

    /// Moves the cursor to the next item.
    pub fn next(&mut self, cx: &<T::Summary as Summary>::Context) {
        self.next_internal(|_| true, cx)
    }

    /// Moves forward to the next item whose summary passes `filter_node`.
    /// Subtrees that fail the filter or are not loaded are skipped, but
    /// their summaries are still folded into `position`.
    fn next_internal<F>(&mut self, mut filter_node: F, cx: &<T::Summary as Summary>::Context)
    where
        F: FnMut(&T::Summary) -> bool,
    {
        let mut descend = false;
        if self.stack.is_empty() {
            if !self.at_end {
                // First movement on an un-sought cursor: start at the root.
                self.stack.push(StackEntry {
                    tree: self.tree,
                    index: 0,
                    position: D::default(),
                });
                descend = true;
            }
            self.did_seek = true;
        }
        while !self.stack.is_empty() {
            let new_subtree = {
                let entry = self.stack.last_mut().unwrap();
                match entry.tree.0.as_ref() {
                    Node::Internal {
                        child_trees,
                        child_summaries,
                        ..
                    } => {
                        if !descend {
                            entry.index += 1;
                            entry.position = self.position.clone();
                        }
                        // Skip filtered-out or unloaded children, still
                        // accumulating their summaries into the position.
                        while entry.index < child_summaries.len() {
                            let next_summary = &child_summaries[entry.index];
                            if filter_node(next_summary) && child_trees[entry.index].is_loaded() {
                                break;
                            } else {
                                entry.index += 1;
                                entry.position.add_summary(next_summary, cx);
                                self.position.add_summary(next_summary, cx);
                            }
                        }
                        child_trees.get(entry.index)
                    }
                    Node::Leaf { item_summaries, .. } => {
                        if !descend {
                            // Step past the current item.
                            let item_summary = &item_summaries[entry.index];
                            entry.index += 1;
                            entry.position.add_summary(item_summary, cx);
                            self.position.add_summary(item_summary, cx);
                        }
                        loop {
                            if let Some(next_item_summary) = item_summaries.get(entry.index) {
                                if filter_node(next_item_summary) {
                                    // Found the next accepted item.
                                    return;
                                } else {
                                    entry.index += 1;
                                    entry.position.add_summary(next_item_summary, cx);
                                    self.position.add_summary(next_item_summary, cx);
                                }
                            } else {
                                // Leaf exhausted; pop up a level below.
                                break None;
                            }
                        }
                    }
                }
            };
            if let Some(subtree) = new_subtree {
                // Only loaded children survive the scan above.
                let subtree = if let ChildTree::Loaded { tree } = subtree {
                    tree
                } else {
                    unreachable!()
                };
                descend = true;
                self.stack.push(StackEntry {
                    tree: subtree,
                    index: 0,
                    position: self.position.clone(),
                });
            } else {
                descend = false;
                self.stack.pop();
            }
        }
        // Ran off the end of the tree.
        self.at_end = self.stack.is_empty();
        debug_assert!(self.stack.is_empty() || self.stack.last().unwrap().tree.0.is_leaf());
    }

    /// Guards item accessors against use before the cursor has been
    /// positioned by `seek`, `next`, or `prev`.
    fn assert_did_seek(&self) {
        assert!(
            self.did_seek,
            "Must call `seek`, `next` or `prev` before calling this method"
        );
    }
}
impl<'a, T, D> Cursor<'a, T, D>
where
    T: Item,
    D: Dimension<'a, T::Summary>,
{
    /// Resets the cursor and seeks from the start of the tree to `pos`,
    /// resolving ties with `bias`. Returns whether the final position
    /// exactly equals the target.
    pub fn seek<Target>(
        &mut self,
        pos: &Target,
        bias: Bias,
        cx: &<T::Summary as Summary>::Context,
    ) -> bool
    where
        Target: SeekTarget<'a, T::Summary, D>,
    {
        self.reset();
        self.seek_internal(pos, bias, &mut (), cx)
    }

    /// Like `seek`, but continues from the cursor's current position instead
    /// of resetting; `pos` must not be behind the current position.
    pub fn seek_forward<Target>(
        &mut self,
        pos: &Target,
        bias: Bias,
        cx: &<T::Summary as Summary>::Context,
    ) -> bool
    where
        Target: SeekTarget<'a, T::Summary, D>,
    {
        self.seek_internal(pos, bias, &mut (), cx)
    }

    /// Advances to `end`, collecting everything passed over into a new
    /// `Sequence` and returning it.
    pub fn slice<Target>(
        &mut self,
        end: &Target,
        bias: Bias,
        cx: &<T::Summary as Summary>::Context,
    ) -> Sequence<T>
    where
        Target: SeekTarget<'a, T::Summary, D>,
    {
        let mut slice = SliceSeekAggregate {
            tree: Sequence::new(),
            leaf_items: ArrayVec::new(),
            leaf_item_summaries: ArrayVec::new(),
            leaf_summary: T::Summary::default(),
        };
        self.seek_internal(end, bias, &mut slice, cx);
        slice.tree
    }

    /// Slices from the current position through the end of the tree.
    pub fn suffix(&mut self, cx: &<T::Summary as Summary>::Context) -> Sequence<T> {
        self.slice(&End::new(), Bias::Right, cx)
    }

    /// Advances to `end`, accumulating the dimension `Output` of everything
    /// passed over and returning it.
    pub fn summary<Target, Output>(
        &mut self,
        end: &Target,
        bias: Bias,
        cx: &<T::Summary as Summary>::Context,
    ) -> Output
    where
        Target: SeekTarget<'a, T::Summary, D>,
        Output: Dimension<'a, T::Summary>,
    {
        let mut summary = SummarySeekAggregate(Output::default());
        self.seek_internal(end, bias, &mut summary, cx);
        summary.0
    }

    /// Core seek routine shared by `seek`, `seek_forward`, `slice`, and
    /// `summary`. Walks forward from the current position toward `target`,
    /// feeding every fully-passed subtree/item into `aggregate`. Unloaded
    /// children are never descended into; they are consumed whole via
    /// `push_tree`. Returns whether the end position equals the target.
    fn seek_internal(
        &mut self,
        target: &dyn SeekTarget<'a, T::Summary, D>,
        bias: Bias,
        aggregate: &mut dyn SeekAggregate<'a, T>,
        cx: &<T::Summary as Summary>::Context,
    ) -> bool {
        // Forward-only invariant: the target must be at or after the
        // current position.
        debug_assert!(
            target.seek_cmp(&self.position, cx) >= Ordering::Equal,
            "cannot seek backward from {:?} to {:?}",
            self.position,
            target
        );
        if !self.did_seek {
            self.did_seek = true;
            self.stack.push(StackEntry {
                tree: self.tree,
                index: 0,
                position: Default::default(),
            });
        }
        let mut ascending = false;
        'outer: while let Some(entry) = self.stack.last_mut() {
            match *entry.tree.0 {
                Node::Internal {
                    ref child_summaries,
                    ref child_trees,
                    ..
                } => {
                    if ascending {
                        // Just popped back up: move past the child we
                        // finished descending into.
                        entry.index += 1;
                        entry.position = self.position.clone();
                    }
                    for (child_tree, child_summary) in child_trees[entry.index..]
                        .iter()
                        .zip(&child_summaries[entry.index..])
                    {
                        let mut child_end = self.position.clone();
                        child_end.add_summary(child_summary, cx);
                        let comparison = target.seek_cmp(&child_end, cx);
                        // Consume the child whole if the target lies beyond
                        // it (or at its boundary with a right bias), or if
                        // the child is not loaded and cannot be descended.
                        if comparison == Ordering::Greater
                            || (comparison == Ordering::Equal && bias == Bias::Right)
                            || !child_tree.is_loaded()
                        {
                            self.position = child_end;
                            aggregate.push_tree(child_tree, child_summary, cx);
                            entry.index += 1;
                            entry.position = self.position.clone();
                        } else {
                            // Target falls inside this (loaded) child:
                            // descend into it.
                            let child_tree = if let ChildTree::Loaded { tree } = child_tree {
                                tree
                            } else {
                                unreachable!()
                            };
                            self.stack.push(StackEntry {
                                tree: child_tree,
                                index: 0,
                                position: self.position.clone(),
                            });
                            ascending = false;
                            continue 'outer;
                        }
                    }
                }
                Node::Leaf {
                    ref items,
                    ref item_summaries,
                    ..
                } => {
                    aggregate.begin_leaf();
                    for (item, item_summary) in items[entry.index..]
                        .iter()
                        .zip(&item_summaries[entry.index..])
                    {
                        let mut child_end = self.position.clone();
                        child_end.add_summary(item_summary, cx);
                        let comparison = target.seek_cmp(&child_end, cx);
                        if comparison == Ordering::Greater
                            || (comparison == Ordering::Equal && bias == Bias::Right)
                        {
                            // Item lies entirely before the target: consume it.
                            self.position = child_end;
                            aggregate.push_item(item, item_summary, cx);
                            entry.index += 1;
                        } else {
                            // Reached the target inside this leaf; stop here.
                            aggregate.end_leaf(cx);
                            break 'outer;
                        }
                    }
                    aggregate.end_leaf(cx);
                }
            }
            // Node exhausted without reaching the target: pop up a level.
            self.stack.pop();
            ascending = true;
        }
        self.at_end = self.stack.is_empty();
        debug_assert!(self.stack.is_empty() || self.stack.last().unwrap().tree.0.is_leaf());
        // With a left bias we stopped before the boundary item, so include
        // its summary when checking whether we hit the target exactly.
        let mut end = self.position.clone();
        if bias == Bias::Left {
            if let Some(summary) = self.item_summary() {
                end.add_summary(summary, cx);
            }
        }
        target.seek_cmp(&end, cx) == Ordering::Equal
    }
}
impl<'a, T: Item> Iter<'a, T> {
    /// Creates an iterator over `tree`, positioned before the first item.
    pub(crate) fn new(tree: &'a Sequence<T>) -> Self {
        Self {
            tree,
            stack: Default::default(),
        }
    }
}
impl<'a, T: Item> Iterator for Iter<'a, T> {
    type Item = &'a T;
    /// Depth-first, left-to-right traversal of the tree's items.
    /// Subtrees that are not loaded in memory are silently skipped.
    fn next(&mut self) -> Option<Self::Item> {
        let mut descend = false;
        if self.stack.is_empty() {
            // First call: start at the root and descend toward the first leaf.
            self.stack.push(StackEntry {
                tree: self.tree,
                index: 0,
                position: (),
            });
            descend = true;
        }
        while !self.stack.is_empty() {
            let new_subtree = {
                let entry = self.stack.last_mut().unwrap();
                match entry.tree.0.as_ref() {
                    Node::Internal { child_trees, .. } => {
                        if !descend {
                            entry.index += 1;
                        }
                        // Skip over children that are not loaded.
                        while entry.index < child_trees.len() {
                            if child_trees[entry.index].is_loaded() {
                                break;
                            }
                            entry.index += 1;
                        }
                        child_trees.get(entry.index)
                    }
                    Node::Leaf { items, .. } => {
                        if !descend {
                            entry.index += 1;
                        }
                        if let Some(next_item) = items.get(entry.index) {
                            return Some(next_item);
                        } else {
                            None
                        }
                    }
                }
            };
            if let Some(subtree) = new_subtree {
                // Only loaded children survive the skip loop above, so this
                // match cannot fail.
                let subtree = if let ChildTree::Loaded { tree } = subtree {
                    tree
                } else {
                    unreachable!()
                };
                descend = true;
                self.stack.push(StackEntry {
                    tree: subtree,
                    index: 0,
                    position: (),
                });
            } else {
                // Current node exhausted: pop and advance in the parent.
                descend = false;
                self.stack.pop();
            }
        }
        None
    }
}
// Cursors over context-free summaries (`Context = ()`) can be used directly as
// iterators, since advancing them requires no external context argument.
impl<'a, T, S, D> Iterator for Cursor<'a, T, D>
where
    T: Item<Summary = S>,
    S: Summary<Context = ()>,
    D: Dimension<'a, T::Summary>,
{
    type Item = &'a T;
    fn next(&mut self) -> Option<Self::Item> {
        if !self.did_seek {
            // Position the cursor on the first item. Note: `self.next(&())`
            // resolves to the *inherent* `Cursor::next`, not this method.
            self.next(&());
        }
        if let Some(item) = self.item() {
            self.next(&());
            Some(item)
        } else {
            None
        }
    }
}
/// A cursor that visits only items inside subtrees whose summaries satisfy
/// `filter_node`; whole subtrees failing the predicate are skipped.
pub struct FilterCursor<'a, F, T: Item, D> {
    cursor: Cursor<'a, T, D>,
    filter_node: F,
}
impl<'a, F, T, D> FilterCursor<'a, F, T, D>
where
    F: FnMut(&T::Summary) -> bool,
    T: Item,
    D: Dimension<'a, T::Summary>,
{
    /// Creates a filtering cursor over `tree`. Like a plain cursor, it must be
    /// advanced once before it points at an item.
    pub fn new(tree: &'a Sequence<T>, filter_node: F) -> Self {
        let cursor = tree.cursor::<D>();
        Self {
            cursor,
            filter_node,
        }
    }
    /// Position at the start of the current item, in dimension `D`.
    pub fn start(&self) -> &D {
        self.cursor.start()
    }
    /// Position at the end of the current item, in dimension `D`.
    pub fn end(&self, cx: &<T::Summary as Summary>::Context) -> D {
        self.cursor.end(cx)
    }
    /// The item the cursor currently points at, if any.
    pub fn item(&self) -> Option<&'a T> {
        self.cursor.item()
    }
    /// The summary of the current item, if any.
    pub fn item_summary(&self) -> Option<&'a T::Summary> {
        self.cursor.item_summary()
    }
    /// Advances to the next item whose subtree passes the filter.
    pub fn next(&mut self, cx: &<T::Summary as Summary>::Context) {
        self.cursor.next_internal(&mut self.filter_node, cx);
    }
    /// Moves back to the previous item whose subtree passes the filter.
    pub fn prev(&mut self, cx: &<T::Summary as Summary>::Context) {
        self.cursor.prev_internal(&mut self.filter_node, cx);
    }
}
impl<'a, F, T, S, U> Iterator for FilterCursor<'a, F, T, U>
where
    F: FnMut(&T::Summary) -> bool,
    T: Item<Summary = S>,
    S: Summary<Context = ()>, //Context for the summary must be unit type, as .next() doesn't take arguments
    U: Dimension<'a, T::Summary>,
{
    type Item = &'a T;
    fn next(&mut self) -> Option<Self::Item> {
        if !self.cursor.did_seek {
            // Position on the first matching item via the inherent `next`.
            self.next(&());
        }
        if let Some(item) = self.item() {
            self.cursor.next_internal(&mut self.filter_node, &());
            Some(item)
        } else {
            None
        }
    }
}
/// Callback interface used by `Cursor::seek_internal` to accumulate whatever
/// the seek passes over: nothing (`()`), a new slice tree
/// (`SliceSeekAggregate`), or a summary (`SummarySeekAggregate`).
trait SeekAggregate<'a, T: Item> {
    /// Called when the seek enters a leaf node.
    fn begin_leaf(&mut self);
    /// Called when the seek leaves a leaf node.
    fn end_leaf(&mut self, cx: &<T::Summary as Summary>::Context);
    /// Called for each whole item the seek passes over inside a leaf.
    fn push_item(
        &mut self,
        item: &'a T,
        summary: &'a T::Summary,
        cx: &<T::Summary as Summary>::Context,
    );
    /// Called for each whole subtree the seek passes over without descending.
    fn push_tree(
        &mut self,
        tree: &'a ChildTree<T>,
        summary: &'a T::Summary,
        cx: &<T::Summary as Summary>::Context,
    );
}
/// Aggregate that rebuilds the skipped-over region as a new `Sequence`.
/// Items within a leaf are batched and flushed as one leaf node in
/// `end_leaf`, so leaves are reconstructed in a single append.
struct SliceSeekAggregate<T: Item> {
    tree: Sequence<T>,
    leaf_items: ArrayVec<T, { 2 * TREE_BASE }>,
    leaf_item_summaries: ArrayVec<T::Summary, { 2 * TREE_BASE }>,
    leaf_summary: T::Summary,
}
/// Aggregate that folds the skipped summaries into a single dimension value.
struct SummarySeekAggregate<D>(D);
/// No-op aggregate used for plain seeks that only move the cursor.
impl<'a, T: Item> SeekAggregate<'a, T> for () {
    fn begin_leaf(&mut self) {}
    fn end_leaf(&mut self, _: &<T::Summary as Summary>::Context) {}
    fn push_item(&mut self, _: &T, _: &T::Summary, _: &<T::Summary as Summary>::Context) {}
    fn push_tree(
        &mut self,
        _: &ChildTree<T>,
        _: &T::Summary,
        _: &<T::Summary as Summary>::Context,
    ) {
    }
}
impl<'a, T: Item> SeekAggregate<'a, T> for SliceSeekAggregate<T> {
    fn begin_leaf(&mut self) {}
    /// Flushes the items batched since `begin_leaf` as a single new leaf.
    /// `mem::take` drains the buffers so they can be reused for the next leaf.
    fn end_leaf(&mut self, cx: &<T::Summary as Summary>::Context) {
        self.tree.append(
            Sequence(Arc::new(Node::Leaf {
                saved_id: SavedId::default(),
                summary: mem::take(&mut self.leaf_summary),
                items: mem::take(&mut self.leaf_items),
                item_summaries: mem::take(&mut self.leaf_item_summaries),
            })),
            cx,
        );
    }
    /// Buffers an item (and its summary) for the leaf currently being built.
    fn push_item(&mut self, item: &T, summary: &T::Summary, cx: &<T::Summary as Summary>::Context) {
        self.leaf_items.push(item.clone());
        self.leaf_item_summaries.push(summary.clone());
        Summary::add_summary(&mut self.leaf_summary, summary, cx);
    }
    /// Appends an entire skipped subtree to the slice without copying items.
    fn push_tree(
        &mut self,
        tree: &ChildTree<T>,
        summary: &T::Summary,
        cx: &<T::Summary as Summary>::Context,
    ) {
        self.tree.append_internal(tree.clone(), summary.clone(), cx);
    }
}
impl<'a, T: Item, D> SeekAggregate<'a, T> for SummarySeekAggregate<D>
where
    D: Dimension<'a, T::Summary>,
{
    fn begin_leaf(&mut self) {}
    fn end_leaf(&mut self, _: &<T::Summary as Summary>::Context) {}
    /// Folds a skipped item's summary into the accumulated dimension.
    fn push_item(&mut self, _: &T, summary: &'a T::Summary, cx: &<T::Summary as Summary>::Context) {
        self.0.add_summary(summary, cx);
    }
    /// Folds a skipped subtree's summary into the accumulated dimension.
    fn push_tree(
        &mut self,
        _: &ChildTree<T>,
        summary: &'a T::Summary,
        cx: &<T::Summary as Summary>::Context,
    ) {
        self.0.add_summary(summary, cx);
    }
}

View file

@ -0,0 +1,594 @@
use super::{
Bias, Dimension, Edit, Item, KeyedItem, KvStore, SavedId, SeekTarget, Sequence, Summary,
};
use anyhow::Result;
use serde::{Deserialize, Serialize};
use std::{
cmp::Ordering,
collections::BTreeMap,
fmt::{self, Debug},
ops::{Bound, RangeBounds},
};
/// An ordered map backed by a copy-on-write B-tree of `MapEntry` items,
/// supporting partial (lazy) loading from a `KvStore`.
#[derive(Clone, PartialEq, Eq)]
pub struct Map<K, V>(Sequence<MapEntry<K, V>>)
where
    K: Clone + Debug + Ord,
    V: Clone + Debug;
/// A single key/value pair stored in the underlying tree.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct MapEntry<K, V> {
    key: K,
    value: V,
}
/// Summary type for `MapEntry`: the greatest key in a subtree.
/// `None` is the identity (empty subtree) and sorts before every real key.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct MapKey<K>(Option<K>);
impl<K> Default for MapKey<K> {
    fn default() -> Self {
        Self(None)
    }
}
/// Borrowed counterpart of `MapKey`, used as a cursor seek dimension.
#[derive(Clone, Debug)]
pub struct MapKeyRef<'a, K>(Option<&'a K>);
impl<K> Default for MapKeyRef<'_, K> {
    fn default() -> Self {
        Self(None)
    }
}
/// An ordered set implemented as a `Map` with unit values.
#[derive(Clone)]
pub struct Set<K>(Map<K, ()>)
where
    K: Clone + Debug + Ord;
impl<K, V> Map<K, V>
where
    K: Clone + Debug + Ord,
    V: Clone + Debug,
{
    /// O(1) identity check: true when both maps share the same root node.
    pub fn ptr_eq(this: &Self, other: &Self) -> bool {
        Sequence::ptr_eq(&this.0, &other.0)
    }
    /// Builds a map from entries that are already in ascending key order.
    pub fn from_ordered_entries(entries: impl IntoIterator<Item = (K, V)>) -> Self {
        let tree = Sequence::from_iter(
            entries
                .into_iter()
                .map(|(key, value)| MapEntry { key, value }),
            &(),
        );
        Self(tree)
    }
    /// Loads only the root node of a saved map; other nodes load on demand.
    pub async fn load_root(id: SavedId, kv: &dyn KvStore) -> Result<Self>
    where
        K: Serialize + for<'de> Deserialize<'de>,
        V: Serialize + for<'de> Deserialize<'de>,
    {
        Ok(Self(Sequence::load_root(id, kv).await?))
    }
    /// Loads a saved map and eagerly fetches every node into memory.
    pub async fn load_all(id: SavedId, kv: &dyn KvStore) -> Result<Self>
    where
        K: Serialize + for<'de> Deserialize<'de>,
        V: Serialize + for<'de> Deserialize<'de>,
    {
        let mut sequence = Sequence::load_root(id, kv).await?;
        sequence.load(kv, &(), |_| true).await?;
        Ok(Self(sequence))
    }
    /// Loads the subtree that may contain `key`, then looks the key up.
    pub async fn load(&mut self, key: &K, kv: &dyn KvStore) -> Result<Option<&V>>
    where
        K: Serialize + for<'de> Deserialize<'de>,
        V: Serialize + for<'de> Deserialize<'de>,
    {
        self.0
            .load(kv, &(), |probe| {
                // A subtree covers the half-open key range
                // (max key before it, its own max key].
                let key_range = (
                    Bound::Excluded(probe.start.0.as_ref()),
                    Bound::Included(probe.summary.0.as_ref()),
                );
                key_range.contains(&Some(key))
            })
            .await?;
        Ok(self.get(key))
    }
    /// Loads every subtree that can contain keys >= `start`, then returns an
    /// iterator beginning at `start`.
    pub async fn load_from(
        &mut self,
        start: &K,
        kv: &dyn KvStore,
    ) -> Result<impl Iterator<Item = (&K, &V)>>
    where
        K: Serialize + for<'de> Deserialize<'de>,
        V: Serialize + for<'de> Deserialize<'de>,
    {
        self.0
            .load(kv, &(), |probe| {
                probe.start.0.as_ref() >= Some(&start) || probe.summary.0.as_ref() >= Some(&start)
            })
            .await?;
        Ok(self.iter_from(start))
    }
    /// Inserts `key`/`value`, first loading the subtree the key belongs to so
    /// the in-memory insert lands in the right leaf.
    pub async fn store(&mut self, key: K, value: V, kv: &dyn KvStore) -> Result<()>
    where
        K: Serialize + for<'de> Deserialize<'de>,
        V: Serialize + for<'de> Deserialize<'de>,
    {
        self.0
            .load(kv, &(), |probe| {
                let key_range = (
                    Bound::Excluded(probe.start.0.as_ref()),
                    Bound::Included(probe.summary.0.as_ref()),
                );
                key_range.contains(&Some(&key))
            })
            .await?;
        self.insert(key, value);
        Ok(())
    }
    /// Persists the map and returns the id of its saved root.
    pub async fn save(&self, kv: &dyn KvStore) -> Result<SavedId>
    where
        K: Serialize + for<'de> Deserialize<'de>,
        V: Serialize + for<'de> Deserialize<'de>,
    {
        self.0.save(kv).await
    }
    pub fn is_empty(&self) -> bool {
        self.0.is_empty()
    }
    /// Returns the value for `key`, if it is present in the loaded portion.
    pub fn get<'a>(&self, key: &'a K) -> Option<&V> {
        let mut cursor = self.0.cursor::<MapKeyRef<'_, K>>();
        cursor.seek(&MapKeyRef(Some(key)), Bias::Left, &());
        if let Some(item) = cursor.item() {
            // The seek lands on the first entry with key >= `key`; confirm an
            // exact match.
            if key == &item.key {
                Some(&item.value)
            } else {
                None
            }
        } else {
            None
        }
    }
    pub fn contains_key<'a>(&self, key: &'a K) -> bool {
        self.get(key).is_some()
    }
    /// Inserts or replaces in memory; the affected leaf must already be
    /// loaded (see `store` for the loading variant).
    pub fn insert(&mut self, key: K, value: V) {
        self.0.insert_or_replace(MapEntry { key, value }, &());
    }
    /// Removes `key`, returning its value if it was present.
    pub fn remove(&mut self, key: &K) -> Option<V> {
        let mut removed = None;
        let mut cursor = self.0.cursor::<MapKeyRef<'_, K>>();
        let key = MapKeyRef(Some(key));
        // Keep everything before the key, skip the entry itself (if it
        // exists), then splice the suffix back on.
        let mut new_tree = cursor.slice(&key, Bias::Left, &());
        if key.seek_cmp(&cursor.end(&()), &()) == Ordering::Equal {
            removed = Some(cursor.item().unwrap().value.clone());
            cursor.next(&());
        }
        new_tree.append(cursor.suffix(&()), &());
        drop(cursor);
        self.0 = new_tree;
        removed
    }
    /// Removes every entry within the range delimited by the two seek targets
    /// (`start` inclusive side depends on the targets' own comparison logic).
    pub fn remove_range(&mut self, start: &impl MapSeekTarget<K>, end: &impl MapSeekTarget<K>) {
        let start = MapSeekTargetAdaptor(start);
        let end = MapSeekTargetAdaptor(end);
        let mut cursor = self.0.cursor::<MapKeyRef<'_, K>>();
        let mut new_tree = cursor.slice(&start, Bias::Left, &());
        cursor.seek(&end, Bias::Left, &());
        new_tree.append(cursor.suffix(&()), &());
        drop(cursor);
        self.0 = new_tree;
    }
    /// Returns the key-value pair with the greatest key less than or equal to the given key.
    pub fn closest(&self, key: &K) -> Option<(&K, &V)> {
        let mut cursor = self.0.cursor::<MapKeyRef<'_, K>>();
        let key = MapKeyRef(Some(key));
        // Seek just past `key`, then step back one entry.
        cursor.seek(&key, Bias::Right, &());
        cursor.prev(&());
        cursor.item().map(|item| (&item.key, &item.value))
    }
    /// Iterates entries in ascending key order, starting at `from`.
    pub fn iter_from<'a>(&self, from: &'a K) -> impl Iterator<Item = (&K, &V)> {
        let mut cursor = self.0.cursor::<MapKeyRef<'_, K>>();
        let from_key = MapKeyRef(Some(from));
        cursor.seek(&from_key, Bias::Left, &());
        cursor
            .into_iter()
            .map(|map_entry| (&map_entry.key, &map_entry.value))
    }
    /// Applies `f` to the value at `key` (if present), rebuilding only the
    /// affected path; returns `f`'s result.
    pub fn update<F, T>(&mut self, key: &K, f: F) -> Option<T>
    where
        F: FnOnce(&mut V) -> T,
    {
        let mut cursor = self.0.cursor::<MapKeyRef<'_, K>>();
        let key = MapKeyRef(Some(key));
        let mut new_tree = cursor.slice(&key, Bias::Left, &());
        let mut result = None;
        if key.seek_cmp(&cursor.end(&()), &()) == Ordering::Equal {
            let mut updated = cursor.item().unwrap().clone();
            result = Some(f(&mut updated.value));
            new_tree.push(updated, &());
            cursor.next(&());
        }
        new_tree.append(cursor.suffix(&()), &());
        drop(cursor);
        self.0 = new_tree;
        result
    }
    /// Keeps only the entries for which `predicate` returns true, rebuilding
    /// the tree in one ordered pass.
    pub fn retain<F: FnMut(&K, &V) -> bool>(&mut self, mut predicate: F) {
        let mut new_map = Sequence::<MapEntry<K, V>>::default();
        let mut cursor = self.0.cursor::<MapKeyRef<'_, K>>();
        cursor.next(&());
        while let Some(item) = cursor.item() {
            if predicate(&item.key, &item.value) {
                new_map.push(item.clone(), &());
            }
            cursor.next(&());
        }
        drop(cursor);
        self.0 = new_map;
    }
    /// Iterates all entries in ascending key order.
    pub fn iter(&self) -> impl Iterator<Item = (&K, &V)> + '_ {
        self.0.iter().map(|entry| (&entry.key, &entry.value))
    }
    /// Iterates all values in ascending key order.
    pub fn values(&self) -> impl Iterator<Item = &V> + '_ {
        self.0.iter().map(|entry| &entry.value)
    }
    /// Merges `other` into `self`; for keys present in both, `other`'s value
    /// wins.
    pub fn insert_tree(&mut self, other: Map<K, V>) {
        let edits = other
            .iter()
            .map(|(key, value)| {
                Edit::Insert(MapEntry {
                    key: key.to_owned(),
                    value: value.to_owned(),
                })
            })
            .collect();
        self.0.edit(edits, &());
    }
}
impl<K, V> Into<BTreeMap<K, V>> for &Map<K, V>
where
K: Clone + Debug + Ord,
V: Clone + Debug,
{
fn into(self) -> BTreeMap<K, V> {
self.iter()
.map(|(replica_id, count)| (replica_id.clone(), count.clone()))
.collect()
}
}
/// Builds a `Map` from a standard `BTreeMap`, relying on `BTreeMap` yielding
/// its entries in ascending key order.
impl<K, V> From<&BTreeMap<K, V>> for Map<K, V>
where
    K: Clone + Debug + Ord,
    V: Clone + Debug,
{
    fn from(value: &BTreeMap<K, V>) -> Self {
        let entries = value.iter().map(|(k, v)| (k.clone(), v.clone()));
        Map::from_ordered_entries(entries)
    }
}
impl<K, V> Debug for Map<K, V>
where
    K: Clone + Debug + Ord,
    V: Clone + Debug,
{
    /// Formats like a standard map: `{key: value, …}`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_map();
        builder.entries(self.iter());
        builder.finish()
    }
}
impl<T> Debug for Set<T>
where
    T: Clone + Debug + Ord,
{
    /// Formats like a standard set: `{item, …}`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_set();
        builder.entries(self.iter());
        builder.finish()
    }
}
/// Adapts a user-supplied `MapSeekTarget` to the btree's `SeekTarget` trait.
#[derive(Debug)]
struct MapSeekTargetAdaptor<'a, T>(&'a T);
impl<'a, K: Debug + Clone + Ord, T: MapSeekTarget<K>> SeekTarget<'a, MapKey<K>, MapKeyRef<'a, K>>
    for MapSeekTargetAdaptor<'_, T>
{
    fn seek_cmp(&self, cursor_location: &MapKeyRef<K>, _: &()) -> Ordering {
        if let Some(key) = &cursor_location.0 {
            MapSeekTarget::cmp_cursor(self.0, key)
        } else {
            // `None` marks the position before any real key, so every target
            // compares greater than it.
            Ordering::Greater
        }
    }
}
/// A comparison target for range operations on `Map` (see `remove_range`),
/// allowing targets that are not themselves keys (e.g. "all descendants of a
/// path").
pub trait MapSeekTarget<K>: Debug {
    fn cmp_cursor(&self, cursor_location: &K) -> Ordering;
}
/// Any key can be used as a seek target, compared with its natural ordering.
impl<K: Debug + Ord> MapSeekTarget<K> for K {
    fn cmp_cursor(&self, cursor_location: &K) -> Ordering {
        self.cmp(cursor_location)
    }
}
impl<K, V> Default for Map<K, V>
where
K: Clone + Debug + Ord,
V: Clone + Debug,
{
fn default() -> Self {
Self(Default::default())
}
}
impl<K, V> Item for MapEntry<K, V>
where
    K: Clone + Debug + Ord,
    V: Clone,
{
    type Summary = MapKey<K>;
    /// An entry's summary is just its key; combined summaries keep the
    /// greatest key (see `Summary::add_summary` below).
    fn summary(&self) -> Self::Summary {
        self.key()
    }
}
impl<K, V> KeyedItem for MapEntry<K, V>
where
    K: Clone + Debug + Ord,
    V: Clone,
{
    type Key = MapKey<K>;
    fn key(&self) -> Self::Key {
        MapKey(Some(self.key.clone()))
    }
}
impl<K> Summary for MapKey<K>
where
    K: Clone + Debug,
{
    type Context = ();
    /// Entries are stored in ascending key order, so the rightmost (last)
    /// summary is the maximum — simply overwrite.
    fn add_summary(&mut self, summary: &Self, _: &()) {
        *self = summary.clone()
    }
}
impl<'a, K> Dimension<'a, MapKey<K>> for MapKeyRef<'a, K>
where
    K: Clone + Debug + Ord,
{
    /// Borrowed variant of the "last key wins" rule above.
    fn add_summary(&mut self, summary: &'a MapKey<K>, _: &()) {
        self.0 = summary.0.as_ref();
    }
}
impl<'a, K> SeekTarget<'a, MapKey<K>, MapKeyRef<'a, K>> for MapKeyRef<'_, K>
where
    K: Clone + Debug + Ord,
{
    /// `None` (no key yet) sorts before every real key via `Option`'s `Ord`.
    fn seek_cmp(&self, cursor_location: &MapKeyRef<K>, _: &()) -> Ordering {
        Ord::cmp(&self.0, &cursor_location.0)
    }
}
impl<K> Default for Set<K>
where
K: Clone + Debug + Ord,
{
fn default() -> Self {
Self(Default::default())
}
}
impl<K> Set<K>
where
    K: Clone + Debug + Ord,
{
    /// Builds a set from keys that are already in ascending order.
    pub fn from_ordered_entries(entries: impl IntoIterator<Item = K>) -> Self {
        let pairs = entries.into_iter().map(|key| (key, ()));
        Self(Map::from_ordered_entries(pairs))
    }
    /// Adds `key` to the set.
    pub fn insert(&mut self, key: K) {
        self.0.insert(key, ());
    }
    /// Returns whether `key` is present.
    pub fn contains(&self, key: &K) -> bool {
        self.0.contains_key(key)
    }
    /// Iterates the keys in ascending order.
    pub fn iter(&self) -> impl Iterator<Item = &K> + '_ {
        self.0.iter().map(|(key, _)| key)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    /// Exercises insertion, lookup, ordered iteration, `closest`, removal,
    /// and `retain`.
    #[test]
    fn test_basic() {
        let mut map = Map::default();
        assert_eq!(map.iter().collect::<Vec<_>>(), vec![]);
        map.insert(3, "c");
        assert_eq!(map.get(&3), Some(&"c"));
        assert_eq!(map.iter().collect::<Vec<_>>(), vec![(&3, &"c")]);
        map.insert(1, "a");
        assert_eq!(map.get(&1), Some(&"a"));
        assert_eq!(map.iter().collect::<Vec<_>>(), vec![(&1, &"a"), (&3, &"c")]);
        map.insert(2, "b");
        assert_eq!(map.get(&2), Some(&"b"));
        assert_eq!(map.get(&1), Some(&"a"));
        assert_eq!(map.get(&3), Some(&"c"));
        assert_eq!(
            map.iter().collect::<Vec<_>>(),
            vec![(&1, &"a"), (&2, &"b"), (&3, &"c")]
        );
        // `closest` returns the greatest entry whose key is <= the probe.
        assert_eq!(map.closest(&0), None);
        assert_eq!(map.closest(&1), Some((&1, &"a")));
        assert_eq!(map.closest(&10), Some((&3, &"c")));
        map.remove(&2);
        assert_eq!(map.get(&2), None);
        assert_eq!(map.iter().collect::<Vec<_>>(), vec![(&1, &"a"), (&3, &"c")]);
        assert_eq!(map.closest(&2), Some((&1, &"a")));
        map.remove(&3);
        assert_eq!(map.get(&3), None);
        assert_eq!(map.iter().collect::<Vec<_>>(), vec![(&1, &"a")]);
        map.remove(&1);
        assert_eq!(map.get(&1), None);
        assert_eq!(map.iter().collect::<Vec<_>>(), vec![]);
        map.insert(4, "d");
        map.insert(5, "e");
        map.insert(6, "f");
        map.retain(|key, _| *key % 2 == 0);
        assert_eq!(map.iter().collect::<Vec<_>>(), vec![(&4, &"d"), (&6, &"f")]);
    }
    /// `iter_from` starts at the first key >= the probe, even when the probe
    /// itself is not present ("ba" is not a key below).
    #[test]
    fn test_iter_from() {
        let mut map = Map::default();
        map.insert("a", 1);
        map.insert("b", 2);
        map.insert("baa", 3);
        map.insert("baaab", 4);
        map.insert("c", 5);
        let result = map
            .iter_from(&"ba")
            .take_while(|(key, _)| key.starts_with(&"ba"))
            .collect::<Vec<_>>();
        assert_eq!(result.len(), 2);
        assert!(result.iter().find(|(k, _)| k == &&"baa").is_some());
        assert!(result.iter().find(|(k, _)| k == &&"baaab").is_some());
        let result = map
            .iter_from(&"c")
            .take_while(|(key, _)| key.starts_with(&"c"))
            .collect::<Vec<_>>();
        assert_eq!(result.len(), 1);
        assert!(result.iter().find(|(k, _)| k == &&"c").is_some());
    }
    /// `insert_tree` merges another map; values from `other` win on conflict.
    #[test]
    fn test_insert_tree() {
        let mut map = Map::default();
        map.insert("a", 1);
        map.insert("b", 2);
        map.insert("c", 3);
        let mut other = Map::default();
        other.insert("a", 2);
        other.insert("b", 2);
        other.insert("d", 4);
        map.insert_tree(other);
        assert_eq!(map.iter().count(), 4);
        assert_eq!(map.get(&"a"), Some(&2));
        assert_eq!(map.get(&"b"), Some(&2));
        assert_eq!(map.get(&"c"), Some(&3));
        assert_eq!(map.get(&"d"), Some(&4));
    }
    /// Demonstrates `remove_range` with a custom `MapSeekTarget` that selects
    /// a path and all of its descendants.
    #[test]
    fn test_remove_between_and_path_successor() {
        use std::path::{Path, PathBuf};
        #[derive(Debug)]
        pub struct PathDescendants<'a>(&'a Path);
        impl MapSeekTarget<PathBuf> for PathDescendants<'_> {
            // Compares greater than every key that descends from the wrapped
            // path, so it marks the end of that path's subtree.
            fn cmp_cursor(&self, key: &PathBuf) -> Ordering {
                if key.starts_with(&self.0) {
                    Ordering::Greater
                } else {
                    self.0.cmp(key)
                }
            }
        }
        let mut map = Map::default();
        map.insert(PathBuf::from("a"), 1);
        map.insert(PathBuf::from("a/a"), 1);
        map.insert(PathBuf::from("b"), 2);
        map.insert(PathBuf::from("b/a/a"), 3);
        map.insert(PathBuf::from("b/a/a/a/b"), 4);
        map.insert(PathBuf::from("c"), 5);
        map.insert(PathBuf::from("c/a"), 6);
        map.remove_range(
            &PathBuf::from("b/a"),
            &PathDescendants(&PathBuf::from("b/a")),
        );
        assert_eq!(map.get(&PathBuf::from("a")), Some(&1));
        assert_eq!(map.get(&PathBuf::from("a/a")), Some(&1));
        assert_eq!(map.get(&PathBuf::from("b")), Some(&2));
        assert_eq!(map.get(&PathBuf::from("b/a/a")), None);
        assert_eq!(map.get(&PathBuf::from("b/a/a/a/b")), None);
        assert_eq!(map.get(&PathBuf::from("c")), Some(&5));
        assert_eq!(map.get(&PathBuf::from("c/a")), Some(&6));
        map.remove_range(&PathBuf::from("c"), &PathDescendants(&PathBuf::from("c")));
        assert_eq!(map.get(&PathBuf::from("a")), Some(&1));
        assert_eq!(map.get(&PathBuf::from("a/a")), Some(&1));
        assert_eq!(map.get(&PathBuf::from("b")), Some(&2));
        assert_eq!(map.get(&PathBuf::from("c")), None);
        assert_eq!(map.get(&PathBuf::from("c/a")), None);
        map.remove_range(&PathBuf::from("a"), &PathDescendants(&PathBuf::from("a")));
        assert_eq!(map.get(&PathBuf::from("a")), None);
        assert_eq!(map.get(&PathBuf::from("a/a")), None);
        assert_eq!(map.get(&PathBuf::from("b")), Some(&2));
        map.remove_range(&PathBuf::from("b"), &PathDescendants(&PathBuf::from("b")));
        assert_eq!(map.get(&PathBuf::from("b")), None);
    }
}

2841
crates/crdb/src/crdb.rs Normal file

File diff suppressed because it is too large Load diff

127
crates/crdb/src/dense_id.rs Normal file
View file

@ -0,0 +1,127 @@
use crate::btree;
use lazy_static::lazy_static;
use serde::{Deserialize, Serialize};
use smallvec::{smallvec, SmallVec};
use std::iter;
lazy_static! {
    // Cached endpoints so `min_ref`/`max_ref` can hand out `'static` refs.
    static ref MIN: DenseId = DenseId::min();
    static ref MAX: DenseId = DenseId::max();
}
/// A dense, totally-ordered identifier: a sequence of `u64` digits compared
/// lexicographically. Between any two distinct ids another id can always be
/// generated (see `DenseId::between`).
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct DenseId(SmallVec<[u64; 4]>);
impl DenseId {
    /// The smallest id (a single `u64::MIN` digit).
    pub fn min() -> Self {
        Self(smallvec![u64::MIN])
    }
    /// The largest id (a single `u64::MAX` digit).
    pub fn max() -> Self {
        Self(smallvec![u64::MAX])
    }
    pub fn min_ref() -> &'static Self {
        &*MIN
    }
    pub fn max_ref() -> &'static Self {
        &*MAX
    }
    /// Overwrites `self` with `other`, reusing the existing buffer.
    pub fn assign(&mut self, other: &Self) {
        self.0.resize(other.0.len(), 0);
        self.0.copy_from_slice(&other.0);
    }
    /// Returns an id ordered strictly between `lhs` and `rhs` (requires
    /// `lhs < rhs`). Both sides are conceptually extended with infinite tails
    /// of `u64::MIN` / `u64::MAX` digits, so a strictly-between id always
    /// exists; digits are copied until one can be strictly increased.
    pub fn between(lhs: &Self, rhs: &Self) -> Self {
        let lhs = lhs.0.iter().copied().chain(iter::repeat(u64::MIN));
        let rhs = rhs.0.iter().copied().chain(iter::repeat(u64::MAX));
        let mut location = SmallVec::new();
        for (lhs, rhs) in lhs.zip(rhs) {
            // NOTE(review): `>> 48` biases the midpoint heavily toward `lhs`
            // (the digit only advances when the gap exceeds 2^48), presumably
            // to leave headroom for subsequent insertions after the new id —
            // confirm this is intentional rather than a typo for `>> 1`.
            let mid = lhs + ((rhs.saturating_sub(lhs)) >> 48);
            location.push(mid);
            if mid > lhs {
                break;
            }
        }
        Self(location)
    }
    /// Number of digits in this id.
    pub fn len(&self) -> usize {
        self.0.len()
    }
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
}
impl Default for DenseId {
    /// Defaults to the minimum id.
    fn default() -> Self {
        Self::min()
    }
}
// A `DenseId` acts as its own summary/key when stored in a btree: the
// summary of a run of ascending ids is the last (greatest) one.
impl btree::Item for DenseId {
    type Summary = DenseId;
    fn summary(&self) -> Self::Summary {
        self.clone()
    }
}
impl btree::KeyedItem for DenseId {
    type Key = DenseId;
    fn key(&self) -> Self::Key {
        self.clone()
    }
}
impl btree::Summary for DenseId {
    type Context = ();
    /// Ids are stored in ascending order, so the rightmost summary wins.
    fn add_summary(&mut self, summary: &Self, _: &()) {
        self.assign(summary);
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use rand::prelude::*;
    use std::mem;
    /// Property test: for two random distinct ids, `between` yields an id
    /// strictly between them whose digits (except possibly the last) come
    /// from one of the two endpoints.
    #[gpui::test(iterations = 100)]
    fn test_dense_id(mut rng: StdRng) {
        let mut lhs = Default::default();
        let mut rhs = Default::default();
        // Re-roll until the two random ids differ.
        while lhs == rhs {
            lhs = DenseId(
                (0..rng.gen_range(1..=5))
                    .map(|_| rng.gen_range(0..=100))
                    .collect(),
            );
            rhs = DenseId(
                (0..rng.gen_range(1..=5))
                    .map(|_| rng.gen_range(0..=100))
                    .collect(),
            );
        }
        if lhs > rhs {
            mem::swap(&mut lhs, &mut rhs);
        }
        let middle = DenseId::between(&lhs, &rhs);
        assert!(middle > lhs);
        assert!(middle < rhs);
        for ix in 0..middle.0.len() - 1 {
            assert!(
                middle.0[ix] == *lhs.0.get(ix).unwrap_or(&0)
                    || middle.0[ix] == *rhs.0.get(ix).unwrap_or(&0)
            );
        }
    }
}

699
crates/crdb/src/history.rs Normal file
View file

@ -0,0 +1,699 @@
use std::{cmp::Ordering, iter, ops::RangeBounds};
use crate::{
btree::{self, Bias, KvStore, SavedId},
messages::Operation,
OperationCount, OperationId, ReplicaId, RevisionId,
};
use anyhow::{anyhow, Result};
use collections::{BTreeSet, Bound, HashMap, HashSet, VecDeque};
use serde::{Deserialize, Serialize};
use smallvec::SmallVec;
/// Serialized form of `History`: the saved roots of its trees plus the
/// replica-local operation counter.
#[derive(Serialize, Deserialize)]
pub struct SavedHistory {
    operations: SavedId,
    next_operation_id: OperationId,
    max_operation_ids: SavedId,
    deferred_operations: SavedId,
}
/// A replica's operation log: every known operation keyed by id, the highest
/// operation count observed per replica, and operations deferred until their
/// parent operations arrive.
#[derive(Clone, Debug)]
pub struct History {
    operations: btree::Map<OperationId, Operation>,
    next_operation_id: OperationId,
    max_operation_ids: btree::Map<ReplicaId, OperationCount>,
    // Keyed by (parent id, operation id); an operation appears once per parent.
    deferred_operations: btree::Sequence<DeferredOperation>,
}
impl History {
pub fn new(replica_id: ReplicaId) -> Self {
Self {
operations: Default::default(),
next_operation_id: OperationId::new(replica_id),
max_operation_ids: Default::default(),
deferred_operations: Default::default(),
}
}
pub fn ptr_eq(&self, other: &Self) -> bool {
btree::Map::ptr_eq(&self.operations, &other.operations)
&& btree::Map::ptr_eq(&self.max_operation_ids, &other.max_operation_ids)
&& btree::Sequence::ptr_eq(&self.deferred_operations, &other.deferred_operations)
&& self.next_operation_id == other.next_operation_id
}
pub async fn load(saved_history: SavedHistory, kv: &dyn KvStore) -> Result<Self> {
Ok(Self {
operations: btree::Map::load_root(saved_history.operations, kv).await?,
next_operation_id: saved_history.next_operation_id,
max_operation_ids: btree::Map::load_all(saved_history.max_operation_ids, kv).await?,
deferred_operations: btree::Sequence::load_root(saved_history.deferred_operations, kv)
.await?,
})
}
pub async fn save(&self, kv: &dyn KvStore) -> Result<SavedHistory> {
Ok(SavedHistory {
operations: self.operations.save(kv).await?,
next_operation_id: self.next_operation_id,
max_operation_ids: self.max_operation_ids.save(kv).await?,
deferred_operations: self.deferred_operations.save(kv).await?,
})
}
pub fn replica_id(&self) -> ReplicaId {
self.next_operation_id.replica_id
}
pub fn next_operation_id(&mut self) -> OperationId {
self.next_operation_id.tick()
}
pub fn max_operation_ids(&self) -> &btree::Map<ReplicaId, OperationCount> {
&self.max_operation_ids
}
pub async fn insert(
&mut self,
operation: Operation,
kv: &dyn KvStore,
) -> Result<SmallVec<[Operation; 1]>> {
let op_id = operation.id();
self.next_operation_id.observe(op_id);
if self
.max_operation_ids
.load(&op_id.replica_id, kv)
.await?
.copied()
< Some(op_id.operation_count)
{
self.max_operation_ids
.insert(op_id.replica_id, op_id.operation_count);
}
self.operations.store(op_id, operation, kv).await?;
self.deferred_operations
.load(kv, &(), |probe| {
let key_range = (
Bound::Excluded(*probe.start),
Bound::Included(*probe.summary),
);
key_range.contains(&op_id)
})
.await?;
let mut cursor = self.deferred_operations.cursor::<OperationId>();
let mut remaining = cursor.slice(&op_id, Bias::Left, &());
let mut flushed = SmallVec::new();
flushed.extend(
cursor
.slice(&op_id, Bias::Right, &())
.iter()
.map(|deferred| deferred.operation.clone()),
);
remaining.append(cursor.suffix(&()), &());
drop(cursor);
self.deferred_operations = remaining;
Ok(flushed)
}
pub fn insert_local(&mut self, operation: Operation) {
let id = operation.id();
self.next_operation_id.observe(operation.id());
self.max_operation_ids
.insert(id.replica_id, id.operation_count);
self.operations.insert(id, operation);
}
pub async fn defer(&mut self, operation: Operation, kv: &dyn KvStore) -> Result<()> {
for parent in operation.parent().iter() {
self.deferred_operations
.load(kv, &(), |probe| {
let key_range = (
Bound::Excluded(*probe.start),
Bound::Included(*probe.summary),
);
key_range.contains(&operation.id())
})
.await?;
self.deferred_operations.insert_or_replace(
DeferredOperation {
parent: *parent,
operation: operation.clone(),
},
&(),
);
}
Ok(())
}
pub async fn can_apply(&mut self, operation: &Operation, kv: &dyn KvStore) -> Result<bool> {
for parent in operation.parent().iter() {
if self.operations.load(parent, kv).await?.is_none() {
return Ok(false);
}
}
Ok(true)
}
pub async fn has_applied(&mut self, operation: &Operation, kv: &dyn KvStore) -> Result<bool> {
Ok(self.operations.load(&operation.id(), kv).await?.is_some())
}
pub async fn operation(
&mut self,
id: OperationId,
kv: &dyn KvStore,
) -> Result<Option<&Operation>> {
self.operations.load(&id, kv).await
}
pub async fn operations_since(
&mut self,
version: &btree::Map<ReplicaId, OperationCount>,
kv: &dyn KvStore,
) -> Result<Vec<Operation>> {
let mut new_operations = Vec::new();
for (replica_id, end_op_count) in self.max_operation_ids.iter() {
let start_op = OperationId {
replica_id: *replica_id,
operation_count: version
.get(&replica_id)
.map(|count| OperationCount(count.0 + 1))
.unwrap_or_default(),
};
let end_op = OperationId {
replica_id: *replica_id,
operation_count: *end_op_count,
};
new_operations.extend(
self.operations
.load_from(&start_op, kv)
.await?
.take_while(|(op_id, _)| **op_id <= end_op)
.map(|(_, op)| op.clone()),
);
}
Ok(new_operations)
}
pub async fn rewind(&mut self, revision_id: &RevisionId, kv: &dyn KvStore) -> Result<Rewind> {
let mut frontier = VecDeque::new();
let mut traversed = HashMap::default();
for operation_id in revision_id.iter() {
let parent_revision = self
.operation(*operation_id, kv)
.await?
.ok_or_else(|| anyhow!("operation {:?} not found", operation_id))?
.parent()
.clone();
traversed
.entry(parent_revision.clone())
.or_insert(BTreeSet::default())
.insert((revision_id.clone(), *operation_id));
frontier.push_back(Frontier {
source: *operation_id,
revision: parent_revision,
});
}
Ok(Rewind {
history: self,
frontier,
traversed,
ancestors: Default::default(),
reachable_len: revision_id.len(),
start: revision_id.clone(),
})
}
}
/// One pending step of the backward traversal: a parent revision reached
/// while walking back from operation `source`.
struct Frontier {
    source: OperationId,
    revision: RevisionId,
}
/// Backward breadth-first traversal over the operation DAG, produced by
/// `History::rewind`.
pub struct Rewind<'a> {
    history: &'a mut History,
    frontier: VecDeque<Frontier>,
    // parent revision -> (child revision, operation leading to the child),
    // recorded so `replay` can walk the path forward again.
    traversed: HashMap<RevisionId, BTreeSet<(RevisionId, OperationId)>>,
    // revision -> the subset of the current start's operations that reach it.
    ancestors: HashMap<RevisionId, HashSet<OperationId>>,
    // How many sources must reach a revision before it is yielded
    // (the number of operations in the current start revision).
    reachable_len: usize,
    start: RevisionId,
}
impl Rewind<'_> {
    /// Walks the DAG backward (breadth-first) and yields the next ancestor
    /// revision reachable from *all* operations of the current start
    /// revision. When one is found, the traversal restarts from it.
    pub async fn next(&mut self, kv: &dyn KvStore) -> Result<Option<RevisionId>> {
        while let Some(frontier) = self.frontier.pop_front() {
            // Record that `frontier.source` reaches this ancestor revision.
            let reachable_from = self.ancestors.entry(frontier.revision.clone()).or_default();
            reachable_from.insert(frontier.source);
            if reachable_from.len() == self.reachable_len {
                // Reachable from every operation of the current start:
                // yield it and make it the new starting point.
                self.reachable_len = frontier.revision.len();
                self.frontier.clear();
                self.ancestors.clear();
                self.start = frontier.revision.clone();
                for operation_id in frontier.revision.iter() {
                    let parent_revision = self
                        .history
                        .operation(*operation_id, kv)
                        .await?
                        .expect("operation must exist")
                        .parent()
                        .clone();
                    // Remember the edge parent -> (child, op) for `replay`.
                    self.traversed
                        .entry(parent_revision.clone())
                        .or_default()
                        .insert((frontier.revision.clone(), *operation_id));
                    self.frontier.push_back(Frontier {
                        source: *operation_id,
                        revision: parent_revision,
                    });
                }
                return Ok(Some(frontier.revision));
            } else {
                // Not (yet) common to all sources: keep walking backward,
                // propagating the same source through each parent revision.
                for operation_id in frontier.revision.iter() {
                    let parent_revision = self
                        .history
                        .operation(*operation_id, kv)
                        .await?
                        .expect("operation must exist")
                        .parent()
                        .clone();
                    self.traversed
                        .entry(parent_revision.clone())
                        .or_default()
                        .insert((frontier.revision.clone(), *operation_id));
                    self.frontier.push_back(Frontier {
                        source: frontier.source,
                        revision: parent_revision,
                    });
                }
            }
        }
        Ok(None)
    }
    /// Consumes the rewind and yields the recorded operations forward again,
    /// breadth-first from the last revision returned by `next` (`self.start`)
    /// toward the revision the rewind began at.
    pub fn replay(mut self) -> impl Iterator<Item = ReplayOperation> {
        let mut stack = VecDeque::new();
        if let Some(children) = self.traversed.remove(&self.start) {
            for (child_revision_id, operation_id) in children {
                stack.push_back(ReplayOperation {
                    parent_revision_id: self.start.clone(),
                    target_revision_id: child_revision_id.clone(),
                    operation_id,
                });
            }
        }
        iter::from_fn(move || {
            let entry = stack.pop_front()?;
            // Enqueue the children of the revision we just emitted.
            if let Some(children) = self.traversed.remove(&entry.target_revision_id) {
                for (child_revision, operation_id) in children {
                    stack.push_back(ReplayOperation {
                        parent_revision_id: entry.target_revision_id.clone(),
                        target_revision_id: child_revision.clone(),
                        operation_id,
                    });
                }
            }
            Some(entry)
        })
    }
}
/// One step of a replay: applying `operation_id` moves history from
/// `parent_revision_id` to `target_revision_id`.
#[derive(Clone, Eq, PartialEq)]
pub struct ReplayOperation {
    pub parent_revision_id: RevisionId,
    pub target_revision_id: RevisionId,
    pub operation_id: OperationId,
}
impl std::fmt::Debug for ReplayOperation {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "{:?} -> {:?} via {:?}",
            self.parent_revision_id, self.target_revision_id, self.operation_id
        )
    }
}
/// An operation waiting for `parent` to be applied; stored once per parent
/// in `History::deferred_operations`.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct DeferredOperation {
    parent: OperationId,
    operation: Operation,
}
// Equality/ordering deliberately consider only the ids (not the operation
// payload), matching the (parent, operation id) tree key below.
impl PartialEq for DeferredOperation {
    fn eq(&self, other: &Self) -> bool {
        self.parent == other.parent && self.operation.id() == other.operation.id()
    }
}
impl Eq for DeferredOperation {}
impl PartialOrd for DeferredOperation {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
impl Ord for DeferredOperation {
    /// Primary order by parent, then by the deferred operation's own id.
    fn cmp(&self, other: &Self) -> Ordering {
        self.parent
            .cmp(&other.parent)
            .then_with(|| self.operation.id().cmp(&other.operation.id()))
    }
}
impl btree::Item for DeferredOperation {
    type Summary = OperationId;
    /// Summarized by parent id, so a seek by parent finds all operations
    /// deferred on that parent.
    fn summary(&self) -> Self::Summary {
        self.parent
    }
}
impl btree::KeyedItem for DeferredOperation {
    type Key = (OperationId, OperationId);
    fn key(&self) -> Self::Key {
        (self.parent, self.operation.id())
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::btree::tests::InMemoryKv;
#[gpui::test]
async fn test_rewind() {
let kv = InMemoryKv::default();
let mut history = History::new(ReplicaId(0));
let op1 = insert_operation(&[], &mut history, &kv).await;
let op2 = insert_operation(&[op1.id()], &mut history, &kv).await;
let op3 = insert_operation(&[op1.id()], &mut history, &kv).await;
let op4 = insert_operation(&[op2.id(), op3.id()], &mut history, &kv).await;
let op5 = insert_operation(&[op4.id()], &mut history, &kv).await;
let op6 = insert_operation(&[op4.id()], &mut history, &kv).await;
let op7 = insert_operation(&[op2.id()], &mut history, &kv).await;
let op8 = insert_operation(&[op5.id()], &mut history, &kv).await;
let op9 = insert_operation(&[op5.id()], &mut history, &kv).await;
let op10 = insert_operation(&[op8.id()], &mut history, &kv).await;
let op11 = insert_operation(&[op9.id(), op10.id()], &mut history, &kv).await;
assert_eq!(
rewind(&[op4.id()], &mut history, &kv).await,
&[
(
RevisionId::from([op2.id(), op3.id()].as_slice()),
vec![ReplayOperation {
parent_revision_id: RevisionId::from([op2.id(), op3.id()].as_slice()),
target_revision_id: RevisionId::from([op4.id()].as_slice()),
operation_id: op4.id(),
}]
),
(
RevisionId::from([op1.id()].as_slice()),
vec![
ReplayOperation {
parent_revision_id: RevisionId::from([op1.id()].as_slice()),
target_revision_id: RevisionId::from([op2.id(), op3.id()].as_slice()),
operation_id: op2.id(),
},
ReplayOperation {
parent_revision_id: RevisionId::from([op1.id()].as_slice()),
target_revision_id: RevisionId::from([op2.id(), op3.id()].as_slice()),
operation_id: op3.id(),
}
]
),
(
RevisionId::from([].as_slice()),
vec![ReplayOperation {
parent_revision_id: RevisionId::from([].as_slice()),
target_revision_id: RevisionId::from([op1.id()].as_slice()),
operation_id: op1.id(),
}]
),
]
);
assert_eq!(
rewind(&[op6.id()], &mut history, &kv).await,
&[
(
RevisionId::from([op4.id()].as_slice()),
vec![ReplayOperation {
parent_revision_id: RevisionId::from([op4.id()].as_slice()),
target_revision_id: RevisionId::from([op6.id()].as_slice()),
operation_id: op6.id(),
}]
),
(
RevisionId::from([op2.id(), op3.id()].as_slice()),
vec![ReplayOperation {
parent_revision_id: RevisionId::from([op2.id(), op3.id()].as_slice()),
target_revision_id: RevisionId::from([op4.id()].as_slice()),
operation_id: op4.id(),
}]
),
(
RevisionId::from([op1.id()].as_slice()),
vec![
ReplayOperation {
parent_revision_id: RevisionId::from([op1.id()].as_slice()),
target_revision_id: RevisionId::from([op2.id(), op3.id()].as_slice()),
operation_id: op2.id(),
},
ReplayOperation {
parent_revision_id: RevisionId::from([op1.id()].as_slice()),
target_revision_id: RevisionId::from([op2.id(), op3.id()].as_slice()),
operation_id: op3.id(),
}
]
),
(
RevisionId::from([].as_slice()),
vec![ReplayOperation {
parent_revision_id: RevisionId::from([].as_slice()),
target_revision_id: RevisionId::from([op1.id()].as_slice()),
operation_id: op1.id(),
}]
),
]
);
assert_eq!(
rewind(&[op5.id(), op6.id()], &mut history, &kv).await,
&[
(
RevisionId::from([op4.id()].as_slice()),
vec![
ReplayOperation {
parent_revision_id: RevisionId::from([op4.id()].as_slice()),
target_revision_id: RevisionId::from([op5.id(), op6.id()].as_slice()),
operation_id: op5.id(),
},
ReplayOperation {
parent_revision_id: RevisionId::from([op4.id()].as_slice()),
target_revision_id: RevisionId::from([op5.id(), op6.id()].as_slice()),
operation_id: op6.id(),
}
]
),
(
RevisionId::from([op2.id(), op3.id()].as_slice()),
vec![ReplayOperation {
parent_revision_id: RevisionId::from([op2.id(), op3.id()].as_slice()),
target_revision_id: RevisionId::from([op4.id()].as_slice()),
operation_id: op4.id(),
}]
),
(
RevisionId::from([op1.id()].as_slice()),
vec![
ReplayOperation {
parent_revision_id: RevisionId::from([op1.id()].as_slice()),
target_revision_id: RevisionId::from([op2.id(), op3.id()].as_slice()),
operation_id: op2.id(),
},
ReplayOperation {
parent_revision_id: RevisionId::from([op1.id()].as_slice()),
target_revision_id: RevisionId::from([op2.id(), op3.id()].as_slice()),
operation_id: op3.id(),
}
]
),
(
RevisionId::from([].as_slice()),
vec![ReplayOperation {
parent_revision_id: RevisionId::from([].as_slice()),
target_revision_id: RevisionId::from([op1.id()].as_slice()),
operation_id: op1.id(),
}]
),
]
);
assert_eq!(
rewind(&[op4.id(), op7.id()], &mut history, &kv).await,
&[
(
RevisionId::from([op1.id()].as_slice()),
vec![
ReplayOperation {
parent_revision_id: RevisionId::from([op1.id()].as_slice()),
target_revision_id: RevisionId::from([op2.id()].as_slice()),
operation_id: op2.id(),
},
ReplayOperation {
parent_revision_id: RevisionId::from([op1.id()].as_slice()),
target_revision_id: RevisionId::from([op2.id(), op3.id()].as_slice()),
operation_id: op2.id(),
},
ReplayOperation {
parent_revision_id: RevisionId::from([op1.id()].as_slice()),
target_revision_id: RevisionId::from([op2.id(), op3.id()].as_slice()),
operation_id: op3.id(),
},
ReplayOperation {
parent_revision_id: RevisionId::from([op2.id()].as_slice()),
target_revision_id: RevisionId::from([op4.id(), op7.id()].as_slice()),
operation_id: op7.id(),
},
ReplayOperation {
parent_revision_id: RevisionId::from([op2.id(), op3.id()].as_slice()),
target_revision_id: RevisionId::from([op4.id(), op7.id()].as_slice()),
operation_id: op4.id(),
},
]
),
(
RevisionId::from([].as_slice()),
vec![ReplayOperation {
parent_revision_id: RevisionId::from([].as_slice()),
target_revision_id: RevisionId::from([op1.id()].as_slice()),
operation_id: op1.id(),
}]
),
]
);
assert_eq!(
rewind(&[op11.id()], &mut history, &kv).await,
&[
(
RevisionId::from([op9.id(), op10.id()].as_slice()),
vec![ReplayOperation {
parent_revision_id: RevisionId::from([op9.id(), op10.id()].as_slice()),
target_revision_id: RevisionId::from([op11.id()].as_slice()),
operation_id: op11.id(),
}]
),
(
RevisionId::from([op5.id()].as_slice()),
vec![
ReplayOperation {
parent_revision_id: RevisionId::from([op5.id()].as_slice()),
target_revision_id: RevisionId::from([op8.id()].as_slice()),
operation_id: op8.id(),
},
ReplayOperation {
parent_revision_id: RevisionId::from([op5.id()].as_slice()),
target_revision_id: RevisionId::from([op9.id(), op10.id()].as_slice()),
operation_id: op9.id(),
},
ReplayOperation {
parent_revision_id: RevisionId::from([op8.id()].as_slice()),
target_revision_id: RevisionId::from([op9.id(), op10.id()].as_slice()),
operation_id: op10.id(),
}
]
),
(
RevisionId::from([op4.id()].as_slice()),
vec![ReplayOperation {
parent_revision_id: RevisionId::from([op4.id()].as_slice()),
target_revision_id: RevisionId::from([op5.id()].as_slice()),
operation_id: op5.id(),
}]
),
(
RevisionId::from([op2.id(), op3.id()].as_slice()),
vec![ReplayOperation {
parent_revision_id: RevisionId::from([op2.id(), op3.id()].as_slice()),
target_revision_id: RevisionId::from([op4.id()].as_slice()),
operation_id: op4.id(),
}]
),
(
RevisionId::from([op1.id()].as_slice()),
vec![
ReplayOperation {
parent_revision_id: RevisionId::from([op1.id()].as_slice()),
target_revision_id: RevisionId::from([op2.id(), op3.id()].as_slice()),
operation_id: op2.id(),
},
ReplayOperation {
parent_revision_id: RevisionId::from([op1.id()].as_slice()),
target_revision_id: RevisionId::from([op2.id(), op3.id()].as_slice()),
operation_id: op3.id(),
}
]
),
(
RevisionId::from([].as_slice()),
vec![ReplayOperation {
parent_revision_id: RevisionId::from([].as_slice()),
target_revision_id: RevisionId::from([op1.id()].as_slice()),
operation_id: op1.id(),
}]
),
]
);
}
/// Inserts a fresh `CreateBranch` operation with the given parent revision
/// into `history` and returns it. The branch name is a fixed placeholder;
/// these tests only care about the operation graph shape.
async fn insert_operation(
    parent: &[OperationId],
    history: &mut History,
    kv: &dyn KvStore,
) -> Operation {
    let id = history.next_operation_id();
    let op = Operation::CreateBranch(crate::operations::CreateBranch {
        id,
        parent: parent.into(),
        name: "1".into(),
    });
    history.insert(op.clone(), kv).await.unwrap();
    op
}
/// Drives `History::rewind` all the way to the root, returning for each
/// visited ancestor revision the operations newly added at that depth.
async fn rewind(
    revision_id: &[OperationId],
    history: &mut History,
    kv: &dyn KvStore,
) -> Vec<(RevisionId, Vec<ReplayOperation>)> {
    let mut rewind = history.rewind(&revision_id.into(), kv).await.unwrap();
    let mut results = Vec::new();
    let mut prev_replay = Vec::new();
    let mut ix = 0;
    while let Some(ancestor_id) = rewind.next(kv).await.unwrap() {
        let mut replay = rewind.replay().collect::<Vec<_>>();
        // Each deeper step's replay must end with the previous (shallower)
        // step's replay as a suffix; verify that, then keep only the prefix
        // that is new at this depth.
        let suffix_start = replay.len() - prev_replay.len();
        assert_eq!(prev_replay, &replay[suffix_start..]);
        prev_replay = replay.clone();
        drop(replay.drain(suffix_start..));
        results.push((ancestor_id, replay));
        // Recreate the rewind from scratch and fast-forward it to the current
        // depth, exercising restart of a rewind mid-way.
        rewind = history.rewind(&revision_id.into(), kv).await.unwrap();
        ix += 1;
        for _ in 0..ix {
            rewind.next(kv).await.unwrap();
        }
    }
    results
}
}

182
crates/crdb/src/messages.rs Normal file
View file

@ -0,0 +1,182 @@
use crate::{
operations::{CreateBranch, CreateDocument, Edit},
BranchId, OperationCount, OperationId, ReplicaId, RepoId, Request, RevisionId, RoomCredentials,
};
use collections::BTreeMap;
use serde::{Deserialize, Serialize};
use std::{any::Any, sync::Arc};
/// Serializable wrapper around every request type so all requests can travel
/// over a single wire format.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum RequestEnvelope {
    PublishRepo(PublishRepo),
    CloneRepo(CloneRepo),
    ReconnectToRepo(ReconnectToRepo),
    SyncRepo(SyncRepo),
    PublishOperations(PublishOperations),
}
impl RequestEnvelope {
    /// Type-erases the inner request so generic dispatch code can downcast it
    /// to the concrete request type.
    pub fn unwrap(self) -> Box<dyn Any> {
        match self {
            RequestEnvelope::PublishRepo(request) => Box::new(request),
            RequestEnvelope::CloneRepo(request) => Box::new(request),
            RequestEnvelope::ReconnectToRepo(request) => Box::new(request),
            RequestEnvelope::SyncRepo(request) => Box::new(request),
            RequestEnvelope::PublishOperations(request) => Box::new(request),
        }
    }
}
// Let plain operations be sent as messages without manual wrapping.
impl From<Operation> for MessageEnvelope {
    fn from(value: Operation) -> Self {
        Self::Operation(value)
    }
}
/// Request to publish a local repo under `name` so peers can find and clone it.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PublishRepo {
    pub id: RepoId,
    pub name: Arc<str>,
}
impl Request for PublishRepo {
    type Response = PublishRepoResponse;
}

// Implement `From` rather than a hand-written `Into` (clippy::from_over_into);
// the blanket `impl Into` keeps existing `.into()` call sites working.
impl From<PublishRepo> for RequestEnvelope {
    fn from(request: PublishRepo) -> Self {
        Self::PublishRepo(request)
    }
}
/// Response to [`PublishRepo`]: credentials for the repo's room.
#[derive(Clone, Serialize, Deserialize)]
pub struct PublishRepoResponse {
    pub credentials: RoomCredentials,
}
/// Request to clone the repo that was published under `name`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CloneRepo {
    pub name: Arc<str>,
}
impl Request for CloneRepo {
    type Response = CloneRepoResponse;
}

// `From` over hand-written `Into` (clippy::from_over_into); `.into()` still works.
impl From<CloneRepo> for RequestEnvelope {
    fn from(request: CloneRepo) -> Self {
        Self::CloneRepo(request)
    }
}
/// Response to [`CloneRepo`]: the repo's id, the replica id assigned to this
/// client, and credentials for the repo's room.
#[derive(Clone, Serialize, Deserialize)]
pub struct CloneRepoResponse {
    pub repo_id: RepoId,
    pub replica_id: ReplicaId,
    pub credentials: RoomCredentials,
}
/// Request to re-attach to a previously cloned repo under an existing replica id.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ReconnectToRepo {
    pub id: RepoId,
    pub replica_id: ReplicaId,
}
impl Request for ReconnectToRepo {
    type Response = ReconnectToRepoResponse;
}

// `From` over hand-written `Into` (clippy::from_over_into); `.into()` still works.
impl From<ReconnectToRepo> for RequestEnvelope {
    fn from(request: ReconnectToRepo) -> Self {
        Self::ReconnectToRepo(request)
    }
}
/// Response to [`ReconnectToRepo`]: fresh credentials for the repo's room.
#[derive(Clone, Serialize, Deserialize)]
pub struct ReconnectToRepoResponse {
    pub credentials: RoomCredentials,
}
/// Request to synchronize a repo. `max_operation_ids` describes, per replica,
/// the highest operation count the sender already has (vector-clock style),
/// so the receiver can send only what is missing.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SyncRepo {
    pub id: RepoId,
    pub max_operation_ids: BTreeMap<ReplicaId, OperationCount>,
}
impl Request for SyncRepo {
    type Response = SyncRepoResponse;
}

// `From` over hand-written `Into` (clippy::from_over_into); `.into()` still works.
impl From<SyncRepo> for RequestEnvelope {
    fn from(request: SyncRepo) -> Self {
        Self::SyncRepo(request)
    }
}
/// Response to [`SyncRepo`]: the operations the sender was missing, plus the
/// responder's own per-replica operation counts.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SyncRepoResponse {
    pub operations: Vec<Operation>,
    pub max_operation_ids: BTreeMap<ReplicaId, OperationCount>,
}
/// Fire-and-forget request pushing locally produced operations to the server
/// (its `Request::Response` is `()`).
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PublishOperations {
    pub repo_id: RepoId,
    pub operations: Vec<Operation>,
}
impl Request for PublishOperations {
    type Response = ();
}

// `From` over hand-written `Into` (clippy::from_over_into); `.into()` still works.
impl From<PublishOperations> for RequestEnvelope {
    fn from(request: PublishOperations) -> Self {
        Self::PublishOperations(request)
    }
}
/// Envelope for one-way messages exchanged between peers (as opposed to
/// request/response pairs).
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum MessageEnvelope {
    Operation(Operation),
}

impl MessageEnvelope {
    /// Type-erases the payload so generic message dispatch can downcast it.
    pub fn unwrap(self) -> Box<dyn Any> {
        Box::new(match self {
            MessageEnvelope::Operation(message) => message,
        })
    }
}
/// Every CRDT operation that can be recorded in a repo's history and
/// replicated between peers.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum Operation {
    CreateDocument(CreateDocument),
    Edit(Edit),
    CreateBranch(CreateBranch),
}
impl Operation {
    /// The operation's unique id.
    pub fn id(&self) -> OperationId {
        match self {
            Operation::CreateDocument(op) => op.id,
            Operation::Edit(op) => op.id,
            Operation::CreateBranch(op) => op.id,
        }
    }

    /// The branch the operation belongs to. NOTE(review): for `CreateBranch`
    /// the operation's own id doubles as the branch id, which implies
    /// `BranchId` and `OperationId` share a representation — confirm.
    pub fn branch_id(&self) -> BranchId {
        match self {
            Operation::CreateBranch(op) => op.id,
            Operation::CreateDocument(op) => op.branch_id,
            Operation::Edit(op) => op.branch_id,
        }
    }

    /// The revision the operation was produced against (its causal parent).
    pub fn parent(&self) -> &RevisionId {
        match self {
            Operation::CreateDocument(op) => &op.parent,
            Operation::Edit(op) => &op.parent,
            Operation::CreateBranch(op) => &op.parent,
        }
    }
}

View file

@ -0,0 +1,286 @@
use crate::{
btree::{self, Bias},
dense_id::DenseId,
AnchorRange, BranchId, DocumentFragment, DocumentFragmentSummary, DocumentId, DocumentMetadata,
InsertionFragment, OperationId, Revision, RevisionId, RopeBuilder, Tombstone,
};
use anyhow::Result;
use serde::{Deserialize, Serialize};
use smallvec::SmallVec;
use std::{cmp, sync::Arc};
/// Creates a new named branch whose initial state is the `parent` revision.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CreateBranch {
    pub id: BranchId,
    pub parent: RevisionId,
    pub name: Arc<str>,
}
/// Creates a new, empty document on `branch_id`. The operation's own id also
/// serves as the document id (see `apply` below, which uses `self.id` as both).
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CreateDocument {
    pub id: DocumentId,
    pub branch_id: BranchId,
    pub parent: RevisionId,
}
impl CreateDocument {
    /// Materializes the new (empty) document in `revision`: inserts a sentinel
    /// document fragment, indexes it in `insertion_fragments`, and records the
    /// document's metadata.
    pub fn apply(self, revision: &mut Revision) {
        let mut cursor = revision.document_fragments.cursor::<DocumentId>();
        // Keep every fragment belonging to documents that sort before this one.
        let mut new_document_fragments = cursor.slice(&self.id, Bias::Right, &());
        // Sentinel fragment: zero-length insertion subrange at the minimum location.
        new_document_fragments.push(
            DocumentFragment {
                document_id: self.id,
                location: DenseId::min(),
                insertion_id: self.id,
                insertion_subrange: 0..0,
                tombstones: Default::default(),
                undo_count: 0,
            },
            &(),
        );
        new_document_fragments.append(cursor.suffix(&()), &());
        drop(cursor);
        revision.document_fragments = new_document_fragments;
        // Index the sentinel so later edits can locate fragments by insertion id.
        revision.insertion_fragments.insert_or_replace(
            InsertionFragment {
                insertion_id: self.id,
                offset_in_insertion: 0,
                fragment_location: DenseId::min(),
            },
            &(),
        );
        revision.document_metadata.insert(
            self.id,
            DocumentMetadata {
                path: None,
                last_change: self.id,
            },
        );
    }
}
/// Replaces one or more anchor ranges of a document with new text.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Edit {
    pub id: OperationId,
    pub document_id: DocumentId,
    pub branch_id: BranchId,
    // Revision the edit was authored against; its ranges resolve relative to it.
    pub parent: RevisionId,
    // Edits are applied in a single forward pass in `apply`, so they are
    // assumed ordered by position — TODO confirm at the producer.
    pub edits: SmallVec<[(AnchorRange, Arc<str>); 2]>,
}
impl Edit {
    /// Applies this edit to `revision`, consulting `parent_revision` (the
    /// revision the edit was authored against) to determine which text the
    /// edit's anchor ranges covered. Rebuilds the document-fragment tree, the
    /// insertion index, and the visible/hidden ropes in one left-to-right pass.
    pub fn apply(self, parent_revision: &Revision, revision: &mut Revision) -> Result<()> {
        if self.edits.is_empty() {
            return Ok(());
        }

        let mut old_fragments = revision
            .document_fragments
            .cursor::<DocumentFragmentSummary>();
        // Slice to the start of the document to which this operation applies.
        let mut new_fragments = old_fragments.slice(&self.document_id, Bias::Left, &());
        let mut new_insertions = Vec::new();
        // Rebuild visible/hidden ropes alongside the fragments so text and
        // fragment structure stay in lockstep.
        let mut new_ropes = RopeBuilder::new(
            revision.visible_text.cursor(0),
            revision.hidden_text.cursor(0),
        );
        new_ropes.append(
            new_fragments.summary().visible_len,
            new_fragments.summary().hidden_len,
        );
        // Offset within this operation's own insertion; advances per edit.
        let mut insertion_offset = 0;
        let mut current_fragment = old_fragments.item().cloned();
        for (range, new_text) in self.edits {
            // We need to tombstone the intersection of the edit's range with fragments that
            // were visible in the operation's parent revision.
            for mut parent_fragment in parent_revision
                .visible_fragments_for_range(range.clone())?
                .cloned()
            {
                // Intersect the parent fragment with the edit's range.
                if parent_fragment.insertion_id == range.start_insertion_id {
                    parent_fragment.insertion_subrange.start = range.start_offset_in_insertion;
                }
                if parent_fragment.insertion_id == range.end_insertion_id {
                    parent_fragment.insertion_subrange.end = cmp::min(
                        parent_fragment.insertion_subrange.end,
                        range.end_offset_in_insertion,
                    );
                }
                // Find the locations of the parent fragment in the new revision.
                for fragment_location in revision.fragment_locations(
                    parent_fragment.insertion_id,
                    parent_fragment.insertion_subrange,
                ) {
                    if let Some(fragment) = current_fragment.as_ref() {
                        // Advance to fragment_location if it is greater than the location of the current fragment,
                        if *fragment_location > fragment.location {
                            // Flush the remainder of current fragment.
                            if !fragment.insertion_subrange.is_empty() || fragment.is_sentinel() {
                                new_ropes.push_fragment(fragment, fragment.visible());
                                new_insertions
                                    .push(btree::Edit::Insert(InsertionFragment::new(&fragment)));
                                new_fragments.push(fragment.clone(), &());
                            }
                            old_fragments.next(&());
                            // Append all fragments between the previous fragment and the new fragment_location.
                            let slice = old_fragments.slice(
                                &(self.document_id, fragment_location),
                                Bias::Left,
                                &(),
                            );
                            new_ropes
                                .append(slice.summary().visible_len, slice.summary().hidden_len);
                            new_fragments.append(slice, &());
                            current_fragment = old_fragments.item().cloned();
                            // We should always find a fragment when seeking to fragment_location.
                            debug_assert!(current_fragment.is_some());
                        }
                    }

                    // If the edit starts at the end of the current fragment, flush it.
                    if let Some(fragment) = current_fragment.as_ref() {
                        if fragment.insertion_id == range.start_insertion_id
                            && fragment.insertion_subrange.end == range.start_offset_in_insertion
                        {
                            let fragment = current_fragment.take().unwrap();
                            new_ropes.push_fragment(&fragment, fragment.visible());
                            new_insertions
                                .push(btree::Edit::Insert(InsertionFragment::new(&fragment)));
                            new_fragments.push(fragment, &());
                            old_fragments.next(&());
                            // Only keep following the cursor while it stays in this document.
                            current_fragment = old_fragments.item().and_then(|fragment| {
                                if fragment.document_id == self.document_id {
                                    Some(fragment.clone())
                                } else {
                                    None
                                }
                            });
                        }
                    }

                    if let Some(fragment) = current_fragment.take() {
                        // If we haven't advanced off the end, then the current fragment intersects
                        // the current edit's range.
                        let (prefix, mut intersection, suffix) = fragment.intersect(range.clone());

                        // If we have a prefix, push it.
                        if let Some(mut prefix) = prefix {
                            prefix.location = DenseId::between(
                                &new_fragments.summary().max_location,
                                &intersection.location,
                            );
                            new_insertions
                                .push(btree::Edit::Insert(InsertionFragment::new(&prefix)));
                            new_ropes.push_fragment(&prefix, prefix.visible());
                            new_fragments.push(prefix, &());
                        }

                        if let Some(suffix) = suffix {
                            intersection.location = DenseId::between(
                                &new_fragments.summary().max_location,
                                &suffix.location,
                            );
                            // If we still have a suffix, the next edit may be inside of it, so set it as
                            // the current fragment and continue the loop.
                            current_fragment = Some(suffix);
                        } else {
                            // Otherwise, advance to the next fragment if it's still part of the same document.
                            old_fragments.next(&());
                            if let Some(next_fragment) = old_fragments.item() {
                                if next_fragment.document_id == self.document_id {
                                    current_fragment = Some(next_fragment.clone());
                                }
                            }
                        }

                        // Then tombstone the intersecting portion.
                        let was_visible = intersection.visible();
                        intersection.tombstones.push(Tombstone {
                            id: self.id,
                            undo_count: 0,
                        });
                        new_ropes.push_fragment(&intersection, was_visible);
                        new_insertions
                            .push(btree::Edit::Insert(InsertionFragment::new(&intersection)));
                        new_fragments.push(intersection, &());
                    }
                }
            }

            // Move past insertions that were causally after the current operation.
            while let Some(fragment) = current_fragment.as_ref() {
                if fragment.insertion_id.is_causally_after(self.id) {
                    new_ropes.push_fragment(fragment, fragment.visible());
                    new_insertions.push(btree::Edit::Insert(InsertionFragment::new(fragment)));
                    new_fragments.push(fragment.clone(), &());
                    old_fragments.next(&());
                    current_fragment = old_fragments.item().and_then(|fragment| {
                        if fragment.document_id == self.document_id {
                            Some(fragment.clone())
                        } else {
                            None
                        }
                    });
                } else {
                    break;
                }
            }

            // Finally, insert a fragment containing the new text.
            if !new_text.is_empty() {
                let fragment = DocumentFragment {
                    document_id: self.document_id,
                    location: DenseId::between(
                        &new_fragments.summary().max_location,
                        current_fragment
                            .as_ref()
                            .map_or(DenseId::max_ref(), |fragment| &fragment.location),
                    ),
                    insertion_id: self.id,
                    insertion_subrange: insertion_offset..insertion_offset + new_text.len(),
                    tombstones: Default::default(),
                    undo_count: 0,
                };
                new_insertions.push(btree::Edit::Insert(InsertionFragment::new(&fragment)));
                new_ropes.push_str(new_text.as_ref());
                new_fragments.push(fragment, &());
                insertion_offset += new_text.len();
            }
        }

        // Flush whatever fragment we were still holding, then copy the rest of
        // the tree (and ropes) over unchanged.
        if let Some(fragment) = current_fragment {
            if !fragment.insertion_subrange.is_empty() {
                new_ropes.push_fragment(&fragment, fragment.visible());
                new_insertions.push(btree::Edit::Insert(InsertionFragment::new(&fragment)));
                new_fragments.push(fragment, &());
            }
            old_fragments.next(&());
        }
        let suffix = old_fragments.suffix(&());
        drop(old_fragments);
        new_ropes.append(suffix.summary().visible_len, suffix.summary().hidden_len);
        let (visible_text, hidden_text) = new_ropes.finish();
        revision.visible_text = visible_text;
        revision.hidden_text = hidden_text;
        new_fragments.append(suffix, &());
        revision.document_fragments = new_fragments;
        // The same insertion fragment may have been produced more than once;
        // sort + dedup before applying as a batch edit.
        new_insertions.sort_unstable_by_key(|edit| edit.key());
        new_insertions.dedup_by_key(|edit| edit.key());
        revision.insertion_fragments.edit(new_insertions, &());
        revision.check_invariants();
        Ok(())
    }
}

1443
crates/crdb/src/rope.rs Normal file

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,51 @@
use serde::{Deserialize, Serialize};
use std::ops::{Add, AddAssign, Sub};
/// An offset measured in UTF-16 code units, per the type name.
#[derive(Copy, Clone, Debug, Default, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize)]
pub struct OffsetUtf16(pub usize);

impl Add for OffsetUtf16 {
    type Output = Self;

    fn add(mut self, rhs: Self) -> Self::Output {
        self += rhs;
        self
    }
}

impl<'a> Add<&'a Self> for OffsetUtf16 {
    type Output = Self;

    fn add(self, rhs: &'a Self) -> Self::Output {
        self + *rhs
    }
}

impl Sub for OffsetUtf16 {
    type Output = OffsetUtf16;

    fn sub(self, rhs: Self) -> Self::Output {
        // Underflow would wrap in release builds; catch it in debug.
        debug_assert!(rhs <= self);
        Self(self.0 - rhs.0)
    }
}

impl<'a> Sub<&'a Self> for OffsetUtf16 {
    type Output = Self;

    fn sub(self, rhs: &'a Self) -> Self::Output {
        self - *rhs
    }
}

impl AddAssign for OffsetUtf16 {
    fn add_assign(&mut self, rhs: Self) {
        self.0 += rhs.0;
    }
}

impl<'a> AddAssign<&'a Self> for OffsetUtf16 {
    fn add_assign(&mut self, rhs: &'a Self) {
        *self += *rhs;
    }
}

View file

@ -0,0 +1,129 @@
use serde::{Deserialize, Serialize};
use std::{
cmp::Ordering,
ops::{Add, AddAssign, Sub},
};
/// A zero-indexed row/column position within text.
#[derive(Clone, Copy, Default, Eq, PartialEq, Debug, Hash, Serialize, Deserialize)]
pub struct Point {
    pub row: u32,
    // Column within `row`; `parse_str` derives it from `str::len`, so it is
    // measured in UTF-8 bytes.
    pub column: u32,
}
impl Point {
    pub const MAX: Self = Self {
        row: u32::MAX,
        column: u32::MAX,
    };

    pub fn new(row: u32, column: u32) -> Self {
        Point { row, column }
    }

    pub fn zero() -> Self {
        Point::new(0, 0)
    }

    /// Position of the end of `s`: row is the number of newlines, column the
    /// byte length of the final line.
    pub fn parse_str(s: &str) -> Self {
        // `split('\n')` always yields at least one (possibly empty) segment,
        // so the count is >= 1 and the unwrap below cannot fail.
        let row = s.split('\n').count() as u32 - 1;
        let column = s.rsplit('\n').next().unwrap().len() as u32;
        Point::new(row, column)
    }

    pub fn is_zero(&self) -> bool {
        self.row == 0 && self.column == 0
    }

    /// Like `self - other`, but clamps at zero instead of underflowing.
    pub fn saturating_sub(self, other: Self) -> Self {
        if self < other {
            Self::zero()
        } else {
            self - other
        }
    }
}
impl<'a> Add<&'a Self> for Point {
    type Output = Point;

    fn add(self, other: &'a Self) -> Self::Output {
        self + *other
    }
}

impl Add for Point {
    type Output = Point;

    // Composes relative positions: adding an offset with a nonzero row resets
    // the column to the offset's column.
    fn add(mut self, other: Self) -> Self::Output {
        self += other;
        self
    }
}

impl<'a> Sub<&'a Self> for Point {
    type Output = Point;

    fn sub(self, other: &'a Self) -> Self::Output {
        self - *other
    }
}

impl Sub for Point {
    type Output = Point;

    fn sub(self, other: Self) -> Self::Output {
        debug_assert!(other <= self);
        if self.row == other.row {
            Point::new(0, self.column - other.column)
        } else {
            Point::new(self.row - other.row, self.column)
        }
    }
}

impl<'a> AddAssign<&'a Self> for Point {
    fn add_assign(&mut self, other: &'a Self) {
        *self += *other;
    }
}

impl AddAssign<Self> for Point {
    fn add_assign(&mut self, other: Self) {
        if other.row == 0 {
            self.column += other.column;
        } else {
            self.row += other.row;
            self.column = other.column;
        }
    }
}

impl PartialOrd for Point {
    fn partial_cmp(&self, other: &Point) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for Point {
    // Lexicographic by (row, column); behaviorally identical on every pointer
    // width to the previous packed-u64 / per-field comparisons.
    fn cmp(&self, other: &Point) -> Ordering {
        (self.row, self.column).cmp(&(other.row, other.column))
    }
}

View file

@ -0,0 +1,119 @@
use std::{
cmp::Ordering,
ops::{Add, AddAssign, Sub},
};
/// A zero-indexed row/column position whose column counts UTF-16 code units,
/// per the type name.
#[derive(Clone, Copy, Default, Eq, PartialEq, Debug, Hash)]
pub struct PointUtf16 {
    pub row: u32,
    pub column: u32,
}

impl PointUtf16 {
    pub const MAX: Self = Self {
        row: u32::MAX,
        column: u32::MAX,
    };

    pub fn new(row: u32, column: u32) -> Self {
        PointUtf16 { row, column }
    }

    pub fn zero() -> Self {
        PointUtf16::new(0, 0)
    }

    pub fn is_zero(&self) -> bool {
        self.row == 0 && self.column == 0
    }

    /// Like `self - other`, but clamps at zero instead of underflowing.
    pub fn saturating_sub(self, other: Self) -> Self {
        if self < other {
            Self::zero()
        } else {
            self - other
        }
    }
}

impl Add for PointUtf16 {
    type Output = PointUtf16;

    // Composes relative positions: adding an offset with a nonzero row resets
    // the column to the offset's column.
    fn add(mut self, rhs: Self) -> Self::Output {
        self += rhs;
        self
    }
}

impl<'a> Add<&'a Self> for PointUtf16 {
    type Output = PointUtf16;

    fn add(self, rhs: &'a Self) -> Self::Output {
        self + *rhs
    }
}

impl Sub for PointUtf16 {
    type Output = PointUtf16;

    fn sub(self, rhs: Self) -> Self::Output {
        debug_assert!(rhs <= self);
        if self.row == rhs.row {
            Self::new(0, self.column - rhs.column)
        } else {
            Self::new(self.row - rhs.row, self.column)
        }
    }
}

impl<'a> Sub<&'a Self> for PointUtf16 {
    type Output = PointUtf16;

    fn sub(self, rhs: &'a Self) -> Self::Output {
        self - *rhs
    }
}

impl AddAssign<Self> for PointUtf16 {
    fn add_assign(&mut self, rhs: Self) {
        if rhs.row == 0 {
            self.column += rhs.column;
        } else {
            self.row += rhs.row;
            self.column = rhs.column;
        }
    }
}

impl<'a> AddAssign<&'a Self> for PointUtf16 {
    fn add_assign(&mut self, rhs: &'a Self) {
        *self += *rhs;
    }
}

impl PartialOrd for PointUtf16 {
    fn partial_cmp(&self, other: &PointUtf16) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for PointUtf16 {
    // Lexicographic by (row, column); behaviorally identical on every pointer
    // width to the previous packed-u64 / per-field comparisons.
    fn cmp(&self, other: &PointUtf16) -> Ordering {
        (self.row, self.column).cmp(&(other.row, other.column))
    }
}

View file

@ -0,0 +1,58 @@
use super::{ChunkSummary, TextDimension, TextSummary};
use crate::btree;
use std::ops::{Add, AddAssign, Sub, SubAssign};
/// Marker wrapper for a coordinate that has not yet been clipped to a valid
/// position within the text.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Unclipped<T>(pub T);

impl<T> From<T> for Unclipped<T> {
    fn from(value: T) -> Self {
        Unclipped(value)
    }
}

// Forward btree summary accumulation to the wrapped dimension.
impl<'a, T: btree::Dimension<'a, ChunkSummary>> btree::Dimension<'a, ChunkSummary>
    for Unclipped<T>
{
    fn add_summary(&mut self, summary: &'a ChunkSummary, _: &()) {
        self.0.add_summary(summary, &());
    }
}

// Forward text-dimension conversions to the wrapped dimension.
impl<T: TextDimension> TextDimension for Unclipped<T> {
    fn from_text_summary(summary: &TextSummary) -> Self {
        Unclipped(T::from_text_summary(summary))
    }

    fn add_assign(&mut self, other: &Self) {
        TextDimension::add_assign(&mut self.0, &other.0);
    }
}

// Arithmetic operates directly on the wrapped values.
impl<T: Add<T, Output = T>> Add<Unclipped<T>> for Unclipped<T> {
    type Output = Unclipped<T>;

    fn add(self, rhs: Unclipped<T>) -> Self::Output {
        Unclipped(self.0 + rhs.0)
    }
}

impl<T: Sub<T, Output = T>> Sub<Unclipped<T>> for Unclipped<T> {
    type Output = Unclipped<T>;

    fn sub(self, rhs: Unclipped<T>) -> Self::Output {
        Unclipped(self.0 - rhs.0)
    }
}

impl<T: AddAssign<T>> AddAssign<Unclipped<T>> for Unclipped<T> {
    fn add_assign(&mut self, rhs: Unclipped<T>) {
        self.0 += rhs.0;
    }
}

impl<T: SubAssign<T>> SubAssign<Unclipped<T>> for Unclipped<T> {
    fn sub_assign(&mut self, rhs: Unclipped<T>) {
        self.0 -= rhs.0;
    }
}

419
crates/crdb/src/sync.rs Normal file
View file

@ -0,0 +1,419 @@
use crate::{
btree::{self, Bias},
messages::{Operation, PublishOperations},
OperationId,
};
use bromberg_sl2::HashMatrix;
use std::{
cmp::Ordering,
iter,
ops::{Range, RangeBounds},
};
/// An order-sensitive, composable digest of a contiguous run of operations,
/// together with how many operations it covers (`bromberg_sl2` hashes compose
/// by matrix multiplication — see the `Summary` impl below).
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct Digest {
    count: usize,
    hash: HashMatrix,
}
impl btree::Item for Operation {
    type Summary = OperationSummary;

    // Each operation contributes its own id as the max and a one-element
    // digest of the id's big-endian bytes.
    fn summary(&self) -> Self::Summary {
        OperationSummary {
            max_id: self.id(),
            digest: Digest {
                count: 1,
                hash: bromberg_sl2::hash_strict(&self.id().to_be_bytes()),
            },
        }
    }
}

impl btree::KeyedItem for Operation {
    type Key = OperationId;

    // Operations are stored sorted by id.
    fn key(&self) -> Self::Key {
        self.id()
    }
}
/// Summary over a run of operations: the largest id plus the composed digest.
#[derive(Clone, Debug, Default)]
pub struct OperationSummary {
    max_id: OperationId,
    digest: Digest,
}

impl btree::Summary for OperationSummary {
    type Context = ();

    fn add_summary(&mut self, summary: &Self, _: &()) {
        // Summaries combine left-to-right in id order, so the incoming max must be larger.
        debug_assert!(self.max_id < summary.max_id);
        self.max_id = summary.max_id;
        self.digest.count += summary.digest.count;
        // HashMatrix multiplication is associative but not commutative, so the
        // digest depends on operation order.
        self.digest.hash = self.digest.hash * summary.digest.hash;
    }
}
// Seeking by `OperationId` positions a cursor at a given operation id.
impl btree::Dimension<'_, OperationSummary> for OperationId {
    fn add_summary(&mut self, summary: &'_ OperationSummary, _: &()) {
        debug_assert!(*self < summary.max_id);
        *self = summary.max_id;
    }
}

// Seeking by `usize` measures the number of operations traversed.
impl btree::Dimension<'_, OperationSummary> for usize {
    fn add_summary(&mut self, summary: &'_ OperationSummary, _: &()) {
        *self += summary.digest.count;
    }
}

// Accumulating a `Digest` composes the hashes of the traversed operations.
impl btree::Dimension<'_, OperationSummary> for Digest {
    fn add_summary(&mut self, summary: &'_ OperationSummary, _: &()) {
        self.count += summary.digest.count;
        self.hash = self.hash * summary.digest.hash;
    }
}
/// Client -> server: digests of successively smaller prefixes of the client's
/// operation log, largest first (see `sync_client`).
struct SyncRequest {
    digests: Vec<Digest>,
}

/// Server -> client: the length of the longest digest prefix both sides agree
/// on, and every server operation after it.
struct SyncResponse {
    shared_prefix_end: usize,
    operations: Vec<Operation>,
}

/// How many operations travelled in each direction during one sync.
struct SyncStats {
    server_operations: usize,
    client_operations: usize,
}
/// Server half of synchronization: find the longest client digest that matches
/// the server's log prefix and reply with everything after it. Only reads the
/// log despite the `&mut` parameter.
fn sync_server(
    operations: &mut btree::Sequence<Operation>,
    sync_request: SyncRequest,
) -> SyncResponse {
    // Digests arrive ordered from largest prefix to smallest; the first match
    // identifies the longest shared prefix. `Digest` equality includes the
    // count, so a match fixes the prefix length on both sides.
    let shared = sync_request
        .digests
        .into_iter()
        .find(|client_digest| digest_for_range(operations, 0..client_digest.count) == *client_digest);
    match shared {
        Some(digest) => SyncResponse {
            shared_prefix_end: digest.count,
            operations: operations_for_range(operations, digest.count..)
                .cloned()
                .collect(),
        },
        // No shared prefix at all: the client needs the entire log.
        None => SyncResponse {
            shared_prefix_end: 0,
            operations: operations.iter().cloned().collect(),
        },
    }
}
/// Inserts client-provided operations into the server's log, which stays
/// sorted by operation id.
fn publish_operations(
    server_operations: &mut btree::Sequence<Operation>,
    request: PublishOperations,
) {
    let inserts: Vec<_> = request
        .operations
        .into_iter()
        .map(btree::Edit::Insert)
        .collect();
    server_operations.edit(inserts, &());
}
/// Client half of synchronization: sends prefix digests to the server, merges
/// the server's missing operations into the local log, and publishes back the
/// operations the server lacked. Returns how much data moved each way.
fn sync_client(
    client_operations: &mut btree::Sequence<Operation>,
    server_operations: &mut btree::Sequence<Operation>,
    min_shared_prefix_end: usize,
    max_digest_count: usize,
) -> SyncStats {
    let mut digests = Vec::new();
    let mut digest_end_ix = client_operations.summary().digest.count;
    // We will multiply by some factor less than 1 to produce digests
    // over ever smaller digest ranges. The following formula ensures that
    // we will produce `max_digest_count` digests, and that the last digest
    // will go from `0` to `min_shared_prefix_end`.
    // op_count * factor^max_digest_count = min_shared_prefix_end
    // factor^max_digest_count = min_shared_prefix_end/op_count
    // max_digest_count * log_2(factor) = log_2(min_shared_prefix_end/op_count)
    // log_2(factor) = log_2(min_shared_prefix_end/op_count)/max_digest_count
    // factor = 2^(log_2(min_shared_prefix_end/op_count)/max_digest_count)
    let factor = 2f64.powf(
        (min_shared_prefix_end as f64 / digest_end_ix as f64).log2() / max_digest_count as f64,
    );
    for _ in 0..max_digest_count {
        if digest_end_ix <= min_shared_prefix_end {
            break;
        }
        digests.push(digest_for_range(client_operations, 0..digest_end_ix));
        digest_end_ix = (digest_end_ix as f64 * factor).ceil() as usize; // 🪬
    }
    let server_response = sync_server(server_operations, SyncRequest { digests });
    let new_ops_from_client = {
        let mut new_ops_from_client = Vec::new();
        let mut client_cursor = client_operations.cursor::<usize>();
        // The shared prefix is identical on both sides; copy it over wholesale.
        let mut new_client_operations =
            client_cursor.slice(&server_response.shared_prefix_end, Bias::Right, &());
        // NOTE: shadows the `server_operations` parameter with an iterator over
        // the server's response for the remainder of this scope.
        let mut server_operations = server_response.operations.iter().peekable();
        let mut new_ops_from_server = Vec::new();
        // Merge the two sorted streams: ops only the server has are buffered in
        // `new_ops_from_server`; ops only the client has are collected in
        // `new_ops_from_client` for publication afterwards.
        while let Some(server_op) = server_operations.peek() {
            match client_cursor.item() {
                Some(client_operation) => {
                    let comparison = server_op.id().cmp(&client_operation.id());
                    match comparison {
                        Ordering::Less => {
                            new_ops_from_server.push(server_operations.next().unwrap().clone());
                        }
                        _ => {
                            // Flush buffered server ops before the client op to
                            // keep the rebuilt log sorted.
                            new_client_operations.extend(new_ops_from_server.drain(..), &());
                            new_client_operations.push(client_operation.clone(), &());
                            client_cursor.next(&());
                            if comparison == Ordering::Equal {
                                server_operations.next();
                            } else {
                                new_ops_from_client.push(client_operation.clone());
                            }
                        }
                    }
                }
                None => {
                    new_ops_from_server.push(server_operations.next().unwrap().clone());
                }
            }
        }
        new_client_operations.extend(new_ops_from_server, &());
        // Anything left past the cursor is client-only and must be published too.
        let client_suffix = client_cursor.suffix(&());
        new_client_operations.append(client_suffix.clone(), &());
        drop(client_cursor);
        *client_operations = new_client_operations;
        new_ops_from_client.extend(client_suffix.iter().cloned());
        new_ops_from_client
    };
    let sync_stats = SyncStats {
        server_operations: server_response.operations.len(),
        client_operations: new_ops_from_client.len(),
    };
    publish_operations(
        server_operations,
        PublishOperations {
            repo_id: Default::default(),
            operations: new_ops_from_client,
        },
    );
    sync_stats
}
/// Summarizes the operations whose positions fall within `range` into a
/// single `Digest`.
fn digest_for_range(operations: &btree::Sequence<Operation>, range: Range<usize>) -> Digest {
    let Range { start, end } = range;
    let mut ops_cursor = operations.cursor::<usize>();
    ops_cursor.seek(&start, Bias::Right, &());
    ops_cursor.summary(&end, Bias::Right, &())
}
/// Returns an iterator over the operations whose position in `operations`
/// falls within `range`. Accepts any `RangeBounds<usize>` (`a..b`, `a..=b`,
/// `..`, etc.).
fn operations_for_range<T: RangeBounds<usize>>(
    operations: &btree::Sequence<Operation>,
    range: T,
) -> impl Iterator<Item = &Operation> {
    let mut cursor = operations.cursor::<usize>();
    // Position the cursor on the first operation inside the range.
    match range.start_bound() {
        collections::Bound::Included(start) => {
            cursor.seek(start, Bias::Right, &());
        }
        collections::Bound::Excluded(start) => {
            // NOTE(review): `*start + 1` overflows for `usize::MAX`; assumed
            // unreachable for realistic operation counts — confirm.
            cursor.seek(&(*start + 1), Bias::Right, &());
        }
        // Unbounded start: advance onto the first item — presumably a fresh
        // cursor starts before the sequence (matches cursor usage elsewhere
        // in this file).
        collections::Bound::Unbounded => cursor.next(&()),
    }
    // Yield items until the cursor's start position leaves the range.
    iter::from_fn(move || {
        if range.contains(cursor.start()) {
            let operation = cursor.item()?;
            cursor.next(&());
            Some(operation)
        } else {
            None
        }
    })
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{operations, OperationCount};
    use rand::prelude::*;
    use std::env;

    /// Deterministic sync scenarios covering disjoint, overlapping, and
    /// partially shared operation histories.
    #[test]
    fn test_sync() {
        assert_sync(1..=10, 5..=10, 0, 16);
        assert_sync(1..=10, 4..=10, 0, 16);
        assert_sync(1..=10, 1..=5, 0, 16);
        assert_sync([1, 3, 5, 7, 9], [2, 4, 6, 8, 10], 0, 16);
        assert_sync([1, 2, 3, 4, 6, 7, 8, 9, 11, 12], [4, 5, 6, 10, 12], 0, 16);
        assert_sync(1..=10, 5..=14, 0, 16);
        assert_sync(1..=80, (1..=70).chain(90..=100), 0, 16);
        assert_sync(1..=1910, (1..=1900).chain(1910..=2000), 0, 16);
    }

    /// Randomized soak test: simulates a client/server pair that
    /// occasionally disconnects, diverges, then reconnects and syncs.
    #[gpui::test(iterations = 100)]
    fn test_random(mut rng: StdRng) {
        let max_operations = env::var("OPERATIONS")
            .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
            .unwrap_or(10);
        let min_shared_prefix_end = 1024;
        let max_digest_count = 1024;
        let mut connected = true;
        let mut client_ops = btree::Sequence::new();
        let mut server_ops = btree::Sequence::new();
        // Operations each side *must* learn about at the next sync; used to
        // measure how much extra the protocol transmits beyond the minimum.
        let mut ideal_server_ops = 0;
        let mut ideal_client_ops = 0;
        let mut next_reconnection = None;
        for ix in 1..=max_operations {
            // Occasionally disconnect for a random fraction of the remaining
            // operations.
            if connected && rng.gen_bool(0.0005) {
                connected = false;
                let mut factor = 0.0005;
                while rng.gen() {
                    factor *= 2.0;
                }
                let remaining_operations = max_operations - ix;
                let disconnection_period = (remaining_operations as f64 * factor) as usize;
                next_reconnection = Some(ix + disconnection_period);
                log::debug!(
                    "disconnected at operation {} for {} operations",
                    ix,
                    disconnection_period
                );
            }
            if next_reconnection == Some(ix) {
                connected = true;
                next_reconnection = None;
                log::debug!("===============");
                let stats = sync_client(
                    &mut client_ops,
                    &mut server_ops,
                    min_shared_prefix_end,
                    max_digest_count,
                );
                log_sync_error("server", ideal_server_ops, stats.server_operations);
                log_sync_error("client", ideal_client_ops, stats.client_operations);
                assert_converged(&client_ops, &server_ops);
                ideal_client_ops = 0;
                ideal_server_ops = 0;
            }
            if connected {
                // While connected, both sides observe every operation.
                client_ops.push(build_operation(ix), &());
                server_ops.push(build_operation(ix), &());
            } else if rng.gen_bool(0.95) {
                ideal_server_ops += 1;
                server_ops.push(build_operation(ix), &());
            } else {
                ideal_client_ops += 1;
                client_ops.push(build_operation(ix), &());
            }
        }
        // Final sync after the last operation has been generated.
        log::debug!("============");
        let stats = sync_client(
            &mut client_ops,
            &mut server_ops,
            min_shared_prefix_end,
            max_digest_count,
        );
        log_sync_error("server", ideal_server_ops, stats.server_operations);
        log_sync_error("client", ideal_client_ops, stats.client_operations);
        assert_converged(&client_ops, &server_ops);
    }

    /// Logs how far the actual number of transmitted operations strayed from
    /// the ideal (minimum possible) number for one side of the sync.
    fn log_sync_error(side: &str, ideal: usize, actual: usize) {
        // Signed subtraction: `actual` should never be below `ideal`, but a
        // buggy sync must not turn this diagnostic into a usize-underflow
        // panic before the assertion gets a chance to report the real error.
        log::debug!(
            "ideal {} ops: {}, actual {} ops: {}, abs error: {}, pct error: {:.3}%",
            side,
            ideal,
            side,
            actual,
            actual as i64 - ideal as i64,
            ((actual as f64 / ideal as f64) - 1.0) * 100.0
        );
    }

    /// Asserts that client and server hold the same operation ids in the
    /// same order.
    fn assert_converged(
        client_ops: &btree::Sequence<Operation>,
        server_ops: &btree::Sequence<Operation>,
    ) {
        assert_eq!(
            client_ops.iter().map(|op| op.id()).collect::<Vec<_>>(),
            server_ops.iter().map(|op| op.id()).collect::<Vec<_>>()
        );
    }

    /// Runs a full sync between the given operation-id sets and asserts both
    /// sides converge to the same sequence.
    fn assert_sync(
        client_ops: impl IntoIterator<Item = usize>,
        server_ops: impl IntoIterator<Item = usize>,
        min_digest_delta: usize,
        max_digest_count: usize,
    ) {
        let client_ops = client_ops
            .into_iter()
            .map(build_operation)
            .collect::<Vec<_>>();
        let server_ops = server_ops
            .into_iter()
            .map(build_operation)
            .collect::<Vec<_>>();
        let mut client_operations = btree::Sequence::from_iter(client_ops, &());
        let mut server_operations = btree::Sequence::from_iter(server_ops, &());
        sync_client(
            &mut client_operations,
            &mut server_operations,
            min_digest_delta,
            max_digest_count,
        );
        assert_converged(&client_operations, &server_operations);
    }

    /// Builds a minimal operation whose id carries the given count.
    fn build_operation(id: usize) -> Operation {
        Operation::CreateBranch(operations::CreateBranch {
            id: OperationId {
                replica_id: Default::default(),
                operation_count: OperationCount(id),
            },
            parent: Default::default(),
            name: "".into(),
        })
    }

    /// Extracts the operation counts from a slice of digests. Retained as a
    /// debugging aid even though no test currently calls it.
    #[allow(dead_code)]
    fn digest_counts(digests: &[Digest]) -> Vec<usize> {
        digests.iter().map(|d| d.count).collect()
    }
}

201
crates/crdb/src/test.rs Normal file
View file

@ -0,0 +1,201 @@
use crate::{ClientNetwork, ClientRoom, RoomCredentials, RoomName, RoomToken, ServerNetwork, User};
use anyhow::{anyhow, Result};
use collections::BTreeMap;
use futures::{channel::mpsc, future::BoxFuture, FutureExt, StreamExt};
use gpui::executor::Background;
use parking_lot::Mutex;
use std::sync::Arc;
/// An in-memory fake network shared by a test server and its clients.
pub struct TestNetwork(Arc<Mutex<NetworkState>>);

impl TestNetwork {
    /// Creates a network whose message pumps and simulated delays run on
    /// `executor`.
    pub fn new(executor: Arc<Background>) -> Self {
        let state = NetworkState {
            executor,
            request_handler: None,
            rooms: Default::default(),
        };
        Self(Arc::new(Mutex::new(state)))
    }

    /// Returns a server-side handle onto this network.
    pub fn server(&self) -> TestServerNetwork {
        TestServerNetwork(Arc::clone(&self.0))
    }

    /// Returns a client-side handle bound to the given user login.
    pub fn client(&self, login: impl Into<Arc<str>>) -> TestClientNetwork {
        let user = User {
            login: login.into(),
        };
        TestClientNetwork {
            user,
            network: Arc::clone(&self.0),
        }
    }
}
/// Shared mutable state behind every `TestNetwork` handle.
struct NetworkState {
    // Executor used to spawn message pumps and simulate random delays.
    executor: Arc<Background>,
    // Server-installed callback invoked for each client request; `None`
    // until the server calls `handle_requests`.
    request_handler:
        Option<Box<dyn Send + Fn(User, Vec<u8>) -> Result<BoxFuture<'static, Result<Vec<u8>>>>>>,
    rooms: BTreeMap<RoomName, Room>,
}
/// A broadcast room: every connected client registers an inbox channel.
#[derive(Default)]
pub struct Room {
    // One message inbox per connected token.
    inboxes: BTreeMap<RoomToken, mpsc::UnboundedSender<Vec<u8>>>,
    // Tokens granted access, mapped to the login they were minted for.
    authorized_users: BTreeMap<RoomToken, Arc<str>>,
    // Monotonic counter making each minted token unique.
    next_token_id: usize,
}
/// Server-side handle onto a `TestNetwork`.
pub struct TestServerNetwork(Arc<Mutex<NetworkState>>);

impl ServerNetwork for TestServerNetwork {
    /// Creates an empty room after a simulated random delay.
    fn create_room(&self, name: &RoomName) -> BoxFuture<Result<()>> {
        let network = self.0.clone();
        let room = name.clone();
        async move {
            let executor = network.lock().executor.clone();
            executor.simulate_random_delay().await;
            network.lock().rooms.insert(room, Default::default());
            Ok(())
        }
        .boxed()
    }

    /// Mints a unique token authorizing `user` to enter `room`.
    ///
    /// Panics (test-only) if the room has not been created yet.
    fn grant_room_access(&self, room: &RoomName, user: &str) -> RoomToken {
        let mut network = self.0.lock();
        // Pass `room` directly; `&room` was a `&&RoomName` that only worked
        // via deref coercion.
        let room = network.rooms.get_mut(room).expect("room must exist");
        let token_id = room.next_token_id;
        room.next_token_id += 1;
        let token = RoomToken(format!("{}/{}", token_id, user).into());
        room.authorized_users.insert(token.clone(), user.into());
        token
    }

    /// Installs the callback that services client requests.
    fn handle_requests<H, F>(&self, handle_request: H)
    where
        H: 'static + Send + Fn(User, Vec<u8>) -> Result<F>,
        F: 'static + Send + futures::Future<Output = Result<Vec<u8>>>,
    {
        self.0.lock().request_handler = Some(Box::new(move |user, request| {
            // The request buffer is owned and not reused afterwards, so move
            // it straight into the handler instead of cloning it.
            handle_request(user, request).map(FutureExt::boxed)
        }));
    }
}
/// Client-side handle onto a `TestNetwork`, bound to a specific user.
pub struct TestClientNetwork {
    user: User,
    network: Arc<Mutex<NetworkState>>,
}
impl ClientNetwork for TestClientNetwork {
type Room = TestClientRoom;
fn request(&self, request: Vec<u8>) -> BoxFuture<Result<Vec<u8>>> {
let response =
self.network.lock().request_handler.as_ref().unwrap()(self.user.clone(), request);
async move { response?.await }.boxed()
}
fn room(&self, credentials: RoomCredentials) -> Self::Room {
TestClientRoom {
outbox: Default::default(),
credentials,
message_handler: Default::default(),
network: self.network.clone(),
}
}
}
/// Client-side view of a room: connect, broadcast, and receive messages.
pub struct TestClientRoom {
    // Sender for outbound broadcasts; `None` until `connect` succeeds.
    outbox: Option<mpsc::UnboundedSender<Vec<u8>>>,
    credentials: RoomCredentials,
    // Callback invoked for each inbound message; shared with the spawned
    // inbox pump task.
    message_handler: Arc<Mutex<Option<Box<dyn Send + Fn(Vec<u8>)>>>>,
    network: Arc<Mutex<NetworkState>>,
}
impl ClientRoom for TestClientRoom {
    /// Joins the room named in `self.credentials`.
    ///
    /// Registers an inbox with the room, spawns a task forwarding inbound
    /// messages to the installed handler, and spawns a second task fanning
    /// out broadcasts to every other inbox. Fails if the token was never
    /// authorized; panics (test-only) if the room does not exist or
    /// `connect` is called twice.
    fn connect(&mut self) -> BoxFuture<Result<()>> {
        assert!(
            self.outbox.is_none(),
            "client should not connect more than once"
        );
        let (inbox_tx, mut inbox_rx) = mpsc::unbounded();
        {
            // Scope the lock: the spawn calls below re-lock `self.network`.
            let mut network = self.network.lock();
            let room = network
                .rooms
                .get_mut(&self.credentials.name)
                .expect("room should exist");
            if !room.authorized_users.contains_key(&self.credentials.token) {
                return std::future::ready(Err(anyhow!(
                    "token {:?} is not authorized to enter room {:?}",
                    self.credentials.token,
                    self.credentials.name
                )))
                .boxed();
            }
            let existing_inbox = room
                .inboxes
                .insert(self.credentials.token.clone(), inbox_tx);
            assert!(
                existing_inbox.is_none(),
                "client should not connect twice with the same token"
            );
        }
        // Deliver inbound messages to the handler, if one is installed;
        // messages arriving before `handle_messages` is called are dropped.
        let message_handler = self.message_handler.clone();
        self.network
            .lock()
            .executor
            .spawn(async move {
                while let Some(message) = inbox_rx.next().await {
                    if let Some(handler) = message_handler.lock().as_ref() {
                        handler(message);
                    }
                }
            })
            .detach();
        // Send outbound messages to other clients in the room.
        let (outbox_tx, mut outbox_rx) = mpsc::unbounded();
        self.outbox = Some(outbox_tx);
        let executor = self.network.lock().executor.clone();
        let network = self.network.clone();
        let credentials = self.credentials.clone();
        self.network
            .lock()
            .executor
            .spawn(async move {
                while let Some(message) = outbox_rx.next().await {
                    // Snapshot the inboxes so the network lock is not held
                    // across the awaited random delays below.
                    let inboxes = network
                        .lock()
                        .rooms
                        .get(&credentials.name)
                        .map(|room| room.inboxes.clone());
                    if let Some(inboxes) = inboxes {
                        for (inbox_token, inbox) in inboxes {
                            executor.simulate_random_delay().await;
                            // Don't echo the message back to its sender.
                            if inbox_token != credentials.token {
                                let _ = inbox.unbounded_send(message.clone());
                            }
                        }
                    }
                }
            })
            .detach();
        async { Ok(()) }.boxed()
    }

    /// Broadcasts `message` to every other connected client in the room.
    ///
    /// Panics if called before `connect` succeeds or after the pump task's
    /// channel closes.
    fn broadcast(&self, message: Vec<u8>) {
        let tx = self.outbox.as_ref().expect("must be connected");
        tx.unbounded_send(message).expect("channel must be open");
    }

    /// Installs (or replaces) the callback invoked for inbound messages.
    fn handle_messages(&self, handle_message: impl 'static + Send + Fn(Vec<u8>)) {
        self.message_handler
            .lock()
            .replace(Box::new(handle_message));
    }
}

View file

@ -562,8 +562,8 @@ impl DeterministicState {
self.poll_history.push(event);
if let Some(prev_history) = &self.previous_poll_history {
let ix = self.poll_history.len() - 1;
let prev_event = prev_history[ix];
if event != prev_event {
let prev_event = prev_history.get(ix);
if Some(&event) != prev_event {
let mut message = String::new();
writeln!(
&mut message,
@ -574,17 +574,22 @@ impl DeterministicState {
})
)
.unwrap();
writeln!(
&mut message,
"previous runnable backtrace:\n{:?}",
self.runnable_backtraces
.get_mut(&prev_event.id())
.map(|trace| {
trace.resolve();
util::CwdBacktrace(trace)
})
)
.unwrap();
if let Some(prev_event) = prev_event {
writeln!(
&mut message,
"previous runnable backtrace:\n{:?}",
self.runnable_backtraces
.get_mut(&prev_event.id())
.map(|trace| {
trace.resolve();
util::CwdBacktrace(trace)
})
)
.unwrap();
} else {
writeln!(&mut message, "previous runnable backtrace:\nnone").unwrap();
}
panic!("detected non-determinism after {ix}. {message}");
}
}

View file

@ -8,7 +8,7 @@ publish = false
path = "src/rope.rs"
[dependencies]
bromberg_sl2 = { git = "https://github.com/zed-industries/bromberg_sl2", rev = "950bc5482c216c395049ae33ae4501e08975f17f" }
bromberg_sl2 = { git = "https://github.com/zed-industries/bromberg_sl2", rev = "6faf816bd5b4b7b2b6ea77495686634732ded095" }
smallvec.workspace = true
sum_tree = { path = "../sum_tree" }
arrayvec = "0.7.1"

View file

@ -665,7 +665,7 @@ pub enum Edit<T: KeyedItem> {
}
impl<T: KeyedItem> Edit<T> {
fn key(&self) -> T::Key {
pub fn key(&self) -> T::Key {
match self {
Edit::Insert(item) => item.key(),
Edit::Remove(key) => key.clone(),

View file

@ -1,4 +1,9 @@
use std::{cmp::Ordering, fmt::Debug};
use std::{
cmp::Ordering,
collections::BTreeMap,
fmt::Debug,
ops::{Bound, RangeBounds},
};
use crate::{Bias, Dimension, Edit, Item, KeyedItem, SeekTarget, SumTree, Summary};
@ -25,7 +30,11 @@ pub struct TreeSet<K>(TreeMap<K, ()>)
where
K: Clone + Debug + Default + Ord;
impl<K: Clone + Debug + Default + Ord, V: Clone + Debug> TreeMap<K, V> {
impl<K, V> TreeMap<K, V>
where
K: Clone + Debug + Default + Ord,
V: Clone + Debug,
{
pub fn from_ordered_entries(entries: impl IntoIterator<Item = (K, V)>) -> Self {
let tree = SumTree::from_iter(
entries
@ -54,6 +63,10 @@ impl<K: Clone + Debug + Default + Ord, V: Clone + Debug> TreeMap<K, V> {
}
}
pub fn contains_key<'a>(&self, key: &'a K) -> bool {
self.get(key).is_some()
}
pub fn insert(&mut self, key: K, value: V) {
self.0.insert_or_replace(MapEntry { key, value }, &());
}
@ -93,6 +106,32 @@ impl<K: Clone + Debug + Default + Ord, V: Clone + Debug> TreeMap<K, V> {
cursor.item().map(|item| (&item.key, &item.value))
}
/// Returns an iterator over the entries whose keys fall within `range`.
///
/// Note the bounds are *references* to keys (`R: RangeBounds<&'a K>`).
pub fn range<'a, R>(&self, range: R) -> impl Iterator<Item = (&K, &V)>
where
    K: 'a,
    R: RangeBounds<&'a K>,
{
    let mut cursor = self.0.cursor::<MapKeyRef<'_, K>>();
    // Position the cursor on the first entry satisfying the start bound.
    match range.start_bound() {
        Bound::Included(start) => {
            let start = MapKeyRef(Some(*start));
            cursor.seek(&start, Bias::Left, &());
        }
        Bound::Excluded(start) => {
            let start = MapKeyRef(Some(*start));
            cursor.seek(&start, Bias::Right, &());
        }
        // Unbounded start: step onto the first entry.
        Bound::Unbounded => cursor.next(&()),
    }
    // Walk forward until a key exceeds the end bound.
    cursor
        .map(|entry| (&entry.key, &entry.value))
        .take_while(move |(key, _)| match range.end_bound() {
            Bound::Included(end) => key <= end,
            Bound::Excluded(end) => key < end,
            Bound::Unbounded => true,
        })
}
pub fn iter_from<'a>(&'a self, from: &'a K) -> impl Iterator<Item = (&K, &V)> + '_ {
let mut cursor = self.0.cursor::<MapKeyRef<'_, K>>();
let from_key = MapKeyRef(Some(from));
@ -162,6 +201,28 @@ impl<K: Clone + Debug + Default + Ord, V: Clone + Debug> TreeMap<K, V> {
}
}
impl<K, V> Into<BTreeMap<K, V>> for &TreeMap<K, V>
where
K: Clone + Debug + Default + Ord,
V: Clone + Debug,
{
fn into(self) -> BTreeMap<K, V> {
self.iter()
.map(|(replica_id, count)| (replica_id.clone(), count.clone()))
.collect()
}
}
/// Builds a `TreeMap` from a standard `BTreeMap`, cloning every entry.
/// `BTreeMap` iterates in ascending key order, which presumably satisfies
/// `from_ordered_entries`' ordering expectation (as the name suggests).
impl<K, V> From<&BTreeMap<K, V>> for TreeMap<K, V>
where
    K: Clone + Debug + Default + Ord,
    V: Clone + Debug,
{
    fn from(value: &BTreeMap<K, V>) -> Self {
        TreeMap::from_ordered_entries(value.into_iter().map(|(k, v)| (k.clone(), v.clone())))
    }
}
#[derive(Debug)]
struct MapSeekTargetAdaptor<'a, T>(&'a T);

View file

@ -9,8 +9,10 @@ pub mod test;
use std::{
borrow::Cow,
cmp::{self, Ordering},
env,
ops::{AddAssign, Range, RangeInclusive},
panic::Location,
path::PathBuf,
pin::Pin,
task::{Context, Poll},
};
@ -374,6 +376,19 @@ impl<T: Ord + Clone> RangeExt<T> for RangeInclusive<T> {
}
}
/// Reads a path from the environment variable `name`.
///
/// Returns `None` when the variable is unset or not valid Unicode.
/// Relative paths are resolved against the workspace root (two
/// directories above this crate's manifest).
pub fn path_env_var(name: &str) -> Option<PathBuf> {
    let raw = env::var(name).ok()?;
    let path = PathBuf::from(raw);
    if path.is_relative() {
        let mut resolved = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
        resolved.pop();
        resolved.pop();
        resolved.push(path);
        Some(resolved)
    } else {
        Some(path)
    }
}
#[cfg(test)]
mod tests {
use super::*;

View file

@ -36,7 +36,7 @@ terminal = { path = "../terminal" }
theme = { path = "../theme" }
util = { path = "../util" }
async-recursion = "1.0.0"
async-recursion.workspace = true
itertools = "0.10"
bincode = "1.2.1"
anyhow.workspace = true

View file

@ -72,7 +72,7 @@ zed-actions = {path = "../zed-actions"}
anyhow.workspace = true
async-compression = { version = "0.3", features = ["gzip", "futures-bufread"] }
async-tar = "0.4.2"
async-recursion = "0.3"
async-recursion.workspace = true
async-trait.workspace = true
backtrace = "0.3"
chrono = "0.4"