Create buffers for remote collaborators out of band
Previously, we would use `Project::serialize_buffer_for_peer` and `Project::deserialize_buffer` respectively in the host and in the guest to create a new buffer or just send its ID if the host thought the buffer had already been sent. These methods would be called as part of other methods, such as `Project::open_buffer_by_id` or `Project::open_buffer_for_symbol`. However, if any of the tasks driving the futures that eventually called `Project::deserialize_buffer` were dropped after the host responded with the buffer state but (crucially) before the guest deserialized it and registered it, there could be a situation where the host thought the guest had the buffer (thus sending them just the buffer id) and the guest would wait indefinitely. Given how crucial this interaction is, this commit switches to creating remote buffers for peers out of band. The host will push buffers to guests, who will always refer to buffers via IDs and wait for the host to send them, as opposed to including the buffer's payload as part of some other operation.
This commit is contained in:
parent
75c9b90c76
commit
9c9bf07e40
7 changed files with 183 additions and 171 deletions
|
@ -522,11 +522,10 @@ async fn location_links_from_proto(
|
|||
for link in proto_links {
|
||||
let origin = match link.origin {
|
||||
Some(origin) => {
|
||||
let buffer = origin
|
||||
.buffer
|
||||
.ok_or_else(|| anyhow!("missing origin buffer"))?;
|
||||
let buffer = project
|
||||
.update(&mut cx, |this, cx| this.deserialize_buffer(buffer, cx))
|
||||
.update(&mut cx, |this, cx| {
|
||||
this.wait_for_buffer(origin.buffer_id, cx)
|
||||
})
|
||||
.await?;
|
||||
let start = origin
|
||||
.start
|
||||
|
@ -548,9 +547,10 @@ async fn location_links_from_proto(
|
|||
};
|
||||
|
||||
let target = link.target.ok_or_else(|| anyhow!("missing target"))?;
|
||||
let buffer = target.buffer.ok_or_else(|| anyhow!("missing buffer"))?;
|
||||
let buffer = project
|
||||
.update(&mut cx, |this, cx| this.deserialize_buffer(buffer, cx))
|
||||
.update(&mut cx, |this, cx| {
|
||||
this.wait_for_buffer(target.buffer_id, cx)
|
||||
})
|
||||
.await?;
|
||||
let start = target
|
||||
.start
|
||||
|
@ -664,19 +664,19 @@ fn location_links_to_proto(
|
|||
.into_iter()
|
||||
.map(|definition| {
|
||||
let origin = definition.origin.map(|origin| {
|
||||
let buffer = project.serialize_buffer_for_peer(&origin.buffer, peer_id, cx);
|
||||
let buffer_id = project.create_buffer_for_peer(&origin.buffer, peer_id, cx);
|
||||
proto::Location {
|
||||
start: Some(serialize_anchor(&origin.range.start)),
|
||||
end: Some(serialize_anchor(&origin.range.end)),
|
||||
buffer: Some(buffer),
|
||||
buffer_id,
|
||||
}
|
||||
});
|
||||
|
||||
let buffer = project.serialize_buffer_for_peer(&definition.target.buffer, peer_id, cx);
|
||||
let buffer_id = project.create_buffer_for_peer(&definition.target.buffer, peer_id, cx);
|
||||
let target = proto::Location {
|
||||
start: Some(serialize_anchor(&definition.target.range.start)),
|
||||
end: Some(serialize_anchor(&definition.target.range.end)),
|
||||
buffer: Some(buffer),
|
||||
buffer_id,
|
||||
};
|
||||
|
||||
proto::LocationLink {
|
||||
|
@ -792,11 +792,11 @@ impl LspCommand for GetReferences {
|
|||
let locations = response
|
||||
.into_iter()
|
||||
.map(|definition| {
|
||||
let buffer = project.serialize_buffer_for_peer(&definition.buffer, peer_id, cx);
|
||||
let buffer_id = project.create_buffer_for_peer(&definition.buffer, peer_id, cx);
|
||||
proto::Location {
|
||||
start: Some(serialize_anchor(&definition.range.start)),
|
||||
end: Some(serialize_anchor(&definition.range.end)),
|
||||
buffer: Some(buffer),
|
||||
buffer_id,
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
@ -812,9 +812,10 @@ impl LspCommand for GetReferences {
|
|||
) -> Result<Vec<Location>> {
|
||||
let mut locations = Vec::new();
|
||||
for location in message.locations {
|
||||
let buffer = location.buffer.ok_or_else(|| anyhow!("missing buffer"))?;
|
||||
let target_buffer = project
|
||||
.update(&mut cx, |this, cx| this.deserialize_buffer(buffer, cx))
|
||||
.update(&mut cx, |this, cx| {
|
||||
this.wait_for_buffer(location.buffer_id, cx)
|
||||
})
|
||||
.await?;
|
||||
let start = location
|
||||
.start
|
||||
|
|
|
@ -112,7 +112,7 @@ pub struct Project {
|
|||
collaborators: HashMap<PeerId, Collaborator>,
|
||||
client_subscriptions: Vec<client::Subscription>,
|
||||
_subscriptions: Vec<gpui::Subscription>,
|
||||
opened_buffer: (Rc<RefCell<watch::Sender<()>>>, watch::Receiver<()>),
|
||||
opened_buffer: (watch::Sender<()>, watch::Receiver<()>),
|
||||
shared_buffers: HashMap<PeerId, HashSet<u64>>,
|
||||
#[allow(clippy::type_complexity)]
|
||||
loading_buffers: HashMap<
|
||||
|
@ -375,6 +375,7 @@ impl Project {
|
|||
client.add_model_message_handler(Self::handle_update_project);
|
||||
client.add_model_message_handler(Self::handle_unregister_project);
|
||||
client.add_model_message_handler(Self::handle_project_unshared);
|
||||
client.add_model_message_handler(Self::handle_create_buffer_for_peer);
|
||||
client.add_model_message_handler(Self::handle_update_buffer_file);
|
||||
client.add_model_message_handler(Self::handle_update_buffer);
|
||||
client.add_model_message_handler(Self::handle_update_diagnostic_summary);
|
||||
|
@ -454,7 +455,6 @@ impl Project {
|
|||
let handle = cx.weak_handle();
|
||||
project_store.update(cx, |store, cx| store.add_project(handle, cx));
|
||||
|
||||
let (opened_buffer_tx, opened_buffer_rx) = watch::channel();
|
||||
Self {
|
||||
worktrees: Default::default(),
|
||||
collaborators: Default::default(),
|
||||
|
@ -472,7 +472,7 @@ impl Project {
|
|||
_maintain_remote_id,
|
||||
_maintain_online_status,
|
||||
},
|
||||
opened_buffer: (Rc::new(RefCell::new(opened_buffer_tx)), opened_buffer_rx),
|
||||
opened_buffer: watch::channel(),
|
||||
client_subscriptions: Vec::new(),
|
||||
_subscriptions: vec![cx.observe_global::<Settings, _>(Self::on_settings_changed)],
|
||||
_maintain_buffer_languages: Self::maintain_buffer_languages(&languages, cx),
|
||||
|
@ -540,7 +540,6 @@ impl Project {
|
|||
worktrees.push(worktree);
|
||||
}
|
||||
|
||||
let (opened_buffer_tx, opened_buffer_rx) = watch::channel();
|
||||
let this = cx.add_model(|cx: &mut ModelContext<Self>| {
|
||||
let handle = cx.weak_handle();
|
||||
project_store.update(cx, |store, cx| store.add_project(handle, cx));
|
||||
|
@ -548,7 +547,7 @@ impl Project {
|
|||
let mut this = Self {
|
||||
worktrees: Vec::new(),
|
||||
loading_buffers: Default::default(),
|
||||
opened_buffer: (Rc::new(RefCell::new(opened_buffer_tx)), opened_buffer_rx),
|
||||
opened_buffer: watch::channel(),
|
||||
shared_buffers: Default::default(),
|
||||
loading_local_worktrees: Default::default(),
|
||||
active_entry: None,
|
||||
|
@ -1624,9 +1623,10 @@ impl Project {
|
|||
path: path_string,
|
||||
})
|
||||
.await?;
|
||||
let buffer = response.buffer.ok_or_else(|| anyhow!("missing buffer"))?;
|
||||
this.update(&mut cx, |this, cx| this.deserialize_buffer(buffer, cx))
|
||||
.await
|
||||
this.update(&mut cx, |this, cx| {
|
||||
this.wait_for_buffer(response.buffer_id, cx)
|
||||
})
|
||||
.await
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -1684,11 +1684,8 @@ impl Project {
|
|||
.client
|
||||
.request(proto::OpenBufferById { project_id, id });
|
||||
cx.spawn(|this, mut cx| async move {
|
||||
let buffer = request
|
||||
.await?
|
||||
.buffer
|
||||
.ok_or_else(|| anyhow!("invalid buffer"))?;
|
||||
this.update(&mut cx, |this, cx| this.deserialize_buffer(buffer, cx))
|
||||
let buffer_id = request.await?.buffer_id;
|
||||
this.update(&mut cx, |this, cx| this.wait_for_buffer(buffer_id, cx))
|
||||
.await
|
||||
})
|
||||
} else {
|
||||
|
@ -1800,6 +1797,7 @@ impl Project {
|
|||
})
|
||||
.detach();
|
||||
|
||||
*self.opened_buffer.0.borrow_mut() = ();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
@ -3476,9 +3474,10 @@ impl Project {
|
|||
});
|
||||
cx.spawn(|this, mut cx| async move {
|
||||
let response = request.await?;
|
||||
let buffer = response.buffer.ok_or_else(|| anyhow!("invalid buffer"))?;
|
||||
this.update(&mut cx, |this, cx| this.deserialize_buffer(buffer, cx))
|
||||
.await
|
||||
this.update(&mut cx, |this, cx| {
|
||||
this.wait_for_buffer(response.buffer_id, cx)
|
||||
})
|
||||
.await
|
||||
})
|
||||
} else {
|
||||
Task::ready(Err(anyhow!("project does not have a remote id")))
|
||||
|
@ -4294,9 +4293,10 @@ impl Project {
|
|||
let response = request.await?;
|
||||
let mut result = HashMap::default();
|
||||
for location in response.locations {
|
||||
let buffer = location.buffer.ok_or_else(|| anyhow!("missing buffer"))?;
|
||||
let target_buffer = this
|
||||
.update(&mut cx, |this, cx| this.deserialize_buffer(buffer, cx))
|
||||
.update(&mut cx, |this, cx| {
|
||||
this.wait_for_buffer(location.buffer_id, cx)
|
||||
})
|
||||
.await?;
|
||||
let start = location
|
||||
.start
|
||||
|
@ -5107,6 +5107,36 @@ impl Project {
|
|||
})
|
||||
}
|
||||
|
||||
async fn handle_create_buffer_for_peer(
|
||||
this: ModelHandle<Self>,
|
||||
envelope: TypedEnvelope<proto::CreateBufferForPeer>,
|
||||
_: Arc<Client>,
|
||||
mut cx: AsyncAppContext,
|
||||
) -> Result<()> {
|
||||
this.update(&mut cx, |this, cx| {
|
||||
let mut buffer = envelope
|
||||
.payload
|
||||
.buffer
|
||||
.ok_or_else(|| anyhow!("invalid buffer"))?;
|
||||
let mut buffer_file = None;
|
||||
if let Some(file) = buffer.file.take() {
|
||||
let worktree_id = WorktreeId::from_proto(file.worktree_id);
|
||||
let worktree = this
|
||||
.worktree_for_id(worktree_id, cx)
|
||||
.ok_or_else(|| anyhow!("no worktree found for id {}", file.worktree_id))?;
|
||||
buffer_file = Some(Arc::new(File::from_proto(file, worktree.clone(), cx)?)
|
||||
as Arc<dyn language::File>);
|
||||
}
|
||||
|
||||
let buffer = cx.add_model(|cx| {
|
||||
Buffer::from_proto(this.replica_id(), buffer, buffer_file, cx).unwrap()
|
||||
});
|
||||
this.register_buffer(&buffer, cx)?;
|
||||
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
|
||||
async fn handle_update_buffer_file(
|
||||
this: ModelHandle<Self>,
|
||||
envelope: TypedEnvelope<proto::UpdateBufferFile>,
|
||||
|
@ -5448,9 +5478,9 @@ impl Project {
|
|||
for range in ranges {
|
||||
let start = serialize_anchor(&range.start);
|
||||
let end = serialize_anchor(&range.end);
|
||||
let buffer = this.serialize_buffer_for_peer(&buffer, peer_id, cx);
|
||||
let buffer_id = this.create_buffer_for_peer(&buffer, peer_id, cx);
|
||||
locations.push(proto::Location {
|
||||
buffer: Some(buffer),
|
||||
buffer_id,
|
||||
start: Some(start),
|
||||
end: Some(end),
|
||||
});
|
||||
|
@ -5487,9 +5517,9 @@ impl Project {
|
|||
.await?;
|
||||
|
||||
Ok(proto::OpenBufferForSymbolResponse {
|
||||
buffer: Some(this.update(&mut cx, |this, cx| {
|
||||
this.serialize_buffer_for_peer(&buffer, peer_id, cx)
|
||||
})),
|
||||
buffer_id: this.update(&mut cx, |this, cx| {
|
||||
this.create_buffer_for_peer(&buffer, peer_id, cx)
|
||||
}),
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -5515,7 +5545,7 @@ impl Project {
|
|||
.await?;
|
||||
this.update(&mut cx, |this, cx| {
|
||||
Ok(proto::OpenBufferResponse {
|
||||
buffer: Some(this.serialize_buffer_for_peer(&buffer, peer_id, cx)),
|
||||
buffer_id: this.create_buffer_for_peer(&buffer, peer_id, cx),
|
||||
})
|
||||
})
|
||||
}
|
||||
|
@ -5541,7 +5571,7 @@ impl Project {
|
|||
let buffer = open_buffer.await?;
|
||||
this.update(&mut cx, |this, cx| {
|
||||
Ok(proto::OpenBufferResponse {
|
||||
buffer: Some(this.serialize_buffer_for_peer(&buffer, peer_id, cx)),
|
||||
buffer_id: this.create_buffer_for_peer(&buffer, peer_id, cx),
|
||||
})
|
||||
})
|
||||
}
|
||||
|
@ -5553,13 +5583,13 @@ impl Project {
|
|||
cx: &AppContext,
|
||||
) -> proto::ProjectTransaction {
|
||||
let mut serialized_transaction = proto::ProjectTransaction {
|
||||
buffers: Default::default(),
|
||||
buffer_ids: Default::default(),
|
||||
transactions: Default::default(),
|
||||
};
|
||||
for (buffer, transaction) in project_transaction.0 {
|
||||
serialized_transaction
|
||||
.buffers
|
||||
.push(self.serialize_buffer_for_peer(&buffer, peer_id, cx));
|
||||
.buffer_ids
|
||||
.push(self.create_buffer_for_peer(&buffer, peer_id, cx));
|
||||
serialized_transaction
|
||||
.transactions
|
||||
.push(language::proto::serialize_transaction(&transaction));
|
||||
|
@ -5575,9 +5605,10 @@ impl Project {
|
|||
) -> Task<Result<ProjectTransaction>> {
|
||||
cx.spawn(|this, mut cx| async move {
|
||||
let mut project_transaction = ProjectTransaction::default();
|
||||
for (buffer, transaction) in message.buffers.into_iter().zip(message.transactions) {
|
||||
for (buffer_id, transaction) in message.buffer_ids.into_iter().zip(message.transactions)
|
||||
{
|
||||
let buffer = this
|
||||
.update(&mut cx, |this, cx| this.deserialize_buffer(buffer, cx))
|
||||
.update(&mut cx, |this, cx| this.wait_for_buffer(buffer_id, cx))
|
||||
.await?;
|
||||
let transaction = language::proto::deserialize_transaction(transaction)?;
|
||||
project_transaction.0.insert(buffer, transaction);
|
||||
|
@ -5601,81 +5632,51 @@ impl Project {
|
|||
})
|
||||
}
|
||||
|
||||
fn serialize_buffer_for_peer(
|
||||
fn create_buffer_for_peer(
|
||||
&mut self,
|
||||
buffer: &ModelHandle<Buffer>,
|
||||
peer_id: PeerId,
|
||||
cx: &AppContext,
|
||||
) -> proto::Buffer {
|
||||
) -> u64 {
|
||||
let buffer_id = buffer.read(cx).remote_id();
|
||||
let shared_buffers = self.shared_buffers.entry(peer_id).or_default();
|
||||
if shared_buffers.insert(buffer_id) {
|
||||
proto::Buffer {
|
||||
variant: Some(proto::buffer::Variant::State(buffer.read(cx).to_proto())),
|
||||
}
|
||||
} else {
|
||||
proto::Buffer {
|
||||
variant: Some(proto::buffer::Variant::Id(buffer_id)),
|
||||
if let Some(project_id) = self.remote_id() {
|
||||
let shared_buffers = self.shared_buffers.entry(peer_id).or_default();
|
||||
if shared_buffers.insert(buffer_id) {
|
||||
self.client
|
||||
.send(proto::CreateBufferForPeer {
|
||||
project_id,
|
||||
peer_id: peer_id.0,
|
||||
buffer: Some(buffer.read(cx).to_proto()),
|
||||
})
|
||||
.log_err();
|
||||
}
|
||||
}
|
||||
|
||||
buffer_id
|
||||
}
|
||||
|
||||
fn deserialize_buffer(
|
||||
&mut self,
|
||||
buffer: proto::Buffer,
|
||||
fn wait_for_buffer(
|
||||
&self,
|
||||
id: u64,
|
||||
cx: &mut ModelContext<Self>,
|
||||
) -> Task<Result<ModelHandle<Buffer>>> {
|
||||
let replica_id = self.replica_id();
|
||||
|
||||
let opened_buffer_tx = self.opened_buffer.0.clone();
|
||||
let mut opened_buffer_rx = self.opened_buffer.1.clone();
|
||||
cx.spawn(|this, mut cx| async move {
|
||||
match buffer.variant.ok_or_else(|| anyhow!("missing buffer"))? {
|
||||
proto::buffer::Variant::Id(id) => {
|
||||
let buffer = loop {
|
||||
let buffer = this.read_with(&cx, |this, cx| {
|
||||
this.opened_buffers
|
||||
.get(&id)
|
||||
.and_then(|buffer| buffer.upgrade(cx))
|
||||
});
|
||||
if let Some(buffer) = buffer {
|
||||
break buffer;
|
||||
}
|
||||
opened_buffer_rx
|
||||
.next()
|
||||
.await
|
||||
.ok_or_else(|| anyhow!("project dropped while waiting for buffer"))?;
|
||||
};
|
||||
Ok(buffer)
|
||||
cx.spawn(|this, cx| async move {
|
||||
let buffer = loop {
|
||||
let buffer = this.read_with(&cx, |this, cx| {
|
||||
this.opened_buffers
|
||||
.get(&id)
|
||||
.and_then(|buffer| buffer.upgrade(cx))
|
||||
});
|
||||
if let Some(buffer) = buffer {
|
||||
break buffer;
|
||||
}
|
||||
proto::buffer::Variant::State(mut buffer) => {
|
||||
let mut buffer_worktree = None;
|
||||
let mut buffer_file = None;
|
||||
if let Some(file) = buffer.file.take() {
|
||||
this.read_with(&cx, |this, cx| {
|
||||
let worktree_id = WorktreeId::from_proto(file.worktree_id);
|
||||
let worktree =
|
||||
this.worktree_for_id(worktree_id, cx).ok_or_else(|| {
|
||||
anyhow!("no worktree found for id {}", file.worktree_id)
|
||||
})?;
|
||||
buffer_file =
|
||||
Some(Arc::new(File::from_proto(file, worktree.clone(), cx)?)
|
||||
as Arc<dyn language::File>);
|
||||
buffer_worktree = Some(worktree);
|
||||
Ok::<_, anyhow::Error>(())
|
||||
})?;
|
||||
}
|
||||
|
||||
let buffer = cx.add_model(|cx| {
|
||||
Buffer::from_proto(replica_id, buffer, buffer_file, cx).unwrap()
|
||||
});
|
||||
|
||||
this.update(&mut cx, |this, cx| this.register_buffer(&buffer, cx))?;
|
||||
|
||||
*opened_buffer_tx.borrow_mut().borrow_mut() = ();
|
||||
Ok(buffer)
|
||||
}
|
||||
}
|
||||
opened_buffer_rx
|
||||
.next()
|
||||
.await
|
||||
.ok_or_else(|| anyhow!("project dropped while waiting for buffer"))?;
|
||||
};
|
||||
Ok(buffer)
|
||||
})
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue