Merge pull request #525 from zed-industries/preserve-worktrees

Grow worktrees monotonically when sharing and move most messages to the background
Antonio Scandurra 2022-03-04 09:48:18 +01:00 committed by GitHub
commit dc5a09b3f7
21 changed files with 662 additions and 380 deletions
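
The headline change is how a local project holds its worktrees once it is shared: on share, weak worktree handles are upgraded to strong ones, worktrees added while shared are held strongly, and only non-visible worktrees may drop back to weak handles on unshare, so the set of worktrees a guest sees only grows. (The other half of the title shows up near the end of the diff, where most RPC message types in the `messages!` macro are reprioritized from Foreground to Background.) The sketch below is a minimal illustration of the worktree idea, not Zed's code: `Rc`/`Weak` stand in for GPUI's `ModelHandle`/`WeakModelHandle`, and the types and method bodies are simplified assumptions based on the hunks further down.

// Illustrative sketch only; simplified stand-ins, not Zed's actual types.
use std::rc::{Rc, Weak};

struct Worktree {
    visible: bool,
}

enum WorktreeHandle {
    Strong(Rc<Worktree>),
    Weak(Weak<Worktree>),
}

struct Project {
    is_shared: bool,
    worktrees: Vec<WorktreeHandle>,
}

impl Project {
    fn add_worktree(&mut self, worktree: Rc<Worktree>) {
        // While shared (or when the worktree is user-visible), hold it strongly
        // so it cannot be dropped out from under remote collaborators.
        if self.is_shared || worktree.visible {
            self.worktrees.push(WorktreeHandle::Strong(worktree));
        } else {
            self.worktrees
                .push(WorktreeHandle::Weak(Rc::downgrade(&worktree)));
        }
    }

    fn share(&mut self) {
        self.is_shared = true;
        // Upgrade any still-alive weak handles; from here on the worktree set
        // only grows until the project is unshared.
        for handle in &mut self.worktrees {
            if let WorktreeHandle::Weak(weak) = handle {
                if let Some(strong) = weak.upgrade() {
                    *handle = WorktreeHandle::Strong(strong);
                }
            }
        }
    }

    fn unshare(&mut self) {
        self.is_shared = false;
        // Only non-visible worktrees (e.g. ones opened for a single file) may
        // become weak again and be released once nothing else references them.
        for handle in &mut self.worktrees {
            if let WorktreeHandle::Strong(strong) = handle {
                if !strong.visible {
                    *handle = WorktreeHandle::Weak(Rc::downgrade(strong));
                }
            }
        }
    }
}

fn main() {
    let mut project = Project { is_shared: false, worktrees: Vec::new() };
    let single_file_tree = Rc::new(Worktree { visible: false });
    project.add_worktree(single_file_tree.clone()); // held weakly: may be released
    project.share();                                // upgraded: kept while shared
    project.unshare();                              // non-visible trees become weak again
}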

View file

@ -761,7 +761,7 @@ mod tests {
project
.update(cx, |project, cx| {
project.find_or_create_local_worktree("/test", false, cx)
project.find_or_create_local_worktree("/test", true, cx)
})
.await
.unwrap();

View file

@ -8217,7 +8217,7 @@ mod tests {
let (worktree, relative_path) = project
.update(cx, |project, cx| {
project.find_or_create_local_worktree("/file", false, cx)
project.find_or_create_local_worktree("/file", true, cx)
})
.await
.unwrap();

View file

@ -448,7 +448,7 @@ mod tests {
params
.project
.update(cx, |project, cx| {
project.find_or_create_local_worktree("/root", false, cx)
project.find_or_create_local_worktree("/root", true, cx)
})
.await
.unwrap();
@ -517,7 +517,7 @@ mod tests {
params
.project
.update(cx, |project, cx| {
project.find_or_create_local_worktree("/dir", false, cx)
project.find_or_create_local_worktree("/dir", true, cx)
})
.await
.unwrap();
@ -583,7 +583,7 @@ mod tests {
params
.project
.update(cx, |project, cx| {
project.find_or_create_local_worktree("/root/the-parent-dir/the-file", false, cx)
project.find_or_create_local_worktree("/root/the-parent-dir/the-file", true, cx)
})
.await
.unwrap();

View file

@ -68,6 +68,7 @@ pub struct Buffer {
remote_selections: TreeMap<ReplicaId, SelectionSet>,
selections_update_count: usize,
diagnostics_update_count: usize,
diagnostics_timestamp: clock::Lamport,
file_update_count: usize,
language_server: Option<LanguageServerState>,
completion_triggers: Vec<String>,
@ -425,23 +426,30 @@ impl Buffer {
this.apply_ops(ops, cx)?;
for selection_set in message.selections {
let lamport_timestamp = clock::Lamport {
replica_id: selection_set.replica_id as ReplicaId,
value: selection_set.lamport_timestamp,
};
this.remote_selections.insert(
selection_set.replica_id as ReplicaId,
SelectionSet {
selections: proto::deserialize_selections(selection_set.selections),
lamport_timestamp: clock::Lamport {
replica_id: selection_set.replica_id as ReplicaId,
value: selection_set.lamport_timestamp,
},
lamport_timestamp,
},
);
this.text.lamport_clock.observe(lamport_timestamp);
}
let snapshot = this.snapshot();
let entries = proto::deserialize_diagnostics(message.diagnostics);
this.apply_diagnostic_update(
DiagnosticSet::from_sorted_entries(entries.into_iter().cloned(), &snapshot),
DiagnosticSet::from_sorted_entries(entries.iter().cloned(), &snapshot),
clock::Lamport {
replica_id: 0,
value: message.diagnostics_timestamp,
},
cx,
);
this.completion_triggers = message.completion_triggers;
Ok(this)
@ -470,6 +478,7 @@ impl Buffer {
})
.collect(),
diagnostics: proto::serialize_diagnostics(self.diagnostics.iter()),
diagnostics_timestamp: self.diagnostics_timestamp.value,
completion_triggers: self.completion_triggers.clone(),
}
}
@ -512,6 +521,7 @@ impl Buffer {
selections_update_count: 0,
diagnostics: Default::default(),
diagnostics_update_count: 0,
diagnostics_timestamp: Default::default(),
file_update_count: 0,
language_server: None,
completion_triggers: Default::default(),
@ -1005,11 +1015,12 @@ impl Buffer {
drop(edits_since_save);
let set = DiagnosticSet::new(sanitized_diagnostics, content);
self.apply_diagnostic_update(set.clone(), cx);
let lamport_timestamp = self.text.lamport_clock.tick();
self.apply_diagnostic_update(set.clone(), lamport_timestamp, cx);
let op = Operation::UpdateDiagnostics {
diagnostics: set.iter().cloned().collect(),
lamport_timestamp: self.text.lamport_clock.tick(),
lamport_timestamp,
};
self.send_operation(op, cx);
Ok(())
@ -1288,6 +1299,13 @@ impl Buffer {
self.text.wait_for_edits(edit_ids)
}
pub fn wait_for_anchors<'a>(
&mut self,
anchors: impl IntoIterator<Item = &'a Anchor>,
) -> impl Future<Output = ()> {
self.text.wait_for_anchors(anchors)
}
pub fn wait_for_version(&mut self, version: clock::Global) -> impl Future<Output = ()> {
self.text.wait_for_version(version)
}
@ -1672,11 +1690,12 @@ impl Buffer {
}
Operation::UpdateDiagnostics {
diagnostics: diagnostic_set,
..
lamport_timestamp,
} => {
let snapshot = self.snapshot();
self.apply_diagnostic_update(
DiagnosticSet::from_sorted_entries(diagnostic_set.iter().cloned(), &snapshot),
lamport_timestamp,
cx,
);
}
@ -1710,11 +1729,20 @@ impl Buffer {
}
}
fn apply_diagnostic_update(&mut self, diagnostics: DiagnosticSet, cx: &mut ModelContext<Self>) {
self.diagnostics = diagnostics;
self.diagnostics_update_count += 1;
cx.notify();
cx.emit(Event::DiagnosticsUpdated);
fn apply_diagnostic_update(
&mut self,
diagnostics: DiagnosticSet,
lamport_timestamp: clock::Lamport,
cx: &mut ModelContext<Self>,
) {
if lamport_timestamp > self.diagnostics_timestamp {
self.diagnostics = diagnostics;
self.diagnostics_timestamp = lamport_timestamp;
self.diagnostics_update_count += 1;
self.text.lamport_clock.observe(lamport_timestamp);
cx.notify();
cx.emit(Event::DiagnosticsUpdated);
}
}
#[cfg(not(test))]

View file

@ -25,7 +25,13 @@ pub fn serialize_operation(operation: &Operation) -> proto::Operation {
replica_id: undo.id.replica_id as u32,
local_timestamp: undo.id.value,
lamport_timestamp: lamport_timestamp.value,
ranges: undo.ranges.iter().map(serialize_range).collect(),
version: From::from(&undo.version),
transaction_ranges: undo
.transaction_ranges
.iter()
.map(serialize_range)
.collect(),
transaction_version: From::from(&undo.transaction_version),
counts: undo
.counts
.iter()
@ -35,7 +41,6 @@ pub fn serialize_operation(operation: &Operation) -> proto::Operation {
count: *count,
})
.collect(),
version: From::from(&undo.version),
}),
Operation::UpdateSelections {
selections,
@ -183,6 +188,7 @@ pub fn deserialize_operation(message: proto::Operation) -> Result<Operation> {
replica_id: undo.replica_id as ReplicaId,
value: undo.local_timestamp,
},
version: undo.version.into(),
counts: undo
.counts
.into_iter()
@ -196,8 +202,12 @@ pub fn deserialize_operation(message: proto::Operation) -> Result<Operation> {
)
})
.collect(),
ranges: undo.ranges.into_iter().map(deserialize_range).collect(),
version: undo.version.into(),
transaction_ranges: undo
.transaction_ranges
.into_iter()
.map(deserialize_range)
.collect(),
transaction_version: undo.transaction_version.into(),
},
}),
proto::operation::Variant::UpdateSelections(message) => {

View file

@ -12,7 +12,7 @@ use std::{
time::{Duration, Instant},
};
use unindent::Unindent as _;
use util::test::Network;
use util::{post_inc, test::Network};
#[cfg(test)]
#[ctor::ctor]
@ -1194,6 +1194,7 @@ fn test_random_collaboration(cx: &mut MutableAppContext, mut rng: StdRng) {
let mut now = Instant::now();
let mut mutation_count = operations;
let mut next_diagnostic_id = 0;
let mut active_selections = BTreeMap::default();
loop {
let replica_index = rng.gen_range(0..replica_ids.len());
@ -1234,7 +1235,27 @@ fn test_random_collaboration(cx: &mut MutableAppContext, mut rng: StdRng) {
});
mutation_count -= 1;
}
40..=49 if replica_ids.len() < max_peers => {
40..=49 if mutation_count != 0 && replica_id == 0 => {
let entry_count = rng.gen_range(1..=5);
buffer.update(cx, |buffer, cx| {
let diagnostics = (0..entry_count)
.map(|_| {
let range = buffer.random_byte_range(0, &mut rng);
DiagnosticEntry {
range,
diagnostic: Diagnostic {
message: post_inc(&mut next_diagnostic_id).to_string(),
..Default::default()
},
}
})
.collect();
log::info!("peer {} setting diagnostics: {:?}", replica_id, diagnostics);
buffer.update_diagnostics(diagnostics, None, cx).unwrap();
});
mutation_count -= 1;
}
50..=59 if replica_ids.len() < max_peers => {
let old_buffer = buffer.read(cx).to_proto();
let new_replica_id = replica_ids.len() as ReplicaId;
log::info!(
@ -1251,14 +1272,14 @@ fn test_random_collaboration(cx: &mut MutableAppContext, mut rng: StdRng) {
replica_ids.push(new_replica_id);
network.replicate(replica_id, new_replica_id);
}
50..=69 if mutation_count != 0 => {
60..=69 if mutation_count != 0 => {
buffer.update(cx, |buffer, cx| {
buffer.randomly_undo_redo(&mut rng, cx);
log::info!("buffer {} text: {:?}", buffer.replica_id(), buffer.text());
});
mutation_count -= 1;
}
70..=99 if network.has_unreceived(replica_id) => {
_ if network.has_unreceived(replica_id) => {
let ops = network
.receive(replica_id)
.into_iter()
@ -1295,15 +1316,25 @@ fn test_random_collaboration(cx: &mut MutableAppContext, mut rng: StdRng) {
}
}
let first_buffer = buffers[0].read(cx);
let first_buffer = buffers[0].read(cx).snapshot();
for buffer in &buffers[1..] {
let buffer = buffer.read(cx);
let buffer = buffer.read(cx).snapshot();
assert_eq!(
buffer.text(),
first_buffer.text(),
"Replica {} text != Replica 0 text",
buffer.replica_id()
);
assert_eq!(
buffer
.diagnostics_in_range::<_, usize>(0..buffer.len())
.collect::<Vec<_>>(),
first_buffer
.diagnostics_in_range::<_, usize>(0..first_buffer.len())
.collect::<Vec<_>>(),
"Replica {} diagnostics != Replica 0 diagnostics",
buffer.replica_id()
);
}
for buffer in &buffers {

View file

@ -35,10 +35,11 @@ pub(crate) trait LspCommand: 'static + Sized {
) -> Result<Self::Response>;
fn to_proto(&self, project_id: u64, buffer: &Buffer) -> Self::ProtoRequest;
fn from_proto(
async fn from_proto(
message: Self::ProtoRequest,
project: &mut Project,
buffer: &Buffer,
project: ModelHandle<Project>,
buffer: ModelHandle<Buffer>,
cx: AsyncAppContext,
) -> Result<Self>;
fn response_to_proto(
response: Self::Response,
@ -125,19 +126,28 @@ impl LspCommand for PrepareRename {
position: Some(language::proto::serialize_anchor(
&buffer.anchor_before(self.position),
)),
version: (&buffer.version()).into(),
}
}
fn from_proto(message: proto::PrepareRename, _: &mut Project, buffer: &Buffer) -> Result<Self> {
async fn from_proto(
message: proto::PrepareRename,
_: ModelHandle<Project>,
buffer: ModelHandle<Buffer>,
mut cx: AsyncAppContext,
) -> Result<Self> {
let position = message
.position
.and_then(deserialize_anchor)
.ok_or_else(|| anyhow!("invalid position"))?;
if !buffer.can_resolve(&position) {
Err(anyhow!("cannot resolve position"))?;
}
buffer
.update(&mut cx, |buffer, _| {
buffer.wait_for_version(message.version.into())
})
.await;
Ok(Self {
position: position.to_point_utf16(buffer),
position: buffer.read_with(&cx, |buffer, _| position.to_point_utf16(buffer)),
})
}
@ -245,19 +255,27 @@ impl LspCommand for PerformRename {
&buffer.anchor_before(self.position),
)),
new_name: self.new_name.clone(),
version: (&buffer.version()).into(),
}
}
fn from_proto(message: proto::PerformRename, _: &mut Project, buffer: &Buffer) -> Result<Self> {
async fn from_proto(
message: proto::PerformRename,
_: ModelHandle<Project>,
buffer: ModelHandle<Buffer>,
mut cx: AsyncAppContext,
) -> Result<Self> {
let position = message
.position
.and_then(deserialize_anchor)
.ok_or_else(|| anyhow!("invalid position"))?;
if !buffer.can_resolve(&position) {
Err(anyhow!("cannot resolve position"))?;
}
buffer
.update(&mut cx, |buffer, _| {
buffer.wait_for_version(message.version.into())
})
.await;
Ok(Self {
position: position.to_point_utf16(buffer),
position: buffer.read_with(&cx, |buffer, _| position.to_point_utf16(buffer)),
new_name: message.new_name,
push_to_history: false,
})
@ -389,19 +407,27 @@ impl LspCommand for GetDefinition {
position: Some(language::proto::serialize_anchor(
&buffer.anchor_before(self.position),
)),
version: (&buffer.version()).into(),
}
}
fn from_proto(message: proto::GetDefinition, _: &mut Project, buffer: &Buffer) -> Result<Self> {
async fn from_proto(
message: proto::GetDefinition,
_: ModelHandle<Project>,
buffer: ModelHandle<Buffer>,
mut cx: AsyncAppContext,
) -> Result<Self> {
let position = message
.position
.and_then(deserialize_anchor)
.ok_or_else(|| anyhow!("invalid position"))?;
if !buffer.can_resolve(&position) {
Err(anyhow!("cannot resolve position"))?;
}
buffer
.update(&mut cx, |buffer, _| {
buffer.wait_for_version(message.version.into())
})
.await;
Ok(Self {
position: position.to_point_utf16(buffer),
position: buffer.read_with(&cx, |buffer, _| position.to_point_utf16(buffer)),
})
}
@ -447,6 +473,9 @@ impl LspCommand for GetDefinition {
.end
.and_then(deserialize_anchor)
.ok_or_else(|| anyhow!("missing target end"))?;
buffer
.update(&mut cx, |buffer, _| buffer.wait_for_anchors([&start, &end]))
.await;
locations.push(Location {
buffer,
range: start..end,
@ -537,19 +566,27 @@ impl LspCommand for GetReferences {
position: Some(language::proto::serialize_anchor(
&buffer.anchor_before(self.position),
)),
version: (&buffer.version()).into(),
}
}
fn from_proto(message: proto::GetReferences, _: &mut Project, buffer: &Buffer) -> Result<Self> {
async fn from_proto(
message: proto::GetReferences,
_: ModelHandle<Project>,
buffer: ModelHandle<Buffer>,
mut cx: AsyncAppContext,
) -> Result<Self> {
let position = message
.position
.and_then(deserialize_anchor)
.ok_or_else(|| anyhow!("invalid position"))?;
if !buffer.can_resolve(&position) {
Err(anyhow!("cannot resolve position"))?;
}
buffer
.update(&mut cx, |buffer, _| {
buffer.wait_for_version(message.version.into())
})
.await;
Ok(Self {
position: position.to_point_utf16(buffer),
position: buffer.read_with(&cx, |buffer, _| position.to_point_utf16(buffer)),
})
}
@ -595,6 +632,9 @@ impl LspCommand for GetReferences {
.end
.and_then(deserialize_anchor)
.ok_or_else(|| anyhow!("missing target end"))?;
target_buffer
.update(&mut cx, |buffer, _| buffer.wait_for_anchors([&start, &end]))
.await;
locations.push(Location {
buffer: target_buffer,
range: start..end,
@ -666,23 +706,27 @@ impl LspCommand for GetDocumentHighlights {
position: Some(language::proto::serialize_anchor(
&buffer.anchor_before(self.position),
)),
version: (&buffer.version()).into(),
}
}
fn from_proto(
async fn from_proto(
message: proto::GetDocumentHighlights,
_: &mut Project,
buffer: &Buffer,
_: ModelHandle<Project>,
buffer: ModelHandle<Buffer>,
mut cx: AsyncAppContext,
) -> Result<Self> {
let position = message
.position
.and_then(deserialize_anchor)
.ok_or_else(|| anyhow!("invalid position"))?;
if !buffer.can_resolve(&position) {
Err(anyhow!("cannot resolve position"))?;
}
buffer
.update(&mut cx, |buffer, _| {
buffer.wait_for_version(message.version.into())
})
.await;
Ok(Self {
position: position.to_point_utf16(buffer),
position: buffer.read_with(&cx, |buffer, _| position.to_point_utf16(buffer)),
})
}
@ -713,33 +757,34 @@ impl LspCommand for GetDocumentHighlights {
self,
message: proto::GetDocumentHighlightsResponse,
_: ModelHandle<Project>,
_: ModelHandle<Buffer>,
_: AsyncAppContext,
buffer: ModelHandle<Buffer>,
mut cx: AsyncAppContext,
) -> Result<Vec<DocumentHighlight>> {
Ok(message
.highlights
.into_iter()
.map(|highlight| {
let start = highlight
.start
.and_then(deserialize_anchor)
.ok_or_else(|| anyhow!("missing target start"))?;
let end = highlight
.end
.and_then(deserialize_anchor)
.ok_or_else(|| anyhow!("missing target end"))?;
let kind = match proto::document_highlight::Kind::from_i32(highlight.kind) {
Some(proto::document_highlight::Kind::Text) => DocumentHighlightKind::TEXT,
Some(proto::document_highlight::Kind::Read) => DocumentHighlightKind::READ,
Some(proto::document_highlight::Kind::Write) => DocumentHighlightKind::WRITE,
None => DocumentHighlightKind::TEXT,
};
Ok(DocumentHighlight {
range: start..end,
kind,
})
})
.collect::<Result<Vec<_>>>()?)
let mut highlights = Vec::new();
for highlight in message.highlights {
let start = highlight
.start
.and_then(deserialize_anchor)
.ok_or_else(|| anyhow!("missing target start"))?;
let end = highlight
.end
.and_then(deserialize_anchor)
.ok_or_else(|| anyhow!("missing target end"))?;
buffer
.update(&mut cx, |buffer, _| buffer.wait_for_anchors([&start, &end]))
.await;
let kind = match proto::document_highlight::Kind::from_i32(highlight.kind) {
Some(proto::document_highlight::Kind::Text) => DocumentHighlightKind::TEXT,
Some(proto::document_highlight::Kind::Read) => DocumentHighlightKind::READ,
Some(proto::document_highlight::Kind::Write) => DocumentHighlightKind::WRITE,
None => DocumentHighlightKind::TEXT,
};
highlights.push(DocumentHighlight {
range: start..end,
kind,
});
}
Ok(highlights)
}
fn buffer_id_from_proto(message: &proto::GetDocumentHighlights) -> u64 {

View file

@ -8,7 +8,7 @@ use anyhow::{anyhow, Context, Result};
use client::{proto, Client, PeerId, TypedEnvelope, User, UserStore};
use clock::ReplicaId;
use collections::{hash_map, HashMap, HashSet};
use futures::{future::Shared, Future, FutureExt, StreamExt};
use futures::{future::Shared, Future, FutureExt, StreamExt, TryFutureExt};
use fuzzy::{PathMatch, PathMatchCandidate, PathMatchCandidateSet};
use gpui::{
AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext, Task,
@ -64,6 +64,8 @@ pub struct Project {
ProjectPath,
postage::watch::Receiver<Option<Result<ModelHandle<Buffer>, Arc<anyhow::Error>>>>,
>,
loading_local_worktrees:
HashMap<Arc<Path>, Shared<Task<Result<ModelHandle<Worktree>, Arc<anyhow::Error>>>>>,
opened_buffers: HashMap<u64, OpenBuffer>,
nonce: u128,
}
@ -282,6 +284,7 @@ impl Project {
opened_buffers: Default::default(),
shared_buffers: Default::default(),
loading_buffers: Default::default(),
loading_local_worktrees: Default::default(),
client_state: ProjectClientState::Local {
is_shared: false,
remote_id_tx,
@ -336,6 +339,7 @@ impl Project {
loading_buffers: Default::default(),
opened_buffer: (Rc::new(RefCell::new(opened_buffer_tx)), opened_buffer_rx),
shared_buffers: Default::default(),
loading_local_worktrees: Default::default(),
active_entry: None,
collaborators: Default::default(),
languages,
@ -398,19 +402,61 @@ impl Project {
}
#[cfg(any(test, feature = "test-support"))]
pub fn has_deferred_operations(&self, cx: &AppContext) -> bool {
self.opened_buffers.values().any(|buffer| match buffer {
OpenBuffer::Strong(buffer) => buffer.read(cx).deferred_ops_len() > 0,
OpenBuffer::Weak(buffer) => buffer
.upgrade(cx)
.map_or(false, |buffer| buffer.read(cx).deferred_ops_len() > 0),
OpenBuffer::Loading(_) => false,
})
pub fn languages(&self) -> &Arc<LanguageRegistry> {
&self.languages
}
#[cfg(any(test, feature = "test-support"))]
pub fn languages(&self) -> &Arc<LanguageRegistry> {
&self.languages
pub fn check_invariants(&self, cx: &AppContext) {
if self.is_local() {
let mut worktree_root_paths = HashMap::default();
for worktree in self.worktrees(cx) {
let worktree = worktree.read(cx);
let abs_path = worktree.as_local().unwrap().abs_path().clone();
let prev_worktree_id = worktree_root_paths.insert(abs_path.clone(), worktree.id());
assert_eq!(
prev_worktree_id,
None,
"abs path {:?} for worktree {:?} is not unique ({:?} was already registered with the same path)",
abs_path,
worktree.id(),
prev_worktree_id
)
}
} else {
let replica_id = self.replica_id();
for buffer in self.opened_buffers.values() {
if let Some(buffer) = buffer.upgrade(cx) {
let buffer = buffer.read(cx);
assert_eq!(
buffer.deferred_ops_len(),
0,
"replica {}, buffer {} has deferred operations",
replica_id,
buffer.remote_id()
);
}
}
}
}
#[cfg(any(test, feature = "test-support"))]
pub fn has_open_buffer(&self, path: impl Into<ProjectPath>, cx: &AppContext) -> bool {
let path = path.into();
if let Some(worktree) = self.worktree_for_id(path.worktree_id, cx) {
self.opened_buffers.iter().any(|(_, buffer)| {
if let Some(buffer) = buffer.upgrade(cx) {
if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
if file.worktree == worktree && file.path() == &path.path {
return true;
}
}
}
false
})
} else {
false
}
}
pub fn fs(&self) -> &Arc<dyn Fs> {
@ -479,16 +525,16 @@ impl Project {
.filter_map(move |worktree| worktree.upgrade(cx))
}
pub fn strong_worktrees<'a>(
pub fn visible_worktrees<'a>(
&'a self,
cx: &'a AppContext,
) -> impl 'a + Iterator<Item = ModelHandle<Worktree>> {
self.worktrees.iter().filter_map(|worktree| {
worktree.upgrade(cx).and_then(|worktree| {
if worktree.read(cx).is_weak() {
None
} else {
if worktree.read(cx).is_visible() {
Some(worktree)
} else {
None
}
})
})
@ -514,6 +560,7 @@ impl Project {
} = &mut this.client_state
{
*is_shared = true;
for open_buffer in this.opened_buffers.values_mut() {
match open_buffer {
OpenBuffer::Strong(_) => {}
@ -525,6 +572,18 @@ impl Project {
OpenBuffer::Loading(_) => unreachable!(),
}
}
for worktree_handle in this.worktrees.iter_mut() {
match worktree_handle {
WorktreeHandle::Strong(_) => {}
WorktreeHandle::Weak(worktree) => {
if let Some(worktree) = worktree.upgrade(cx) {
*worktree_handle = WorktreeHandle::Strong(worktree);
}
}
}
}
remote_id_rx
.borrow()
.ok_or_else(|| anyhow!("no project id"))
@ -555,7 +614,7 @@ impl Project {
pub fn unshare(&self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
let rpc = self.client.clone();
cx.spawn(|this, mut cx| async move {
let project_id = this.update(&mut cx, |this, _| {
let project_id = this.update(&mut cx, |this, cx| {
if let ProjectClientState::Local {
is_shared,
remote_id_rx,
@ -563,15 +622,27 @@ impl Project {
} = &mut this.client_state
{
*is_shared = false;
for open_buffer in this.opened_buffers.values_mut() {
match open_buffer {
OpenBuffer::Strong(buffer) => {
*open_buffer = OpenBuffer::Weak(buffer.downgrade());
}
OpenBuffer::Weak(_) => {}
OpenBuffer::Loading(_) => unreachable!(),
_ => {}
}
}
for worktree_handle in this.worktrees.iter_mut() {
match worktree_handle {
WorktreeHandle::Strong(worktree) => {
if !worktree.read(cx).is_visible() {
*worktree_handle = WorktreeHandle::Weak(worktree.downgrade());
}
}
_ => {}
}
}
remote_id_rx
.borrow()
.ok_or_else(|| anyhow!("no project id"))
@ -743,7 +814,7 @@ impl Project {
} else {
let worktree = this
.update(&mut cx, |this, cx| {
this.create_local_worktree(&abs_path, true, cx)
this.create_local_worktree(&abs_path, false, cx)
})
.await?;
this.update(&mut cx, |this, cx| {
@ -763,12 +834,12 @@ impl Project {
}
pub fn save_buffer_as(
&self,
&mut self,
buffer: ModelHandle<Buffer>,
abs_path: PathBuf,
cx: &mut ModelContext<Project>,
) -> Task<Result<()>> {
let worktree_task = self.find_or_create_local_worktree(&abs_path, false, cx);
let worktree_task = self.find_or_create_local_worktree(&abs_path, true, cx);
cx.spawn(|this, mut cx| async move {
let (worktree, path) = worktree_task.await?;
worktree
@ -786,25 +857,6 @@ impl Project {
})
}
#[cfg(any(test, feature = "test-support"))]
pub fn has_open_buffer(&self, path: impl Into<ProjectPath>, cx: &AppContext) -> bool {
let path = path.into();
if let Some(worktree) = self.worktree_for_id(path.worktree_id, cx) {
self.opened_buffers.iter().any(|(_, buffer)| {
if let Some(buffer) = buffer.upgrade(cx) {
if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
if file.worktree == worktree && file.path() == &path.path {
return true;
}
}
}
false
})
} else {
false
}
}
pub fn get_open_buffer(
&mut self,
path: &ProjectPath,
@ -1166,6 +1218,10 @@ impl Project {
let (worktree, relative_path) = self
.find_local_worktree(&abs_path, cx)
.ok_or_else(|| anyhow!("no worktree found for diagnostics"))?;
if !worktree.read(cx).is_visible() {
return Ok(());
}
let project_path = ProjectPath {
worktree_id: worktree.read(cx).id(),
path: relative_path.into(),
@ -1774,6 +1830,7 @@ impl Project {
})
} else if let Some(project_id) = self.remote_id() {
let rpc = self.client.clone();
let version = buffer.version();
cx.spawn_weak(|_, mut cx| async move {
let response = rpc
.request(proto::GetCodeActions {
@ -1781,6 +1838,7 @@ impl Project {
buffer_id,
start: Some(language::proto::serialize_anchor(&range.start)),
end: Some(language::proto::serialize_anchor(&range.end)),
version: (&version).into(),
})
.await?;
@ -2051,7 +2109,7 @@ impl Project {
) -> Task<Result<HashMap<ModelHandle<Buffer>, Vec<Range<Anchor>>>>> {
if self.is_local() {
let snapshots = self
.strong_worktrees(cx)
.visible_worktrees(cx)
.filter_map(|tree| {
let tree = tree.read(cx).as_local()?;
Some(tree.snapshot())
@ -2295,16 +2353,16 @@ impl Project {
}
pub fn find_or_create_local_worktree(
&self,
&mut self,
abs_path: impl AsRef<Path>,
weak: bool,
visible: bool,
cx: &mut ModelContext<Self>,
) -> Task<Result<(ModelHandle<Worktree>, PathBuf)>> {
let abs_path = abs_path.as_ref();
if let Some((tree, relative_path)) = self.find_local_worktree(abs_path, cx) {
Task::ready(Ok((tree.clone(), relative_path.into())))
} else {
let worktree = self.create_local_worktree(abs_path, weak, cx);
let worktree = self.create_local_worktree(abs_path, visible, cx);
cx.foreground()
.spawn(async move { Ok((worktree.await?, PathBuf::new())) })
}
@ -2335,38 +2393,62 @@ impl Project {
}
fn create_local_worktree(
&self,
&mut self,
abs_path: impl AsRef<Path>,
weak: bool,
visible: bool,
cx: &mut ModelContext<Self>,
) -> Task<Result<ModelHandle<Worktree>>> {
let fs = self.fs.clone();
let client = self.client.clone();
let path = Arc::from(abs_path.as_ref());
cx.spawn(|project, mut cx| async move {
let worktree = Worktree::local(client.clone(), path, weak, fs, &mut cx).await?;
let path: Arc<Path> = abs_path.as_ref().into();
let task = self
.loading_local_worktrees
.entry(path.clone())
.or_insert_with(|| {
cx.spawn(|project, mut cx| {
async move {
let worktree =
Worktree::local(client.clone(), path.clone(), visible, fs, &mut cx)
.await;
project.update(&mut cx, |project, _| {
project.loading_local_worktrees.remove(&path);
});
let worktree = worktree?;
let (remote_project_id, is_shared) = project.update(&mut cx, |project, cx| {
project.add_worktree(&worktree, cx);
(project.remote_id(), project.is_shared())
});
let (remote_project_id, is_shared) =
project.update(&mut cx, |project, cx| {
project.add_worktree(&worktree, cx);
(project.remote_id(), project.is_shared())
});
if let Some(project_id) = remote_project_id {
worktree
.update(&mut cx, |worktree, cx| {
worktree.as_local_mut().unwrap().register(project_id, cx)
})
.await?;
if is_shared {
worktree
.update(&mut cx, |worktree, cx| {
worktree.as_local_mut().unwrap().share(project_id, cx)
})
.await?;
}
if let Some(project_id) = remote_project_id {
if is_shared {
worktree
.update(&mut cx, |worktree, cx| {
worktree.as_local_mut().unwrap().share(project_id, cx)
})
.await?;
} else {
worktree
.update(&mut cx, |worktree, cx| {
worktree.as_local_mut().unwrap().register(project_id, cx)
})
.await?;
}
}
Ok(worktree)
}
.map_err(|err| Arc::new(err))
})
.shared()
})
.clone();
cx.foreground().spawn(async move {
match task.await {
Ok(worktree) => Ok(worktree),
Err(err) => Err(anyhow!("{}", err)),
}
Ok(worktree)
})
}
@ -2388,11 +2470,14 @@ impl Project {
.detach();
}
let push_weak_handle = {
let push_strong_handle = {
let worktree = worktree.read(cx);
worktree.is_local() && worktree.is_weak()
self.is_shared() || worktree.is_visible() || worktree.is_remote()
};
if push_weak_handle {
if push_strong_handle {
self.worktrees
.push(WorktreeHandle::Strong(worktree.clone()));
} else {
cx.observe_release(&worktree, |this, cx| {
this.worktrees
.retain(|worktree| worktree.upgrade(cx).is_some());
@ -2401,9 +2486,6 @@ impl Project {
.detach();
self.worktrees
.push(WorktreeHandle::Weak(worktree.downgrade()));
} else {
self.worktrees
.push(WorktreeHandle::Strong(worktree.clone()));
}
cx.notify();
}
@ -2623,7 +2705,7 @@ impl Project {
root_name: envelope.payload.root_name,
entries: Default::default(),
diagnostic_summaries: Default::default(),
weak: envelope.payload.weak,
visible: envelope.payload.visible,
};
let (worktree, load_task) =
Worktree::remote(remote_id, replica_id, worktree, client, cx);
@ -2731,7 +2813,7 @@ impl Project {
buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx))?;
}
OpenBuffer::Loading(operations) => operations.extend_from_slice(&ops),
_ => unreachable!(),
OpenBuffer::Weak(_) => {}
},
hash_map::Entry::Vacant(e) => {
e.insert(OpenBuffer::Loading(ops));
@ -2785,13 +2867,11 @@ impl Project {
.ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))?;
Ok::<_, anyhow::Error>((project_id, buffer))
})?;
if !buffer
.read_with(&cx, |buffer, _| buffer.version())
.observed_all(&requested_version)
{
Err(anyhow!("save request depends on unreceived edits"))?;
}
buffer
.update(&mut cx, |buffer, _| {
buffer.wait_for_version(requested_version)
})
.await;
let (saved_version, mtime) = buffer.update(&mut cx, |buffer, cx| buffer.save(cx)).await?;
Ok(proto::BufferSaved {
@ -2849,12 +2929,9 @@ impl Project {
.map(|buffer| buffer.upgrade(cx).unwrap())
.ok_or_else(|| anyhow!("unknown buffer id {}", envelope.payload.buffer_id))
})?;
if !buffer
.read_with(&cx, |buffer, _| buffer.version())
.observed_all(&version)
{
Err(anyhow!("completion request depends on unreceived edits"))?;
}
buffer
.update(&mut cx, |buffer, _| buffer.wait_for_version(version))
.await;
let version = buffer.read_with(&cx, |buffer, _| buffer.version());
let completions = this
.update(&mut cx, |this, cx| this.completions(&buffer, position, cx))
@ -2924,10 +3001,13 @@ impl Project {
.map(|buffer| buffer.upgrade(cx).unwrap())
.ok_or_else(|| anyhow!("unknown buffer id {}", envelope.payload.buffer_id))
})?;
buffer
.update(&mut cx, |buffer, _| {
buffer.wait_for_version(envelope.payload.version.into())
})
.await;
let version = buffer.read_with(&cx, |buffer, _| buffer.version());
if !version.observed(start.timestamp) || !version.observed(end.timestamp) {
Err(anyhow!("code action request references unreceived edits"))?;
}
let code_actions = this.update(&mut cx, |this, cx| {
Ok::<_, anyhow::Error>(this.code_actions(&buffer, start..end, cx))
})?;
@ -2983,19 +3063,26 @@ impl Project {
<T::LspRequest as lsp::request::Request>::Result: Send,
{
let sender_id = envelope.original_sender_id()?;
let (request, buffer_version) = this.update(&mut cx, |this, cx| {
let buffer_id = T::buffer_id_from_proto(&envelope.payload);
let buffer_handle = this
.opened_buffers
let buffer_id = T::buffer_id_from_proto(&envelope.payload);
let buffer_handle = this.read_with(&cx, |this, _| {
this.opened_buffers
.get(&buffer_id)
.map(|buffer| buffer.upgrade(cx).unwrap())
.ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))?;
let buffer = buffer_handle.read(cx);
let buffer_version = buffer.version();
let request = T::from_proto(envelope.payload, this, buffer)?;
Ok::<_, anyhow::Error>((this.request_lsp(buffer_handle, request, cx), buffer_version))
.map(|buffer| buffer.upgrade(&cx).unwrap())
.ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))
})?;
let response = request.await?;
let request = T::from_proto(
envelope.payload,
this.clone(),
buffer_handle.clone(),
cx.clone(),
)
.await?;
let buffer_version = buffer_handle.read_with(&cx, |buffer, _| buffer.version());
let response = this
.update(&mut cx, |this, cx| {
this.request_lsp(buffer_handle, request, cx)
})
.await?;
this.update(&mut cx, |this, cx| {
Ok(T::response_to_proto(
response,
@ -3357,7 +3444,7 @@ impl Project {
) -> impl 'a + Future<Output = Vec<PathMatch>> {
let worktrees = self
.worktrees(cx)
.filter(|worktree| !worktree.read(cx).is_weak())
.filter(|worktree| worktree.read(cx).is_visible())
.collect::<Vec<_>>();
let include_root_name = worktrees.len() > 1;
let candidate_sets = worktrees
@ -3652,7 +3739,7 @@ mod tests {
let (tree, _) = project
.update(cx, |project, cx| {
project.find_or_create_local_worktree(&root_link_path, false, cx)
project.find_or_create_local_worktree(&root_link_path, true, cx)
})
.await
.unwrap();
@ -3721,7 +3808,7 @@ mod tests {
let (tree, _) = project
.update(cx, |project, cx| {
project.find_or_create_local_worktree("/dir", false, cx)
project.find_or_create_local_worktree("/dir", true, cx)
})
.await
.unwrap();
@ -3819,7 +3906,7 @@ mod tests {
let project = Project::test(Arc::new(RealFs), cx);
let (tree, _) = project
.update(cx, |project, cx| {
project.find_or_create_local_worktree(&dir.path(), false, cx)
project.find_or_create_local_worktree(&dir.path(), true, cx)
})
.await
.unwrap();
@ -3867,7 +3954,7 @@ mod tests {
let (tree, _) = project
.update(cx, |project, cx| {
project.find_or_create_local_worktree("/dir/b.rs", false, cx)
project.find_or_create_local_worktree("/dir/b.rs", true, cx)
})
.await
.unwrap();
@ -3924,16 +4011,13 @@ mod tests {
assert_eq!(definition.range.to_offset(target_buffer), 9..10);
assert_eq!(
list_worktrees(&project, cx),
[("/dir/b.rs".as_ref(), false), ("/dir/a.rs".as_ref(), true)]
[("/dir/b.rs".as_ref(), true), ("/dir/a.rs".as_ref(), false)]
);
drop(definition);
});
cx.read(|cx| {
assert_eq!(
list_worktrees(&project, cx),
[("/dir/b.rs".as_ref(), false)]
);
assert_eq!(list_worktrees(&project, cx), [("/dir/b.rs".as_ref(), true)]);
});
fn list_worktrees<'a>(
@ -3947,7 +4031,7 @@ mod tests {
let worktree = worktree.read(cx);
(
worktree.as_local().unwrap().abs_path().as_ref(),
worktree.is_weak(),
worktree.is_visible(),
)
})
.collect::<Vec<_>>()
@ -3968,7 +4052,7 @@ mod tests {
let project = Project::test(fs.clone(), cx);
let worktree_id = project
.update(cx, |p, cx| {
p.find_or_create_local_worktree("/dir", false, cx)
p.find_or_create_local_worktree("/dir", true, cx)
})
.await
.unwrap()
@ -4006,7 +4090,7 @@ mod tests {
let project = Project::test(fs.clone(), cx);
let worktree_id = project
.update(cx, |p, cx| {
p.find_or_create_local_worktree("/dir/file1", false, cx)
p.find_or_create_local_worktree("/dir/file1", true, cx)
})
.await
.unwrap()
@ -4050,7 +4134,7 @@ mod tests {
let (tree, _) = project
.update(cx, |p, cx| {
p.find_or_create_local_worktree(dir.path(), false, cx)
p.find_or_create_local_worktree(dir.path(), true, cx)
})
.await
.unwrap();
@ -4087,7 +4171,7 @@ mod tests {
Worktree::remote(
1,
1,
initial_snapshot.to_proto(&Default::default(), Default::default()),
initial_snapshot.to_proto(&Default::default(), true),
rpc.clone(),
cx,
)
@ -4196,7 +4280,7 @@ mod tests {
let project = Project::test(fs.clone(), cx);
let worktree_id = project
.update(cx, |p, cx| {
p.find_or_create_local_worktree("/the-dir", false, cx)
p.find_or_create_local_worktree("/the-dir", true, cx)
})
.await
.unwrap()
@ -4246,7 +4330,7 @@ mod tests {
let project = Project::test(Arc::new(RealFs), cx);
let (worktree, _) = project
.update(cx, |p, cx| {
p.find_or_create_local_worktree(dir.path(), false, cx)
p.find_or_create_local_worktree(dir.path(), true, cx)
})
.await
.unwrap();
@ -4380,7 +4464,7 @@ mod tests {
let project = Project::test(Arc::new(RealFs), cx);
let (worktree, _) = project
.update(cx, |p, cx| {
p.find_or_create_local_worktree(dir.path(), false, cx)
p.find_or_create_local_worktree(dir.path(), true, cx)
})
.await
.unwrap();
@ -4489,7 +4573,7 @@ mod tests {
let project = Project::test(fs.clone(), cx);
let (worktree, _) = project
.update(cx, |p, cx| {
p.find_or_create_local_worktree("/the-dir", false, cx)
p.find_or_create_local_worktree("/the-dir", true, cx)
})
.await
.unwrap();
@ -4757,7 +4841,7 @@ mod tests {
let (tree, _) = project
.update(cx, |project, cx| {
project.find_or_create_local_worktree("/dir", false, cx)
project.find_or_create_local_worktree("/dir", true, cx)
})
.await
.unwrap();
@ -4885,7 +4969,7 @@ mod tests {
let project = Project::test(fs.clone(), cx);
let (tree, _) = project
.update(cx, |project, cx| {
project.find_or_create_local_worktree("/dir", false, cx)
project.find_or_create_local_worktree("/dir", true, cx)
})
.await
.unwrap();

View file

@ -71,7 +71,7 @@ pub struct LocalWorktree {
queued_operations: Vec<(u64, Operation)>,
client: Arc<Client>,
fs: Arc<dyn Fs>,
weak: bool,
visible: bool,
}
pub struct RemoteWorktree {
@ -83,7 +83,7 @@ pub struct RemoteWorktree {
replica_id: ReplicaId,
queued_operations: Vec<(u64, Operation)>,
diagnostic_summaries: TreeMap<PathKey, DiagnosticSummary>,
weak: bool,
visible: bool,
}
#[derive(Clone)]
@ -169,11 +169,12 @@ impl Worktree {
pub async fn local(
client: Arc<Client>,
path: impl Into<Arc<Path>>,
weak: bool,
visible: bool,
fs: Arc<dyn Fs>,
cx: &mut AsyncAppContext,
) -> Result<ModelHandle<Self>> {
let (tree, scan_states_tx) = LocalWorktree::new(client, path, weak, fs.clone(), cx).await?;
let (tree, scan_states_tx) =
LocalWorktree::new(client, path, visible, fs.clone(), cx).await?;
tree.update(cx, |tree, cx| {
let tree = tree.as_local_mut().unwrap();
let abs_path = tree.abs_path().clone();
@ -203,7 +204,7 @@ impl Worktree {
.map(|c| c.to_ascii_lowercase())
.collect();
let root_name = worktree.root_name.clone();
let weak = worktree.weak;
let visible = worktree.visible;
let snapshot = Snapshot {
id: WorktreeId(remote_id as usize),
root_name,
@ -236,7 +237,7 @@ impl Worktree {
)
}),
),
weak,
visible,
})
});
@ -345,6 +346,10 @@ impl Worktree {
matches!(self, Worktree::Local(_))
}
pub fn is_remote(&self) -> bool {
!self.is_local()
}
pub fn snapshot(&self) -> Snapshot {
match self {
Worktree::Local(worktree) => worktree.snapshot().snapshot,
@ -352,10 +357,10 @@ impl Worktree {
}
}
pub fn is_weak(&self) -> bool {
pub fn is_visible(&self) -> bool {
match self {
Worktree::Local(worktree) => worktree.weak,
Worktree::Remote(worktree) => worktree.weak,
Worktree::Local(worktree) => worktree.visible,
Worktree::Remote(worktree) => worktree.visible,
}
}
@ -454,7 +459,7 @@ impl LocalWorktree {
async fn new(
client: Arc<Client>,
path: impl Into<Arc<Path>>,
weak: bool,
visible: bool,
fs: Arc<dyn Fs>,
cx: &mut AsyncAppContext,
) -> Result<(ModelHandle<Worktree>, UnboundedSender<ScanState>)> {
@ -521,7 +526,7 @@ impl LocalWorktree {
queued_operations: Default::default(),
client,
fs,
weak,
visible,
};
cx.spawn_weak(|this, mut cx| async move {
@ -734,10 +739,11 @@ impl LocalWorktree {
worktree_id: self.id().to_proto(),
root_name: self.root_name().to_string(),
authorized_logins: self.authorized_logins(),
weak: self.weak,
visible: self.visible,
};
let request = client.request(register_message);
cx.spawn(|this, mut cx| async move {
let response = client.request(register_message).await;
let response = request.await;
this.update(&mut cx, |this, _| {
let worktree = this.as_local_mut().unwrap();
match response {
@ -754,45 +760,49 @@ impl LocalWorktree {
})
}
pub fn share(
&mut self,
project_id: u64,
cx: &mut ModelContext<Worktree>,
) -> impl Future<Output = Result<()>> {
pub fn share(&mut self, project_id: u64, cx: &mut ModelContext<Worktree>) -> Task<Result<()>> {
let register = self.register(project_id, cx);
let (mut share_tx, mut share_rx) = oneshot::channel();
let (snapshots_to_send_tx, snapshots_to_send_rx) =
smol::channel::unbounded::<LocalSnapshot>();
if self.share.is_some() {
let _ = share_tx.try_send(Ok(()));
} else {
let snapshot = self.snapshot();
let rpc = self.client.clone();
let worktree_id = cx.model_id() as u64;
let (snapshots_to_send_tx, snapshots_to_send_rx) =
smol::channel::unbounded::<LocalSnapshot>();
let maintain_remote_snapshot = cx.background().spawn({
let rpc = rpc.clone();
let snapshot = snapshot.clone();
let diagnostic_summaries = self.diagnostic_summaries.clone();
async move {
if let Err(error) = rpc
.request(proto::UpdateWorktree {
project_id,
worktree_id,
root_name: snapshot.root_name().to_string(),
updated_entries: snapshot
.entries_by_path
.iter()
.filter(|e| !e.is_ignored)
.map(Into::into)
.collect(),
removed_entries: Default::default(),
})
.await
{
let _ = share_tx.try_send(Err(error));
return Err(anyhow!("failed to send initial update worktree"));
} else {
let _ = share_tx.try_send(Ok(()));
}
let mut prev_snapshot = match snapshots_to_send_rx.recv().await {
Ok(snapshot) => {
if let Err(error) = rpc
.request(proto::UpdateWorktree {
project_id,
worktree_id,
root_name: snapshot.root_name().to_string(),
updated_entries: snapshot
.entries_by_path
.iter()
.filter(|e| !e.is_ignored)
.map(Into::into)
.collect(),
removed_entries: Default::default(),
})
.await
{
let _ = share_tx.try_send(Err(error));
return Err(anyhow!("failed to send initial update worktree"));
} else {
let _ = share_tx.try_send(Ok(()));
snapshot
}
}
Err(error) => {
let _ = share_tx.try_send(Err(error.into()));
return Err(anyhow!("failed to send initial update worktree"));
}
};
for (path, summary) in diagnostic_summaries.iter() {
rpc.send(proto::UpdateDiagnosticSummary {
@ -802,7 +812,6 @@ impl LocalWorktree {
})?;
}
let mut prev_snapshot = snapshot;
while let Ok(snapshot) = snapshots_to_send_rx.recv().await {
let message =
snapshot.build_update(&prev_snapshot, project_id, worktree_id, false);
@ -816,17 +825,24 @@ impl LocalWorktree {
});
self.share = Some(ShareState {
project_id,
snapshots_tx: snapshots_to_send_tx,
snapshots_tx: snapshots_to_send_tx.clone(),
_maintain_remote_snapshot: Some(maintain_remote_snapshot),
});
}
async move {
cx.spawn_weak(|this, cx| async move {
register.await?;
if let Some(this) = this.upgrade(&cx) {
this.read_with(&cx, |this, _| {
let this = this.as_local().unwrap();
let _ = snapshots_to_send_tx.try_send(this.snapshot());
});
}
share_rx
.next()
.await
.unwrap_or_else(|| Err(anyhow!("share ended")))
}
})
}
pub fn unshare(&mut self) {
@ -1024,7 +1040,7 @@ impl LocalSnapshot {
pub(crate) fn to_proto(
&self,
diagnostic_summaries: &TreeMap<PathKey, DiagnosticSummary>,
weak: bool,
visible: bool,
) -> proto::Worktree {
let root_name = self.root_name.clone();
proto::Worktree {
@ -1040,7 +1056,7 @@ impl LocalSnapshot {
.iter()
.map(|(path, summary)| summary.to_proto(&path.0))
.collect(),
weak,
visible,
}
}
@ -2464,7 +2480,7 @@ mod tests {
let tree = Worktree::local(
client,
Arc::from(Path::new("/root")),
false,
true,
fs,
&mut cx.to_async(),
)
@ -2507,7 +2523,7 @@ mod tests {
let tree = Worktree::local(
client,
dir.path(),
false,
true,
Arc::new(RealFs),
&mut cx.to_async(),
)

View file

@ -327,7 +327,7 @@ impl ProjectPanel {
.project
.read(cx)
.worktrees(cx)
.filter(|worktree| !worktree.read(cx).is_weak());
.filter(|worktree| worktree.read(cx).is_visible());
self.visible_entries.clear();
let mut entry_ix = 0;
@ -642,7 +642,7 @@ mod tests {
});
let (root1, _) = project
.update(cx, |project, cx| {
project.find_or_create_local_worktree("/root1", false, cx)
project.find_or_create_local_worktree("/root1", true, cx)
})
.await
.unwrap();
@ -651,7 +651,7 @@ mod tests {
.await;
let (root2, _) = project
.update(cx, |project, cx| {
project.find_or_create_local_worktree("/root2", false, cx)
project.find_or_create_local_worktree("/root2", true, cx)
})
.await
.unwrap();

View file

@ -270,7 +270,7 @@ impl ProjectSymbolsView {
range.end = cmp::min(range.end, view.matches.len());
let show_worktree_root_name =
view.project.read(cx).strong_worktrees(cx).count() > 1;
view.project.read(cx).visible_worktrees(cx).count() > 1;
items.extend(view.matches[range].iter().enumerate().map(move |(ix, m)| {
view.render_match(m, start + ix, show_worktree_root_name, cx)
}));

View file

@ -134,7 +134,7 @@ message RegisterWorktree {
uint64 worktree_id = 2;
string root_name = 3;
repeated string authorized_logins = 4;
bool weak = 5;
bool visible = 5;
}
message UnregisterWorktree {
@ -164,6 +164,7 @@ message GetDefinition {
uint64 project_id = 1;
uint64 buffer_id = 2;
Anchor position = 3;
repeated VectorClockEntry version = 4;
}
message GetDefinitionResponse {
@ -174,6 +175,7 @@ message GetReferences {
uint64 project_id = 1;
uint64 buffer_id = 2;
Anchor position = 3;
repeated VectorClockEntry version = 4;
}
message GetReferencesResponse {
@ -184,6 +186,7 @@ message GetDocumentHighlights {
uint64 project_id = 1;
uint64 buffer_id = 2;
Anchor position = 3;
repeated VectorClockEntry version = 4;
}
message GetDocumentHighlightsResponse {
@ -328,6 +331,7 @@ message GetCodeActions {
uint64 buffer_id = 2;
Anchor start = 3;
Anchor end = 4;
repeated VectorClockEntry version = 5;
}
message GetCodeActionsResponse {
@ -349,6 +353,7 @@ message PrepareRename {
uint64 project_id = 1;
uint64 buffer_id = 2;
Anchor position = 3;
repeated VectorClockEntry version = 4;
}
message PrepareRenameResponse {
@ -363,6 +368,7 @@ message PerformRename {
uint64 buffer_id = 2;
Anchor position = 3;
string new_name = 4;
repeated VectorClockEntry version = 5;
}
message PerformRenameResponse {
@ -502,7 +508,7 @@ message Worktree {
string root_name = 2;
repeated Entry entries = 3;
repeated DiagnosticSummary diagnostic_summaries = 4;
bool weak = 5;
bool visible = 5;
}
message File {
@ -536,7 +542,8 @@ message BufferState {
repeated Operation operations = 4;
repeated SelectionSet selections = 5;
repeated Diagnostic diagnostics = 6;
repeated string completion_triggers = 7;
uint32 diagnostics_timestamp = 7;
repeated string completion_triggers = 8;
}
message BufferFragment {
@ -623,9 +630,10 @@ message Operation {
uint32 replica_id = 1;
uint32 local_timestamp = 2;
uint32 lamport_timestamp = 3;
repeated Range ranges = 4;
repeated VectorClockEntry version = 5;
repeated UndoCount counts = 6;
repeated VectorClockEntry version = 4;
repeated Range transaction_ranges = 5;
repeated VectorClockEntry transaction_version = 6;
repeated UndoCount counts = 7;
}
message UpdateSelections {

View file

@ -139,10 +139,10 @@ macro_rules! entity_messages {
messages!(
(Ack, Foreground),
(AddProjectCollaborator, Foreground),
(ApplyCodeAction, Foreground),
(ApplyCodeActionResponse, Foreground),
(ApplyCompletionAdditionalEdits, Foreground),
(ApplyCompletionAdditionalEditsResponse, Foreground),
(ApplyCodeAction, Background),
(ApplyCodeActionResponse, Background),
(ApplyCompletionAdditionalEdits, Background),
(ApplyCompletionAdditionalEditsResponse, Background),
(BufferReloaded, Foreground),
(BufferSaved, Foreground),
(ChannelMessageSent, Foreground),
@ -157,15 +157,15 @@ messages!(
(GetChannels, Foreground),
(GetChannelsResponse, Foreground),
(GetCodeActions, Background),
(GetCodeActionsResponse, Foreground),
(GetCodeActionsResponse, Background),
(GetCompletions, Background),
(GetCompletionsResponse, Foreground),
(GetDefinition, Foreground),
(GetDefinitionResponse, Foreground),
(GetCompletionsResponse, Background),
(GetDefinition, Background),
(GetDefinitionResponse, Background),
(GetDocumentHighlights, Background),
(GetDocumentHighlightsResponse, Background),
(GetReferences, Foreground),
(GetReferencesResponse, Foreground),
(GetReferences, Background),
(GetReferencesResponse, Background),
(GetProjectSymbols, Background),
(GetProjectSymbolsResponse, Background),
(GetUsers, Foreground),
@ -176,10 +176,10 @@ messages!(
(JoinProjectResponse, Foreground),
(LeaveChannel, Foreground),
(LeaveProject, Foreground),
(OpenBuffer, Foreground),
(OpenBufferForSymbol, Foreground),
(OpenBufferForSymbolResponse, Foreground),
(OpenBufferResponse, Foreground),
(OpenBuffer, Background),
(OpenBufferForSymbol, Background),
(OpenBufferForSymbolResponse, Background),
(OpenBufferResponse, Background),
(PerformRename, Background),
(PerformRenameResponse, Background),
(PrepareRename, Background),
@ -190,8 +190,8 @@ messages!(
(RegisterWorktree, Foreground),
(RemoveProjectCollaborator, Foreground),
(SaveBuffer, Foreground),
(SearchProject, Foreground),
(SearchProjectResponse, Foreground),
(SearchProject, Background),
(SearchProjectResponse, Background),
(SendChannelMessage, Foreground),
(SendChannelMessageResponse, Foreground),
(ShareProject, Foreground),
@ -199,7 +199,7 @@ messages!(
(UnregisterProject, Foreground),
(UnregisterWorktree, Foreground),
(UnshareProject, Foreground),
(UpdateBuffer, Foreground),
(UpdateBuffer, Background),
(UpdateBufferFile, Foreground),
(UpdateContacts, Foreground),
(UpdateDiagnosticSummary, Foreground),

View file

@ -5,4 +5,4 @@ pub mod proto;
pub use conn::Connection;
pub use peer::*;
pub const PROTOCOL_VERSION: u32 = 8;
pub const PROTOCOL_VERSION: u32 = 9;

View file

@ -735,7 +735,7 @@ mod tests {
let project = Project::test(fs.clone(), cx);
let (tree, _) = project
.update(cx, |project, cx| {
project.find_or_create_local_worktree("/dir", false, cx)
project.find_or_create_local_worktree("/dir", true, cx)
})
.await
.unwrap();

View file

@ -351,7 +351,7 @@ impl Server {
.values()
.cloned()
.collect(),
weak: worktree.weak,
visible: worktree.visible,
})
})
.collect();
@ -440,7 +440,7 @@ impl Server {
Worktree {
authorized_user_ids: contact_user_ids.clone(),
root_name: request.payload.root_name.clone(),
weak: request.payload.weak,
visible: request.payload.visible,
},
)?;
}
@ -1070,7 +1070,7 @@ mod tests {
});
let (worktree_a, _) = project_a
.update(cx_a, |p, cx| {
p.find_or_create_local_worktree("/a", false, cx)
p.find_or_create_local_worktree("/a", true, cx)
})
.await
.unwrap();
@ -1202,7 +1202,7 @@ mod tests {
});
let (worktree_a, _) = project_a
.update(cx_a, |p, cx| {
p.find_or_create_local_worktree("/a", false, cx)
p.find_or_create_local_worktree("/a", true, cx)
})
.await
.unwrap();
@ -1303,7 +1303,7 @@ mod tests {
});
let (worktree_a, _) = project_a
.update(cx_a, |p, cx| {
p.find_or_create_local_worktree("/a", false, cx)
p.find_or_create_local_worktree("/a", true, cx)
})
.await
.unwrap();
@ -1475,7 +1475,7 @@ mod tests {
});
let (worktree_a, _) = project_a
.update(cx_a, |p, cx| {
p.find_or_create_local_worktree("/dir", false, cx)
p.find_or_create_local_worktree("/dir", true, cx)
})
.await
.unwrap();
@ -1557,7 +1557,7 @@ mod tests {
});
let (worktree_a, _) = project_a
.update(cx_a, |p, cx| {
p.find_or_create_local_worktree("/dir", false, cx)
p.find_or_create_local_worktree("/dir", true, cx)
})
.await
.unwrap();
@ -1638,7 +1638,7 @@ mod tests {
});
let (worktree_a, _) = project_a
.update(cx_a, |p, cx| {
p.find_or_create_local_worktree("/dir", false, cx)
p.find_or_create_local_worktree("/dir", true, cx)
})
.await
.unwrap();
@ -1717,7 +1717,7 @@ mod tests {
});
let (worktree_a, _) = project_a
.update(cx_a, |p, cx| {
p.find_or_create_local_worktree("/dir", false, cx)
p.find_or_create_local_worktree("/dir", true, cx)
})
.await
.unwrap();
@ -1790,7 +1790,7 @@ mod tests {
});
let (worktree_a, _) = project_a
.update(cx_a, |p, cx| {
p.find_or_create_local_worktree("/a", false, cx)
p.find_or_create_local_worktree("/a", true, cx)
})
.await
.unwrap();
@ -1878,7 +1878,7 @@ mod tests {
});
let (worktree_a, _) = project_a
.update(cx_a, |p, cx| {
p.find_or_create_local_worktree("/a", false, cx)
p.find_or_create_local_worktree("/a", true, cx)
})
.await
.unwrap();
@ -2104,7 +2104,7 @@ mod tests {
});
let (worktree_a, _) = project_a
.update(cx_a, |p, cx| {
p.find_or_create_local_worktree("/a", false, cx)
p.find_or_create_local_worktree("/a", true, cx)
})
.await
.unwrap();
@ -2303,7 +2303,7 @@ mod tests {
});
let (worktree_a, _) = project_a
.update(cx_a, |p, cx| {
p.find_or_create_local_worktree("/a", false, cx)
p.find_or_create_local_worktree("/a", true, cx)
})
.await
.unwrap();
@ -2409,7 +2409,7 @@ mod tests {
});
let (worktree_a, _) = project_a
.update(cx_a, |p, cx| {
p.find_or_create_local_worktree("/root-1", false, cx)
p.find_or_create_local_worktree("/root-1", true, cx)
})
.await
.unwrap();
@ -2545,7 +2545,7 @@ mod tests {
});
let (worktree_a, _) = project_a
.update(cx_a, |p, cx| {
p.find_or_create_local_worktree("/root-1", false, cx)
p.find_or_create_local_worktree("/root-1", true, cx)
})
.await
.unwrap();
@ -2666,7 +2666,7 @@ mod tests {
let (worktree_1, _) = project_a
.update(cx_a, |p, cx| {
p.find_or_create_local_worktree("/root-1", false, cx)
p.find_or_create_local_worktree("/root-1", true, cx)
})
.await
.unwrap();
@ -2675,7 +2675,7 @@ mod tests {
.await;
let (worktree_2, _) = project_a
.update(cx_a, |p, cx| {
p.find_or_create_local_worktree("/root-2", false, cx)
p.find_or_create_local_worktree("/root-2", true, cx)
})
.await
.unwrap();
@ -2775,7 +2775,7 @@ mod tests {
});
let (worktree_a, _) = project_a
.update(cx_a, |p, cx| {
p.find_or_create_local_worktree("/root-1", false, cx)
p.find_or_create_local_worktree("/root-1", true, cx)
})
.await
.unwrap();
@ -2921,7 +2921,7 @@ mod tests {
});
let (worktree_a, _) = project_a
.update(cx_a, |p, cx| {
p.find_or_create_local_worktree("/code/crate-1", false, cx)
p.find_or_create_local_worktree("/code/crate-1", true, cx)
})
.await
.unwrap();
@ -3051,7 +3051,7 @@ mod tests {
let (worktree_a, _) = project_a
.update(cx_a, |p, cx| {
p.find_or_create_local_worktree("/root", false, cx)
p.find_or_create_local_worktree("/root", true, cx)
})
.await
.unwrap();
@ -3155,7 +3155,7 @@ mod tests {
});
let (worktree_a, _) = project_a
.update(cx_a, |p, cx| {
p.find_or_create_local_worktree("/a", false, cx)
p.find_or_create_local_worktree("/a", true, cx)
})
.await
.unwrap();
@ -3391,7 +3391,7 @@ mod tests {
});
let (worktree_a, _) = project_a
.update(cx_a, |p, cx| {
p.find_or_create_local_worktree("/dir", false, cx)
p.find_or_create_local_worktree("/dir", true, cx)
})
.await
.unwrap();
@ -4005,7 +4005,7 @@ mod tests {
});
let (worktree_a, _) = project_a
.update(cx_a, |p, cx| {
p.find_or_create_local_worktree("/a", false, cx)
p.find_or_create_local_worktree("/a", true, cx)
})
.await
.unwrap();
@ -4165,7 +4165,7 @@ mod tests {
let (collab_worktree, _) = host_project
.update(&mut host_cx, |project, cx| {
project.find_or_create_local_worktree("/_collab", false, cx)
project.find_or_create_local_worktree("/_collab", true, cx)
})
.await
.unwrap();
@ -4246,6 +4246,12 @@ mod tests {
.collect::<BTreeMap<_, _>>()
});
host_client
.project
.as_ref()
.unwrap()
.read_with(&host_cx, |project, cx| project.check_invariants(cx));
for (guest_client, mut guest_cx) in clients.into_iter() {
let guest_id = guest_client.client.id();
let worktree_snapshots =
@ -4291,13 +4297,7 @@ mod tests {
.project
.as_ref()
.unwrap()
.read_with(&guest_cx, |project, cx| {
assert!(
!project.has_deferred_operations(cx),
"guest {} has deferred operations",
guest_id,
);
});
.read_with(&guest_cx, |project, cx| project.check_invariants(cx));
for guest_buffer in &guest_client.buffers {
let buffer_id = guest_buffer.read_with(&guest_cx, |buffer, _| buffer.remote_id());
@ -4307,14 +4307,24 @@ mod tests {
guest_id, guest_client.peer_id, buffer_id
))
});
let path = host_buffer
.read_with(&host_cx, |buffer, cx| buffer.file().unwrap().full_path(cx));
assert_eq!(
guest_buffer.read_with(&guest_cx, |buffer, _| buffer.deferred_ops_len()),
0,
"guest {}, buffer {}, path {:?} has deferred operations",
guest_id,
buffer_id,
path,
);
assert_eq!(
guest_buffer.read_with(&guest_cx, |buffer, _| buffer.text()),
host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
"guest {}, buffer {}, path {:?}, differs from the host's buffer",
guest_id,
buffer_id,
host_buffer
.read_with(&host_cx, |buffer, cx| buffer.file().unwrap().full_path(cx))
path
);
}
@ -4660,12 +4670,17 @@ mod tests {
}
log::info!("Host: find/create local worktree {:?}", path);
project
.update(&mut cx, |project, cx| {
project.find_or_create_local_worktree(path, false, cx)
})
.await
.unwrap();
let find_or_create_worktree = project.update(&mut cx, |project, cx| {
project.find_or_create_local_worktree(path, true, cx)
});
let find_or_create_worktree = async move {
find_or_create_worktree.await.unwrap();
};
if rng.lock().gen() {
cx.background().spawn(find_or_create_worktree).detach();
} else {
find_or_create_worktree.await;
}
}
10..=80 if !files.lock().is_empty() => {
let buffer = if self.buffers.is_empty() || rng.lock().gen() {
@ -4674,7 +4689,7 @@ mod tests {
.update(&mut cx, |project, cx| {
project.find_or_create_local_worktree(
file.clone(),
false,
true,
cx,
)
})
@ -4682,7 +4697,12 @@ mod tests {
.unwrap();
let project_path =
worktree.read_with(&cx, |worktree, _| (worktree.id(), path));
log::info!("Host: opening path {:?}, {:?}", file, project_path);
log::info!(
"Host: opening path {:?}, worktree {}, relative_path {:?}",
file,
project_path.0,
project_path.1
);
let buffer = project
.update(&mut cx, |project, cx| {
project.open_buffer(project_path, cx)
@ -4769,7 +4789,8 @@ mod tests {
.worktrees(&cx)
.filter(|worktree| {
let worktree = worktree.read(cx);
!worktree.is_weak() && worktree.entries(false).any(|e| e.is_file())
worktree.is_visible()
&& worktree.entries(false).any(|e| e.is_file())
})
.choose(&mut *rng.lock())
}) {
@ -4793,11 +4814,11 @@ mod tests {
)
});
log::info!(
"Guest {}: opening path in worktree {:?} {:?} {:?}",
"Guest {}: opening path {:?} in worktree {} ({})",
guest_id,
project_path.1,
project_path.0,
worktree_root_name,
project_path.1
);
let buffer = project
.update(&mut cx, |project, cx| {
@ -4806,11 +4827,11 @@ mod tests {
.await
.unwrap();
log::info!(
"Guest {}: path in worktree {:?} {:?} {:?} opened with buffer id {:?}",
"Guest {}: opened path {:?} in worktree {} ({}) with buffer id {}",
guest_id,
project_path.1,
project_path.0,
worktree_root_name,
project_path.1,
buffer.read_with(&cx, |buffer, _| buffer.remote_id())
);
self.buffers.insert(buffer.clone());
@ -4841,8 +4862,9 @@ mod tests {
10..=19 => {
let completions = project.update(&mut cx, |project, cx| {
log::info!(
"Guest {}: requesting completions for buffer {:?}",
"Guest {}: requesting completions for buffer {} ({:?})",
guest_id,
buffer.read(cx).remote_id(),
buffer.read(cx).file().unwrap().full_path(cx)
);
let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
@ -4861,8 +4883,9 @@ mod tests {
20..=29 => {
let code_actions = project.update(&mut cx, |project, cx| {
log::info!(
"Guest {}: requesting code actions for buffer {:?}",
"Guest {}: requesting code actions for buffer {} ({:?})",
guest_id,
buffer.read(cx).remote_id(),
buffer.read(cx).file().unwrap().full_path(cx)
);
let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
@ -4881,18 +4904,16 @@ mod tests {
30..=39 if buffer.read_with(&cx, |buffer, _| buffer.is_dirty()) => {
let (requested_version, save) = buffer.update(&mut cx, |buffer, cx| {
log::info!(
"Guest {}: saving buffer {:?}",
"Guest {}: saving buffer {} ({:?})",
guest_id,
buffer.remote_id(),
buffer.file().unwrap().full_path(cx)
);
(buffer.version(), buffer.save(cx))
});
let save = cx.spawn(|cx| async move {
let save = cx.background().spawn(async move {
let (saved_version, _) = save.await.expect("save request failed");
buffer.read_with(&cx, |buffer, _| {
assert!(buffer.version().observed_all(&saved_version));
assert!(saved_version.observed_all(&requested_version));
});
assert!(saved_version.observed_all(&requested_version));
});
if rng.lock().gen_bool(0.3) {
log::info!("Guest {}: detaching save request", guest_id);
@ -4904,8 +4925,9 @@ mod tests {
40..=44 => {
let prepare_rename = project.update(&mut cx, |project, cx| {
log::info!(
"Guest {}: preparing rename for buffer {:?}",
"Guest {}: preparing rename for buffer {} ({:?})",
guest_id,
buffer.read(cx).remote_id(),
buffer.read(cx).file().unwrap().full_path(cx)
);
let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
@ -4924,8 +4946,9 @@ mod tests {
45..=49 => {
let definitions = project.update(&mut cx, |project, cx| {
log::info!(
"Guest {}: requesting definitions for buffer {:?}",
"Guest {}: requesting definitions for buffer {} ({:?})",
guest_id,
buffer.read(cx).remote_id(),
buffer.read(cx).file().unwrap().full_path(cx)
);
let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
@ -4945,8 +4968,9 @@ mod tests {
50..=54 => {
let highlights = project.update(&mut cx, |project, cx| {
log::info!(
"Guest {}: requesting highlights for buffer {:?}",
"Guest {}: requesting highlights for buffer {} ({:?})",
guest_id,
buffer.read(cx).remote_id(),
buffer.read(cx).file().unwrap().full_path(cx)
);
let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
@ -4981,8 +5005,9 @@ mod tests {
_ => {
buffer.update(&mut cx, |buffer, cx| {
log::info!(
"Guest {}: updating buffer {:?}",
"Guest {}: updating buffer {} ({:?})",
guest_id,
buffer.remote_id(),
buffer.file().unwrap().full_path(cx)
);
buffer.randomly_edit(&mut *rng.lock(), 5, cx)

View file

@ -30,7 +30,7 @@ pub struct Project {
pub struct Worktree {
pub authorized_user_ids: Vec<UserId>,
pub root_name: String,
pub weak: bool,
pub visible: bool,
}
#[derive(Default)]
@ -204,7 +204,7 @@ impl Store {
let mut worktree_root_names = project
.worktrees
.values()
.filter(|worktree| !worktree.weak)
.filter(|worktree| worktree.visible)
.map(|worktree| worktree.root_name.clone())
.collect::<Vec<_>>();
worktree_root_names.sort_unstable();
@ -534,7 +534,12 @@ impl Store {
for entry in updated_entries {
worktree.entries.insert(entry.id, entry.clone());
}
Ok(project.connection_ids())
let connection_ids = project.connection_ids();
#[cfg(test)]
self.check_invariants();
Ok(connection_ids)
}
pub fn project_connection_ids(
@ -619,6 +624,23 @@ impl Store {
.guests
.contains_key(connection_id));
}
if let Some(share) = project.share.as_ref() {
for (worktree_id, worktree) in share.worktrees.iter() {
let mut paths = HashMap::default();
for entry in worktree.entries.values() {
let prev_entry = paths.insert(&entry.path, entry);
assert_eq!(
prev_entry,
None,
"worktree {:?}, duplicate path for entries {:?} and {:?}",
worktree_id,
prev_entry.unwrap(),
entry
);
}
}
}
}
for channel_id in &connection.channels {
let channel = self.channels.get(channel_id).unwrap();
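The duplicate-path invariant added above leans on `HashMap::insert` returning the value previously stored under a key, so a single pass can both index the entries and flag collisions. A minimal standalone sketch of the same idiom, using an illustrative `Entry` stand-in rather than the server's actual entry type:

use std::collections::HashMap;

// Illustrative stand-in for a worktree entry; only `path` matters here.
#[derive(Debug)]
struct Entry {
    id: u64,
    path: String,
}

// Panics if two entries share a path, mirroring the invariant above:
// `insert` hands back the previous value for the key, so `Some(..)` on
// insertion means the path was already seen.
fn assert_unique_paths(entries: &[Entry]) {
    let mut seen: HashMap<&str, &Entry> = HashMap::new();
    for entry in entries {
        if let Some(prev) = seen.insert(entry.path.as_str(), entry) {
            panic!("duplicate path for entries {:?} and {:?}", prev, entry);
        }
    }
}

Note that in the diff's `assert_eq!` form, referencing `prev_entry.unwrap()` in the message is safe even though `prev_entry` is `None` on the happy path, because the format arguments are only evaluated when the assertion fails.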

View file

@ -520,7 +520,8 @@ pub struct EditOperation {
pub struct UndoOperation {
pub id: clock::Local,
pub counts: HashMap<clock::Local, u32>,
pub ranges: Vec<Range<FullOffset>>,
pub transaction_ranges: Vec<Range<FullOffset>>,
pub transaction_version: clock::Global,
pub version: clock::Global,
}
@ -1039,7 +1040,7 @@ impl Buffer {
let mut edits = Patch::default();
self.snapshot.undo_map.insert(undo);
let mut cx = undo.version.clone();
let mut cx = undo.transaction_version.clone();
for edit_id in undo.counts.keys().copied() {
cx.observe(edit_id);
}
@ -1047,7 +1048,7 @@ impl Buffer {
let mut old_fragments = self.fragments.cursor::<(VersionedFullOffset, usize)>();
let mut new_fragments = old_fragments.slice(
&VersionedFullOffset::Offset(undo.ranges[0].start),
&VersionedFullOffset::Offset(undo.transaction_ranges[0].start),
Bias::Right,
&cx,
);
@ -1055,7 +1056,7 @@ impl Buffer {
RopeBuilder::new(self.visible_text.cursor(0), self.deleted_text.cursor(0));
new_ropes.push_tree(new_fragments.summary().text);
for range in &undo.ranges {
for range in &undo.transaction_ranges {
let mut end_offset = old_fragments.end(&cx).0.full_offset();
if end_offset < range.start {
@ -1073,7 +1074,7 @@ impl Buffer {
let mut fragment = fragment.clone();
let fragment_was_visible = fragment.visible;
if fragment.was_visible(&undo.version, &self.undo_map)
if fragment.was_visible(&undo.transaction_version, &self.undo_map)
|| undo
.counts
.contains_key(&fragment.insertion_timestamp.local())
@ -1264,9 +1265,10 @@ impl Buffer {
let undo = UndoOperation {
id: self.local_clock.tick(),
version: self.version(),
counts,
ranges: transaction.ranges,
version: transaction.start.clone(),
transaction_ranges: transaction.ranges,
transaction_version: transaction.start.clone(),
};
self.apply_undo(&undo)?;
let operation = Operation::Undo {
@ -1307,6 +1309,32 @@ impl Buffer {
}
}
pub fn wait_for_anchors<'a>(
&mut self,
anchors: impl IntoIterator<Item = &'a Anchor>,
) -> impl 'static + Future<Output = ()> {
let mut futures = Vec::new();
for anchor in anchors {
if !self.version.observed(anchor.timestamp)
&& *anchor != Anchor::max()
&& *anchor != Anchor::min()
{
let (tx, rx) = oneshot::channel();
self.edit_id_resolvers
.entry(anchor.timestamp)
.or_default()
.push(tx);
futures.push(rx);
}
}
async move {
for mut future in futures {
future.recv().await;
}
}
}
pub fn wait_for_version(&mut self, version: clock::Global) -> impl Future<Output = ()> {
let (tx, mut rx) = barrier::channel();
if !self.snapshot.version.observed_all(&version) {
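The `wait_for_anchors` addition above registers a oneshot sender for every anchor whose edit has not yet been observed, keyed by the edit's timestamp in `edit_id_resolvers`; those senders are presumably fired elsewhere once the matching operation is applied. A rough sketch of that register-then-resolve pattern, using `futures::channel::oneshot` and a placeholder `Timestamp` type instead of the crate's clock types:

use std::collections::HashMap;

use futures::channel::oneshot;

// Placeholder for the clock/timestamp type the real buffer keys resolvers by.
type Timestamp = u32;

#[derive(Default)]
struct Resolvers {
    pending: HashMap<Timestamp, Vec<oneshot::Sender<()>>>,
}

impl Resolvers {
    // Register interest in `timestamp`; the returned receiver completes once
    // `resolve` is called for that timestamp.
    fn wait_for(&mut self, timestamp: Timestamp) -> oneshot::Receiver<()> {
        let (tx, rx) = oneshot::channel();
        self.pending.entry(timestamp).or_default().push(tx);
        rx
    }

    // Fire every waiter registered for `timestamp`, e.g. when the operation
    // carrying that timestamp has been applied.
    fn resolve(&mut self, timestamp: Timestamp) {
        for tx in self.pending.remove(&timestamp).unwrap_or_default() {
            let _ = tx.send(()); // a dropped receiver is not an error here
        }
    }
}

Awaiting the returned receiver then plays the role of the futures loop at the end of `wait_for_anchors`.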

View file

@ -5,7 +5,6 @@ use tempdir::TempDir;
#[derive(Clone)]
struct Envelope<T: Clone> {
message: T,
sender: ReplicaId,
}
pub struct Network<T: Clone, R: rand::Rng> {
@ -40,28 +39,14 @@ impl<T: Clone, R: rand::Rng> Network<T, R> {
for (replica, inbox) in self.inboxes.iter_mut() {
if *replica != sender {
for message in &messages {
let min_index = inbox
.iter()
.enumerate()
.rev()
.find_map(|(index, envelope)| {
if sender == envelope.sender {
Some(index + 1)
} else {
None
}
})
.unwrap_or(0);
// Insert one or more duplicates of this message *after* the previous
// message delivered by this replica.
// Insert one or more duplicates of this message, potentially *before* the previous
// message sent by this peer to simulate out-of-order delivery.
for _ in 0..self.rng.gen_range(1..4) {
let insertion_index = self.rng.gen_range(min_index..inbox.len() + 1);
let insertion_index = self.rng.gen_range(0..inbox.len() + 1);
inbox.insert(
insertion_index,
Envelope {
message: message.clone(),
sender,
},
);
}
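The updated comment above spells out the delivery model this helper now simulates: every message is duplicated one to three times and spliced into each peer's inbox at a random position, so messages can arrive out of order as well as more than once (previously duplicates were only placed after the sender's last delivered message, which is why the `sender` field and the `min_index` computation could be dropped). A minimal self-contained sketch of the technique, with the inbox simplified to a `Vec<String>`:

use rand::Rng;

// Deliver `message` into `inbox` with random duplication and placement,
// mimicking the relaxed delivery model described in the comment above.
fn deliver<R: Rng>(rng: &mut R, inbox: &mut Vec<String>, message: &str) {
    // One to three copies of every message...
    for _ in 0..rng.gen_range(1..4) {
        // ...each spliced in at an arbitrary index, so a copy of a later
        // message can land before an earlier one.
        let insertion_index = rng.gen_range(0..inbox.len() + 1);
        inbox.insert(insertion_index, message.to_string());
    }
}

Repeated calls for a stream of messages produce an inbox whose order and multiplicity differ from the send order, which is exactly the condition the randomized collaboration test exercises for convergence.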

View file

@ -737,7 +737,7 @@ impl Workspace {
cx: &mut ViewContext<Self>,
) -> Task<Result<ProjectPath>> {
let entry = self.project().update(cx, |project, cx| {
project.find_or_create_local_worktree(abs_path, false, cx)
project.find_or_create_local_worktree(abs_path, true, cx)
});
cx.spawn(|_, cx| async move {
let (worktree, path) = entry.await?;

View file

@ -257,7 +257,7 @@ mod tests {
params
.project
.update(cx, |project, cx| {
project.find_or_create_local_worktree("/root", false, cx)
project.find_or_create_local_worktree("/root", true, cx)
})
.await
.unwrap();
@ -370,7 +370,7 @@ mod tests {
params
.project
.update(cx, |project, cx| {
project.find_or_create_local_worktree("/dir1", false, cx)
project.find_or_create_local_worktree("/dir1", true, cx)
})
.await
.unwrap();
@ -445,7 +445,7 @@ mod tests {
params
.project
.update(cx, |project, cx| {
project.find_or_create_local_worktree("/root", false, cx)
project.find_or_create_local_worktree("/root", true, cx)
})
.await
.unwrap();
@ -492,7 +492,7 @@ mod tests {
params
.project
.update(cx, |project, cx| {
project.find_or_create_local_worktree("/root", false, cx)
project.find_or_create_local_worktree("/root", true, cx)
})
.await
.unwrap();
@ -644,7 +644,7 @@ mod tests {
params
.project
.update(cx, |project, cx| {
project.find_or_create_local_worktree("/root", false, cx)
project.find_or_create_local_worktree("/root", true, cx)
})
.await
.unwrap();
@ -707,7 +707,7 @@ mod tests {
params
.project
.update(cx, |project, cx| {
project.find_or_create_local_worktree("/root", false, cx)
project.find_or_create_local_worktree("/root", true, cx)
})
.await
.unwrap();