Move several buffer-related messages to the background

This commit is contained in:
Antonio Scandurra 2022-03-03 12:18:19 +01:00
parent 1c14168f38
commit 14d26eeedc
7 changed files with 187 additions and 99 deletions

View file

@ -1291,6 +1291,13 @@ impl Buffer {
self.text.wait_for_edits(edit_ids) self.text.wait_for_edits(edit_ids)
} }
pub fn wait_for_anchors<'a>(
&mut self,
anchors: impl IntoIterator<Item = &'a Anchor>,
) -> impl Future<Output = ()> {
self.text.wait_for_anchors(anchors)
}
pub fn wait_for_version(&mut self, version: clock::Global) -> impl Future<Output = ()> { pub fn wait_for_version(&mut self, version: clock::Global) -> impl Future<Output = ()> {
self.text.wait_for_version(version) self.text.wait_for_version(version)
} }

View file

@ -31,10 +31,11 @@ pub(crate) trait LspCommand: 'static + Sized {
) -> Result<Self::Response>; ) -> Result<Self::Response>;
fn to_proto(&self, project_id: u64, buffer: &Buffer) -> Self::ProtoRequest; fn to_proto(&self, project_id: u64, buffer: &Buffer) -> Self::ProtoRequest;
fn from_proto( async fn from_proto(
message: Self::ProtoRequest, message: Self::ProtoRequest,
project: &mut Project, project: ModelHandle<Project>,
buffer: &Buffer, buffer: ModelHandle<Buffer>,
cx: AsyncAppContext,
) -> Result<Self>; ) -> Result<Self>;
fn response_to_proto( fn response_to_proto(
response: Self::Response, response: Self::Response,
@ -121,19 +122,28 @@ impl LspCommand for PrepareRename {
position: Some(language::proto::serialize_anchor( position: Some(language::proto::serialize_anchor(
&buffer.anchor_before(self.position), &buffer.anchor_before(self.position),
)), )),
version: (&buffer.version()).into(),
} }
} }
fn from_proto(message: proto::PrepareRename, _: &mut Project, buffer: &Buffer) -> Result<Self> { async fn from_proto(
message: proto::PrepareRename,
_: ModelHandle<Project>,
buffer: ModelHandle<Buffer>,
mut cx: AsyncAppContext,
) -> Result<Self> {
let position = message let position = message
.position .position
.and_then(deserialize_anchor) .and_then(deserialize_anchor)
.ok_or_else(|| anyhow!("invalid position"))?; .ok_or_else(|| anyhow!("invalid position"))?;
if !buffer.can_resolve(&position) { buffer
Err(anyhow!("cannot resolve position"))?; .update(&mut cx, |buffer, _| {
} buffer.wait_for_version(message.version.into())
})
.await;
Ok(Self { Ok(Self {
position: position.to_point_utf16(buffer), position: buffer.read_with(&cx, |buffer, _| position.to_point_utf16(buffer)),
}) })
} }
@ -241,19 +251,27 @@ impl LspCommand for PerformRename {
&buffer.anchor_before(self.position), &buffer.anchor_before(self.position),
)), )),
new_name: self.new_name.clone(), new_name: self.new_name.clone(),
version: (&buffer.version()).into(),
} }
} }
fn from_proto(message: proto::PerformRename, _: &mut Project, buffer: &Buffer) -> Result<Self> { async fn from_proto(
message: proto::PerformRename,
_: ModelHandle<Project>,
buffer: ModelHandle<Buffer>,
mut cx: AsyncAppContext,
) -> Result<Self> {
let position = message let position = message
.position .position
.and_then(deserialize_anchor) .and_then(deserialize_anchor)
.ok_or_else(|| anyhow!("invalid position"))?; .ok_or_else(|| anyhow!("invalid position"))?;
if !buffer.can_resolve(&position) { buffer
Err(anyhow!("cannot resolve position"))?; .update(&mut cx, |buffer, _| {
} buffer.wait_for_version(message.version.into())
})
.await;
Ok(Self { Ok(Self {
position: position.to_point_utf16(buffer), position: buffer.read_with(&cx, |buffer, _| position.to_point_utf16(buffer)),
new_name: message.new_name, new_name: message.new_name,
push_to_history: false, push_to_history: false,
}) })
@ -385,19 +403,27 @@ impl LspCommand for GetDefinition {
position: Some(language::proto::serialize_anchor( position: Some(language::proto::serialize_anchor(
&buffer.anchor_before(self.position), &buffer.anchor_before(self.position),
)), )),
version: (&buffer.version()).into(),
} }
} }
fn from_proto(message: proto::GetDefinition, _: &mut Project, buffer: &Buffer) -> Result<Self> { async fn from_proto(
message: proto::GetDefinition,
_: ModelHandle<Project>,
buffer: ModelHandle<Buffer>,
mut cx: AsyncAppContext,
) -> Result<Self> {
let position = message let position = message
.position .position
.and_then(deserialize_anchor) .and_then(deserialize_anchor)
.ok_or_else(|| anyhow!("invalid position"))?; .ok_or_else(|| anyhow!("invalid position"))?;
if !buffer.can_resolve(&position) { buffer
Err(anyhow!("cannot resolve position"))?; .update(&mut cx, |buffer, _| {
} buffer.wait_for_version(message.version.into())
})
.await;
Ok(Self { Ok(Self {
position: position.to_point_utf16(buffer), position: buffer.read_with(&cx, |buffer, _| position.to_point_utf16(buffer)),
}) })
} }
@ -443,6 +469,9 @@ impl LspCommand for GetDefinition {
.end .end
.and_then(deserialize_anchor) .and_then(deserialize_anchor)
.ok_or_else(|| anyhow!("missing target end"))?; .ok_or_else(|| anyhow!("missing target end"))?;
buffer
.update(&mut cx, |buffer, _| buffer.wait_for_anchors([&start, &end]))
.await;
locations.push(Location { locations.push(Location {
buffer, buffer,
range: start..end, range: start..end,
@ -533,19 +562,27 @@ impl LspCommand for GetReferences {
position: Some(language::proto::serialize_anchor( position: Some(language::proto::serialize_anchor(
&buffer.anchor_before(self.position), &buffer.anchor_before(self.position),
)), )),
version: (&buffer.version()).into(),
} }
} }
fn from_proto(message: proto::GetReferences, _: &mut Project, buffer: &Buffer) -> Result<Self> { async fn from_proto(
message: proto::GetReferences,
_: ModelHandle<Project>,
buffer: ModelHandle<Buffer>,
mut cx: AsyncAppContext,
) -> Result<Self> {
let position = message let position = message
.position .position
.and_then(deserialize_anchor) .and_then(deserialize_anchor)
.ok_or_else(|| anyhow!("invalid position"))?; .ok_or_else(|| anyhow!("invalid position"))?;
if !buffer.can_resolve(&position) { buffer
Err(anyhow!("cannot resolve position"))?; .update(&mut cx, |buffer, _| {
} buffer.wait_for_version(message.version.into())
})
.await;
Ok(Self { Ok(Self {
position: position.to_point_utf16(buffer), position: buffer.read_with(&cx, |buffer, _| position.to_point_utf16(buffer)),
}) })
} }
@ -591,6 +628,9 @@ impl LspCommand for GetReferences {
.end .end
.and_then(deserialize_anchor) .and_then(deserialize_anchor)
.ok_or_else(|| anyhow!("missing target end"))?; .ok_or_else(|| anyhow!("missing target end"))?;
target_buffer
.update(&mut cx, |buffer, _| buffer.wait_for_anchors([&start, &end]))
.await;
locations.push(Location { locations.push(Location {
buffer: target_buffer, buffer: target_buffer,
range: start..end, range: start..end,
@ -658,23 +698,27 @@ impl LspCommand for GetDocumentHighlights {
position: Some(language::proto::serialize_anchor( position: Some(language::proto::serialize_anchor(
&buffer.anchor_before(self.position), &buffer.anchor_before(self.position),
)), )),
version: (&buffer.version()).into(),
} }
} }
fn from_proto( async fn from_proto(
message: proto::GetDocumentHighlights, message: proto::GetDocumentHighlights,
_: &mut Project, _: ModelHandle<Project>,
buffer: &Buffer, buffer: ModelHandle<Buffer>,
mut cx: AsyncAppContext,
) -> Result<Self> { ) -> Result<Self> {
let position = message let position = message
.position .position
.and_then(deserialize_anchor) .and_then(deserialize_anchor)
.ok_or_else(|| anyhow!("invalid position"))?; .ok_or_else(|| anyhow!("invalid position"))?;
if !buffer.can_resolve(&position) { buffer
Err(anyhow!("cannot resolve position"))?; .update(&mut cx, |buffer, _| {
} buffer.wait_for_version(message.version.into())
})
.await;
Ok(Self { Ok(Self {
position: position.to_point_utf16(buffer), position: buffer.read_with(&cx, |buffer, _| position.to_point_utf16(buffer)),
}) })
} }
@ -705,33 +749,34 @@ impl LspCommand for GetDocumentHighlights {
self, self,
message: proto::GetDocumentHighlightsResponse, message: proto::GetDocumentHighlightsResponse,
_: ModelHandle<Project>, _: ModelHandle<Project>,
_: ModelHandle<Buffer>, buffer: ModelHandle<Buffer>,
_: AsyncAppContext, mut cx: AsyncAppContext,
) -> Result<Vec<DocumentHighlight>> { ) -> Result<Vec<DocumentHighlight>> {
Ok(message let mut highlights = Vec::new();
.highlights for highlight in message.highlights {
.into_iter() let start = highlight
.map(|highlight| { .start
let start = highlight .and_then(deserialize_anchor)
.start .ok_or_else(|| anyhow!("missing target start"))?;
.and_then(deserialize_anchor) let end = highlight
.ok_or_else(|| anyhow!("missing target start"))?; .end
let end = highlight .and_then(deserialize_anchor)
.end .ok_or_else(|| anyhow!("missing target end"))?;
.and_then(deserialize_anchor) buffer
.ok_or_else(|| anyhow!("missing target end"))?; .update(&mut cx, |buffer, _| buffer.wait_for_anchors([&start, &end]))
let kind = match proto::document_highlight::Kind::from_i32(highlight.kind) { .await;
Some(proto::document_highlight::Kind::Text) => DocumentHighlightKind::TEXT, let kind = match proto::document_highlight::Kind::from_i32(highlight.kind) {
Some(proto::document_highlight::Kind::Read) => DocumentHighlightKind::READ, Some(proto::document_highlight::Kind::Text) => DocumentHighlightKind::TEXT,
Some(proto::document_highlight::Kind::Write) => DocumentHighlightKind::WRITE, Some(proto::document_highlight::Kind::Read) => DocumentHighlightKind::READ,
None => DocumentHighlightKind::TEXT, Some(proto::document_highlight::Kind::Write) => DocumentHighlightKind::WRITE,
}; None => DocumentHighlightKind::TEXT,
Ok(DocumentHighlight { };
range: start..end, highlights.push(DocumentHighlight {
kind, range: start..end,
}) kind,
}) });
.collect::<Result<Vec<_>>>()?) }
Ok(highlights)
} }
fn buffer_id_from_proto(message: &proto::GetDocumentHighlights) -> u64 { fn buffer_id_from_proto(message: &proto::GetDocumentHighlights) -> u64 {

View file

@ -1815,6 +1815,7 @@ impl Project {
}) })
} else if let Some(project_id) = self.remote_id() { } else if let Some(project_id) = self.remote_id() {
let rpc = self.client.clone(); let rpc = self.client.clone();
let version = buffer.version();
cx.spawn_weak(|_, mut cx| async move { cx.spawn_weak(|_, mut cx| async move {
let response = rpc let response = rpc
.request(proto::GetCodeActions { .request(proto::GetCodeActions {
@ -1822,6 +1823,7 @@ impl Project {
buffer_id, buffer_id,
start: Some(language::proto::serialize_anchor(&range.start)), start: Some(language::proto::serialize_anchor(&range.start)),
end: Some(language::proto::serialize_anchor(&range.end)), end: Some(language::proto::serialize_anchor(&range.end)),
version: (&version).into(),
}) })
.await?; .await?;
@ -2840,13 +2842,11 @@ impl Project {
.ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))?; .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))?;
Ok::<_, anyhow::Error>((project_id, buffer)) Ok::<_, anyhow::Error>((project_id, buffer))
})?; })?;
buffer
if !buffer .update(&mut cx, |buffer, _| {
.read_with(&cx, |buffer, _| buffer.version()) buffer.wait_for_version(requested_version)
.observed_all(&requested_version) })
{ .await;
Err(anyhow!("save request depends on unreceived edits"))?;
}
let (saved_version, mtime) = buffer.update(&mut cx, |buffer, cx| buffer.save(cx)).await?; let (saved_version, mtime) = buffer.update(&mut cx, |buffer, cx| buffer.save(cx)).await?;
Ok(proto::BufferSaved { Ok(proto::BufferSaved {
@ -2904,12 +2904,9 @@ impl Project {
.map(|buffer| buffer.upgrade(cx).unwrap()) .map(|buffer| buffer.upgrade(cx).unwrap())
.ok_or_else(|| anyhow!("unknown buffer id {}", envelope.payload.buffer_id)) .ok_or_else(|| anyhow!("unknown buffer id {}", envelope.payload.buffer_id))
})?; })?;
if !buffer buffer
.read_with(&cx, |buffer, _| buffer.version()) .update(&mut cx, |buffer, _| buffer.wait_for_version(version))
.observed_all(&version) .await;
{
Err(anyhow!("completion request depends on unreceived edits"))?;
}
let version = buffer.read_with(&cx, |buffer, _| buffer.version()); let version = buffer.read_with(&cx, |buffer, _| buffer.version());
let completions = this let completions = this
.update(&mut cx, |this, cx| this.completions(&buffer, position, cx)) .update(&mut cx, |this, cx| this.completions(&buffer, position, cx))
@ -2979,10 +2976,13 @@ impl Project {
.map(|buffer| buffer.upgrade(cx).unwrap()) .map(|buffer| buffer.upgrade(cx).unwrap())
.ok_or_else(|| anyhow!("unknown buffer id {}", envelope.payload.buffer_id)) .ok_or_else(|| anyhow!("unknown buffer id {}", envelope.payload.buffer_id))
})?; })?;
buffer
.update(&mut cx, |buffer, _| {
buffer.wait_for_version(envelope.payload.version.into())
})
.await;
let version = buffer.read_with(&cx, |buffer, _| buffer.version()); let version = buffer.read_with(&cx, |buffer, _| buffer.version());
if !version.observed(start.timestamp) || !version.observed(end.timestamp) {
Err(anyhow!("code action request references unreceived edits"))?;
}
let code_actions = this.update(&mut cx, |this, cx| { let code_actions = this.update(&mut cx, |this, cx| {
Ok::<_, anyhow::Error>(this.code_actions(&buffer, start..end, cx)) Ok::<_, anyhow::Error>(this.code_actions(&buffer, start..end, cx))
})?; })?;
@ -3038,19 +3038,26 @@ impl Project {
<T::LspRequest as lsp::request::Request>::Result: Send, <T::LspRequest as lsp::request::Request>::Result: Send,
{ {
let sender_id = envelope.original_sender_id()?; let sender_id = envelope.original_sender_id()?;
let (request, buffer_version) = this.update(&mut cx, |this, cx| { let buffer_id = T::buffer_id_from_proto(&envelope.payload);
let buffer_id = T::buffer_id_from_proto(&envelope.payload); let buffer_handle = this.read_with(&cx, |this, _| {
let buffer_handle = this this.opened_buffers
.opened_buffers
.get(&buffer_id) .get(&buffer_id)
.map(|buffer| buffer.upgrade(cx).unwrap()) .map(|buffer| buffer.upgrade(&cx).unwrap())
.ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))?; .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))
let buffer = buffer_handle.read(cx);
let buffer_version = buffer.version();
let request = T::from_proto(envelope.payload, this, buffer)?;
Ok::<_, anyhow::Error>((this.request_lsp(buffer_handle, request, cx), buffer_version))
})?; })?;
let response = request.await?; let request = T::from_proto(
envelope.payload,
this.clone(),
buffer_handle.clone(),
cx.clone(),
)
.await?;
let buffer_version = buffer_handle.read_with(&cx, |buffer, _| buffer.version());
let response = this
.update(&mut cx, |this, cx| {
this.request_lsp(buffer_handle, request, cx)
})
.await?;
this.update(&mut cx, |this, cx| { this.update(&mut cx, |this, cx| {
Ok(T::response_to_proto( Ok(T::response_to_proto(
response, response,

View file

@ -164,6 +164,7 @@ message GetDefinition {
uint64 project_id = 1; uint64 project_id = 1;
uint64 buffer_id = 2; uint64 buffer_id = 2;
Anchor position = 3; Anchor position = 3;
repeated VectorClockEntry version = 4;
} }
message GetDefinitionResponse { message GetDefinitionResponse {
@ -174,6 +175,7 @@ message GetReferences {
uint64 project_id = 1; uint64 project_id = 1;
uint64 buffer_id = 2; uint64 buffer_id = 2;
Anchor position = 3; Anchor position = 3;
repeated VectorClockEntry version = 4;
} }
message GetReferencesResponse { message GetReferencesResponse {
@ -184,6 +186,7 @@ message GetDocumentHighlights {
uint64 project_id = 1; uint64 project_id = 1;
uint64 buffer_id = 2; uint64 buffer_id = 2;
Anchor position = 3; Anchor position = 3;
repeated VectorClockEntry version = 4;
} }
message GetDocumentHighlightsResponse { message GetDocumentHighlightsResponse {
@ -328,6 +331,7 @@ message GetCodeActions {
uint64 buffer_id = 2; uint64 buffer_id = 2;
Anchor start = 3; Anchor start = 3;
Anchor end = 4; Anchor end = 4;
repeated VectorClockEntry version = 5;
} }
message GetCodeActionsResponse { message GetCodeActionsResponse {
@ -349,6 +353,7 @@ message PrepareRename {
uint64 project_id = 1; uint64 project_id = 1;
uint64 buffer_id = 2; uint64 buffer_id = 2;
Anchor position = 3; Anchor position = 3;
repeated VectorClockEntry version = 4;
} }
message PrepareRenameResponse { message PrepareRenameResponse {
@ -363,6 +368,7 @@ message PerformRename {
uint64 buffer_id = 2; uint64 buffer_id = 2;
Anchor position = 3; Anchor position = 3;
string new_name = 4; string new_name = 4;
repeated VectorClockEntry version = 5;
} }
message PerformRenameResponse { message PerformRenameResponse {

View file

@ -157,15 +157,15 @@ messages!(
(GetChannels, Foreground), (GetChannels, Foreground),
(GetChannelsResponse, Foreground), (GetChannelsResponse, Foreground),
(GetCodeActions, Background), (GetCodeActions, Background),
(GetCodeActionsResponse, Foreground), (GetCodeActionsResponse, Background),
(GetCompletions, Background), (GetCompletions, Background),
(GetCompletionsResponse, Foreground), (GetCompletionsResponse, Background),
(GetDefinition, Foreground), (GetDefinition, Background),
(GetDefinitionResponse, Foreground), (GetDefinitionResponse, Background),
(GetDocumentHighlights, Background), (GetDocumentHighlights, Background),
(GetDocumentHighlightsResponse, Background), (GetDocumentHighlightsResponse, Background),
(GetReferences, Foreground), (GetReferences, Background),
(GetReferencesResponse, Foreground), (GetReferencesResponse, Background),
(GetProjectSymbols, Background), (GetProjectSymbols, Background),
(GetProjectSymbolsResponse, Background), (GetProjectSymbolsResponse, Background),
(GetUsers, Foreground), (GetUsers, Foreground),
@ -176,10 +176,10 @@ messages!(
(JoinProjectResponse, Foreground), (JoinProjectResponse, Foreground),
(LeaveChannel, Foreground), (LeaveChannel, Foreground),
(LeaveProject, Foreground), (LeaveProject, Foreground),
(OpenBuffer, Foreground), (OpenBuffer, Background),
(OpenBufferForSymbol, Foreground), (OpenBufferForSymbol, Background),
(OpenBufferForSymbolResponse, Foreground), (OpenBufferForSymbolResponse, Background),
(OpenBufferResponse, Foreground), (OpenBufferResponse, Background),
(PerformRename, Background), (PerformRename, Background),
(PerformRenameResponse, Background), (PerformRenameResponse, Background),
(PrepareRename, Background), (PrepareRename, Background),
@ -199,7 +199,7 @@ messages!(
(UnregisterProject, Foreground), (UnregisterProject, Foreground),
(UnregisterWorktree, Foreground), (UnregisterWorktree, Foreground),
(UnshareProject, Foreground), (UnshareProject, Foreground),
(UpdateBuffer, Foreground), (UpdateBuffer, Background),
(UpdateBufferFile, Foreground), (UpdateBufferFile, Foreground),
(UpdateContacts, Foreground), (UpdateContacts, Foreground),
(UpdateDiagnosticSummary, Foreground), (UpdateDiagnosticSummary, Foreground),

View file

@ -4911,12 +4911,9 @@ mod tests {
); );
(buffer.version(), buffer.save(cx)) (buffer.version(), buffer.save(cx))
}); });
let save = cx.spawn(|cx| async move { let save = cx.background().spawn(async move {
let (saved_version, _) = save.await.expect("save request failed"); let (saved_version, _) = save.await.expect("save request failed");
buffer.read_with(&cx, |buffer, _| { assert!(saved_version.observed_all(&requested_version));
assert!(buffer.version().observed_all(&saved_version));
assert!(saved_version.observed_all(&requested_version));
});
}); });
if rng.lock().gen_bool(0.3) { if rng.lock().gen_bool(0.3) {
log::info!("Guest {}: detaching save request", guest_id); log::info!("Guest {}: detaching save request", guest_id);

View file

@ -1307,6 +1307,32 @@ impl Buffer {
} }
} }
pub fn wait_for_anchors<'a>(
&mut self,
anchors: impl IntoIterator<Item = &'a Anchor>,
) -> impl 'static + Future<Output = ()> {
let mut futures = Vec::new();
for anchor in anchors {
if !self.version.observed(anchor.timestamp)
&& *anchor != Anchor::max()
&& *anchor != Anchor::min()
{
let (tx, rx) = oneshot::channel();
self.edit_id_resolvers
.entry(anchor.timestamp)
.or_default()
.push(tx);
futures.push(rx);
}
}
async move {
for mut future in futures {
future.recv().await;
}
}
}
pub fn wait_for_version(&mut self, version: clock::Global) -> impl Future<Output = ()> { pub fn wait_for_version(&mut self, version: clock::Global) -> impl Future<Output = ()> {
let (tx, mut rx) = barrier::channel(); let (tx, mut rx) = barrier::channel();
if !self.snapshot.version.observed_all(&version) { if !self.snapshot.version.observed_all(&version) {