Co-authored-by: Julia Risley <julia@zed.dev>
This commit is contained in:
Max Brunsfeld 2023-04-24 13:54:47 -07:00
parent a8ddba55d8
commit 7bd51851c2

View file

@ -93,7 +93,7 @@ pub trait Item {
pub struct Project { pub struct Project {
worktrees: Vec<WorktreeHandle>, worktrees: Vec<WorktreeHandle>,
active_entry: Option<ProjectEntryId>, active_entry: Option<ProjectEntryId>,
buffer_changes_tx: mpsc::UnboundedSender<BufferMessage>, buffer_ordered_messages_tx: mpsc::UnboundedSender<BufferOrderedMessage>,
languages: Arc<LanguageRegistry>, languages: Arc<LanguageRegistry>,
language_servers: HashMap<LanguageServerId, LanguageServerState>, language_servers: HashMap<LanguageServerId, LanguageServerState>,
language_server_ids: HashMap<(WorktreeId, LanguageServerName), LanguageServerId>, language_server_ids: HashMap<(WorktreeId, LanguageServerName), LanguageServerId>,
@ -137,7 +137,8 @@ struct LspBufferSnapshot {
snapshot: TextBufferSnapshot, snapshot: TextBufferSnapshot,
} }
enum BufferMessage { /// Message ordered with respect to buffer operations
enum BufferOrderedMessage {
Operation { Operation {
buffer_id: u64, buffer_id: u64,
operation: proto::Operation, operation: proto::Operation,
@ -447,11 +448,11 @@ impl Project {
) -> ModelHandle<Self> { ) -> ModelHandle<Self> {
cx.add_model(|cx: &mut ModelContext<Self>| { cx.add_model(|cx: &mut ModelContext<Self>| {
let (tx, rx) = mpsc::unbounded(); let (tx, rx) = mpsc::unbounded();
cx.spawn_weak(|this, cx| Self::send_buffer_messages(this, rx, cx)) cx.spawn_weak(|this, cx| Self::send_buffer_ordered_messages(this, rx, cx))
.detach(); .detach();
Self { Self {
worktrees: Default::default(), worktrees: Default::default(),
buffer_changes_tx: tx, buffer_ordered_messages_tx: tx,
collaborators: Default::default(), collaborators: Default::default(),
opened_buffers: Default::default(), opened_buffers: Default::default(),
shared_buffers: Default::default(), shared_buffers: Default::default(),
@ -515,11 +516,11 @@ impl Project {
} }
let (tx, rx) = mpsc::unbounded(); let (tx, rx) = mpsc::unbounded();
cx.spawn_weak(|this, cx| Self::send_buffer_messages(this, rx, cx)) cx.spawn_weak(|this, cx| Self::send_buffer_ordered_messages(this, rx, cx))
.detach(); .detach();
let mut this = Self { let mut this = Self {
worktrees: Vec::new(), worktrees: Vec::new(),
buffer_changes_tx: tx, buffer_ordered_messages_tx: tx,
loading_buffers_by_path: Default::default(), loading_buffers_by_path: Default::default(),
opened_buffer: watch::channel(), opened_buffer: watch::channel(),
shared_buffers: Default::default(), shared_buffers: Default::default(),
@ -1172,8 +1173,8 @@ impl Project {
) )
}) })
.collect(); .collect();
self.buffer_changes_tx self.buffer_ordered_messages_tx
.unbounded_send(BufferMessage::Resync) .unbounded_send(BufferOrderedMessage::Resync)
.unwrap(); .unwrap();
cx.notify(); cx.notify();
Ok(()) Ok(())
@ -1788,9 +1789,9 @@ impl Project {
} }
} }
async fn send_buffer_messages( async fn send_buffer_ordered_messages(
this: WeakModelHandle<Self>, this: WeakModelHandle<Self>,
rx: UnboundedReceiver<BufferMessage>, rx: UnboundedReceiver<BufferOrderedMessage>,
mut cx: AsyncAppContext, mut cx: AsyncAppContext,
) -> Option<()> { ) -> Option<()> {
const MAX_BATCH_SIZE: usize = 128; const MAX_BATCH_SIZE: usize = 128;
@ -1830,7 +1831,7 @@ impl Project {
for change in changes { for change in changes {
match change { match change {
BufferMessage::Operation { BufferOrderedMessage::Operation {
buffer_id, buffer_id,
operation, operation,
} => { } => {
@ -1844,7 +1845,7 @@ impl Project {
.push(operation); .push(operation);
} }
BufferMessage::Resync => { BufferOrderedMessage::Resync => {
operations_by_buffer_id.clear(); operations_by_buffer_id.clear();
if this if this
.update(&mut cx, |this, cx| this.synchronize_remote_buffers(cx)) .update(&mut cx, |this, cx| this.synchronize_remote_buffers(cx))
@ -1855,7 +1856,7 @@ impl Project {
} }
} }
BufferMessage::LanguageServerUpdate { BufferOrderedMessage::LanguageServerUpdate {
language_server_id, language_server_id,
message, message,
} => { } => {
@ -1904,8 +1905,8 @@ impl Project {
) -> Option<()> { ) -> Option<()> {
match event { match event {
BufferEvent::Operation(operation) => { BufferEvent::Operation(operation) => {
self.buffer_changes_tx self.buffer_ordered_messages_tx
.unbounded_send(BufferMessage::Operation { .unbounded_send(BufferOrderedMessage::Operation {
buffer_id: buffer.read(cx).remote_id(), buffer_id: buffer.read(cx).remote_id(),
operation: language::proto::serialize_operation(operation), operation: language::proto::serialize_operation(operation),
}) })
@ -2018,9 +2019,9 @@ impl Project {
language_server_id, language_server_id,
cx, cx,
); );
this.buffer_changes_tx this.buffer_ordered_messages_tx
.unbounded_send( .unbounded_send(
BufferMessage::LanguageServerUpdate { BufferOrderedMessage::LanguageServerUpdate {
language_server_id, language_server_id,
message:proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(Default::default()) message:proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(Default::default())
}, },
@ -2697,8 +2698,8 @@ impl Project {
if is_disk_based_diagnostics_progress { if is_disk_based_diagnostics_progress {
language_server_status.has_pending_diagnostic_updates = true; language_server_status.has_pending_diagnostic_updates = true;
self.disk_based_diagnostics_started(language_server_id, cx); self.disk_based_diagnostics_started(language_server_id, cx);
self.buffer_changes_tx self.buffer_ordered_messages_tx
.unbounded_send(BufferMessage::LanguageServerUpdate { .unbounded_send(BufferOrderedMessage::LanguageServerUpdate {
language_server_id, language_server_id,
message: proto::update_language_server::Variant::DiskBasedDiagnosticsUpdating(Default::default()) message: proto::update_language_server::Variant::DiskBasedDiagnosticsUpdating(Default::default())
}) })
@ -2714,8 +2715,8 @@ impl Project {
}, },
cx, cx,
); );
self.buffer_changes_tx self.buffer_ordered_messages_tx
.unbounded_send(BufferMessage::LanguageServerUpdate { .unbounded_send(BufferOrderedMessage::LanguageServerUpdate {
language_server_id, language_server_id,
message: proto::update_language_server::Variant::WorkStart( message: proto::update_language_server::Variant::WorkStart(
proto::LspWorkStart { proto::LspWorkStart {
@ -2740,8 +2741,8 @@ impl Project {
}, },
cx, cx,
); );
self.buffer_changes_tx self.buffer_ordered_messages_tx
.unbounded_send(BufferMessage::LanguageServerUpdate { .unbounded_send(BufferOrderedMessage::LanguageServerUpdate {
language_server_id, language_server_id,
message: proto::update_language_server::Variant::WorkProgress( message: proto::update_language_server::Variant::WorkProgress(
proto::LspWorkProgress { proto::LspWorkProgress {
@ -2760,8 +2761,8 @@ impl Project {
if is_disk_based_diagnostics_progress { if is_disk_based_diagnostics_progress {
language_server_status.has_pending_diagnostic_updates = false; language_server_status.has_pending_diagnostic_updates = false;
self.disk_based_diagnostics_finished(language_server_id, cx); self.disk_based_diagnostics_finished(language_server_id, cx);
self.buffer_changes_tx self.buffer_ordered_messages_tx
.unbounded_send(BufferMessage::LanguageServerUpdate { .unbounded_send(BufferOrderedMessage::LanguageServerUpdate {
language_server_id, language_server_id,
message: message:
proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated( proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(
@ -2771,8 +2772,8 @@ impl Project {
.ok(); .ok();
} else { } else {
self.on_lsp_work_end(language_server_id, token.clone(), cx); self.on_lsp_work_end(language_server_id, token.clone(), cx);
self.buffer_changes_tx self.buffer_ordered_messages_tx
.unbounded_send(BufferMessage::LanguageServerUpdate { .unbounded_send(BufferOrderedMessage::LanguageServerUpdate {
language_server_id, language_server_id,
message: proto::update_language_server::Variant::WorkEnd( message: proto::update_language_server::Variant::WorkEnd(
proto::LspWorkEnd { token }, proto::LspWorkEnd { token },
@ -4915,8 +4916,8 @@ impl Project {
if is_host { if is_host {
this.opened_buffers this.opened_buffers
.retain(|_, buffer| !matches!(buffer, OpenBuffer::Operations(_))); .retain(|_, buffer| !matches!(buffer, OpenBuffer::Operations(_)));
this.buffer_changes_tx this.buffer_ordered_messages_tx
.unbounded_send(BufferMessage::Resync) .unbounded_send(BufferOrderedMessage::Resync)
.unwrap(); .unwrap();
} }