Track open buffers when handling sync requests

When a host sends a buffer to a guest for the first time, they record that
they have done so in a set tied to that guest's peer id. When the guest
reconnects and syncs buffers, they do so under a different peer id, so we
need to be sure we track which buffers we have sent them to avoid sending
them the same buffer twice, which violates the guest's assumptions.
This commit is contained in:
Nathan Sobo 2023-01-02 20:12:00 -07:00
parent 74843493f4
commit a6ffcdd0cf
3 changed files with 29 additions and 14 deletions

View file

@ -23,7 +23,7 @@ use std::{
time::{Duration, SystemTime}, time::{Duration, SystemTime},
}; };
use tempfile::NamedTempFile; use tempfile::NamedTempFile;
use util::{post_inc, ResultExt}; use util::ResultExt;
#[cfg(any(test, feature = "test-support"))] #[cfg(any(test, feature = "test-support"))]
use collections::{btree_map, BTreeMap}; use collections::{btree_map, BTreeMap};
@ -840,7 +840,7 @@ impl Fs for FakeFs {
let target = normalize_path(target); let target = normalize_path(target);
let mut state = self.state.lock().await; let mut state = self.state.lock().await;
let mtime = state.next_mtime; let mtime = state.next_mtime;
let inode = post_inc(&mut state.next_inode); let inode = util::post_inc(&mut state.next_inode);
state.next_mtime += Duration::from_nanos(1); state.next_mtime += Duration::from_nanos(1);
let source_entry = state.read_path(&source).await?; let source_entry = state.read_path(&source).await?;
let content = source_entry.lock().await.file_content(&source)?.clone(); let content = source_entry.lock().await.file_content(&source)?.clone();

View file

@ -4,7 +4,7 @@ use futures::channel::mpsc;
use smol::{channel, prelude::*, Executor}; use smol::{channel, prelude::*, Executor};
use std::{ use std::{
any::Any, any::Any,
fmt::{self, Display, Write as _}, fmt::{self, Display},
marker::PhantomData, marker::PhantomData,
mem, mem,
pin::Pin, pin::Pin,
@ -17,8 +17,7 @@ use std::{
use crate::{ use crate::{
platform::{self, Dispatcher}, platform::{self, Dispatcher},
util::{self, CwdBacktrace}, util, MutableAppContext,
MutableAppContext,
}; };
pub enum Foreground { pub enum Foreground {
@ -549,6 +548,8 @@ impl Future for Timer {
#[cfg(any(test, feature = "test-support"))] #[cfg(any(test, feature = "test-support"))]
impl DeterministicState { impl DeterministicState {
fn push_to_history(&mut self, event: ExecutorEvent) { fn push_to_history(&mut self, event: ExecutorEvent) {
use std::fmt::Write as _;
self.poll_history.push(event); self.poll_history.push(event);
if let Some(prev_history) = &self.previous_poll_history { if let Some(prev_history) = &self.previous_poll_history {
let ix = self.poll_history.len() - 1; let ix = self.poll_history.len() - 1;
@ -560,7 +561,7 @@ impl DeterministicState {
"current runnable backtrace:\n{:?}", "current runnable backtrace:\n{:?}",
self.runnable_backtraces.get_mut(&event.id()).map(|trace| { self.runnable_backtraces.get_mut(&event.id()).map(|trace| {
trace.resolve(); trace.resolve();
CwdBacktrace(trace) util::CwdBacktrace(trace)
}) })
) )
.unwrap(); .unwrap();
@ -571,7 +572,7 @@ impl DeterministicState {
.get_mut(&prev_event.id()) .get_mut(&prev_event.id())
.map(|trace| { .map(|trace| {
trace.resolve(); trace.resolve();
CwdBacktrace(trace) util::CwdBacktrace(trace)
}) })
) )
.unwrap(); .unwrap();

View file

@ -62,7 +62,7 @@ use std::{
time::Instant, time::Instant,
}; };
use terminal::{Terminal, TerminalBuilder}; use terminal::{Terminal, TerminalBuilder};
use util::{defer, post_inc, ResultExt, TryFutureExt as _}; use util::{debug_panic, defer, post_inc, ResultExt, TryFutureExt as _};
pub use fs::*; pub use fs::*;
pub use worktree::*; pub use worktree::*;
@ -1501,16 +1501,20 @@ impl Project {
} }
Some(OpenBuffer::Weak(existing_handle)) => { Some(OpenBuffer::Weak(existing_handle)) => {
if existing_handle.upgrade(cx).is_some() { if existing_handle.upgrade(cx).is_some() {
debug_panic!("already registered buffer with remote id {}", remote_id);
Err(anyhow!( Err(anyhow!(
"already registered buffer with remote id {}", "already registered buffer with remote id {}",
remote_id remote_id
))? ))?
} }
} }
Some(OpenBuffer::Strong(_)) => Err(anyhow!( Some(OpenBuffer::Strong(_)) => {
"already registered buffer with remote id {}", debug_panic!("already registered buffer with remote id {}", remote_id);
remote_id Err(anyhow!(
))?, "already registered buffer with remote id {}",
remote_id
))?
}
} }
cx.subscribe(buffer, |this, buffer, event, cx| { cx.subscribe(buffer, |this, buffer, event, cx| {
this.on_buffer_event(buffer, event, cx); this.on_buffer_event(buffer, event, cx);
@ -5150,18 +5154,28 @@ impl Project {
this: ModelHandle<Self>, this: ModelHandle<Self>,
envelope: TypedEnvelope<proto::SynchronizeBuffers>, envelope: TypedEnvelope<proto::SynchronizeBuffers>,
_: Arc<Client>, _: Arc<Client>,
cx: AsyncAppContext, mut cx: AsyncAppContext,
) -> Result<proto::SynchronizeBuffersResponse> { ) -> Result<proto::SynchronizeBuffersResponse> {
let project_id = envelope.payload.project_id; let project_id = envelope.payload.project_id;
let mut response = proto::SynchronizeBuffersResponse { let mut response = proto::SynchronizeBuffersResponse {
buffers: Default::default(), buffers: Default::default(),
}; };
this.read_with(&cx, |this, cx| { this.update(&mut cx, |this, cx| {
let Some(guest_id) = envelope.original_sender_id else {
log::error!("missing original_sender_id on SynchronizeBuffers request");
return;
};
for buffer in envelope.payload.buffers { for buffer in envelope.payload.buffers {
let buffer_id = buffer.id; let buffer_id = buffer.id;
let remote_version = language::proto::deserialize_version(buffer.version); let remote_version = language::proto::deserialize_version(buffer.version);
if let Some(buffer) = this.buffer_for_id(buffer_id, cx) { if let Some(buffer) = this.buffer_for_id(buffer_id, cx) {
this.shared_buffers
.entry(guest_id)
.or_default()
.insert(buffer_id);
let buffer = buffer.read(cx); let buffer = buffer.read(cx);
response.buffers.push(proto::BufferVersion { response.buffers.push(proto::BufferVersion {
id: buffer_id, id: buffer_id,