Move logic for starting language servers to the project

This commit is contained in:
Max Brunsfeld 2022-01-19 14:05:06 -08:00
parent 10c64f527c
commit f43dcd6763
9 changed files with 846 additions and 863 deletions

View file

@ -5,7 +5,7 @@ pub mod worktree;
use anyhow::{anyhow, Result};
use client::{proto, Client, PeerId, TypedEnvelope, User, UserStore};
use clock::ReplicaId;
use collections::HashMap;
use collections::{hash_map, HashMap, HashSet};
use futures::Future;
use fuzzy::{PathMatch, PathMatchCandidate, PathMatchCandidateSet};
use gpui::{
@ -27,7 +27,7 @@ pub struct Project {
worktrees: Vec<ModelHandle<Worktree>>,
active_entry: Option<ProjectEntry>,
languages: Arc<LanguageRegistry>,
language_servers: HashMap<(Arc<Path>, String), Arc<LanguageServer>>,
language_servers: HashMap<(WorktreeId, String), Arc<LanguageServer>>,
client: Arc<client::Client>,
user_store: ModelHandle<UserStore>,
fs: Arc<dyn Fs>,
@ -63,6 +63,7 @@ pub enum Event {
ActiveEntryChanged(Option<ProjectEntry>),
WorktreeRemoved(WorktreeId),
DiskBasedDiagnosticsStarted,
DiskBasedDiagnosticsUpdated { worktree_id: WorktreeId },
DiskBasedDiagnosticsFinished,
DiagnosticsUpdated(ProjectPath),
}
@ -223,7 +224,6 @@ impl Project {
worktree,
client.clone(),
user_store.clone(),
languages.clone(),
cx,
)
.await?,
@ -463,39 +463,21 @@ impl Project {
path: ProjectPath,
cx: &mut ModelContext<Self>,
) -> Task<Result<ModelHandle<Buffer>>> {
if let Some(worktree) = self.worktree_for_id(path.worktree_id, cx) {
let language_server = worktree
.read(cx)
.as_local()
.map(|w| w.abs_path().clone())
.and_then(|worktree_abs_path| {
let abs_path = worktree.abs_path().join(&path.path);
let language = self.languages.select_language(&abs_path).cloned();
if let Some(language) = language {
if let Some(language_server) =
self.register_language_server(worktree.abs_path(), &language, cx)
{
worktree.register_language(&language, &language_server);
}
}
});
worktree.update(cx, |worktree, cx| {
if let Some(worktree) = worktree.as_local_mut() {
let abs_path = worktree.abs_path().join(&path.path);
let language = self.languages.select_language(&abs_path).cloned();
if let Some(language) = language {
if let Some(language_server) =
self.register_language_server(worktree.abs_path(), &language, cx)
{
worktree.register_language(&language, &language_server);
}
}
}
worktree.open_buffer(path.path, cx)
})
let worktree = if let Some(worktree) = self.worktree_for_id(path.worktree_id, cx) {
worktree
} else {
cx.spawn(|_, _| async move { Err(anyhow!("no such worktree")) })
}
return cx.spawn(|_, _| async move { Err(anyhow!("no such worktree")) });
};
let buffer_task = worktree.update(cx, |worktree, cx| worktree.open_buffer(path.path, cx));
cx.spawn(|this, mut cx| async move {
let (buffer, buffer_is_new) = buffer_task.await?;
if buffer_is_new {
this.update(&mut cx, |this, cx| {
this.buffer_added(worktree, buffer.clone(), cx)
});
}
Ok(buffer)
})
}
pub fn save_buffer_as(
@ -504,189 +486,213 @@ impl Project {
abs_path: PathBuf,
cx: &mut ModelContext<Project>,
) -> Task<Result<()>> {
let result = self.worktree_for_abs_path(&abs_path, cx);
let languages = self.languages.clone();
let worktree_task = self.worktree_for_abs_path(&abs_path, cx);
cx.spawn(|this, mut cx| async move {
let (worktree, path) = result.await?;
let (worktree, path) = worktree_task.await?;
worktree
.update(&mut cx, |worktree, cx| {
let worktree = worktree.as_local_mut().unwrap();
let language = languages.select_language(&abs_path);
if let Some(language) = language {
if let Some(language_server) = this.update(cx, |this, cx| {
this.register_language_server(worktree.abs_path(), language, cx)
}) {
worktree.register_language(&language, &language_server);
}
}
worktree.save_buffer_as(buffer.clone(), path, cx)
worktree
.as_local_mut()
.unwrap()
.save_buffer_as(buffer.clone(), path, cx)
})
.await?;
this.update(&mut cx, |this, cx| this.buffer_added(worktree, buffer, cx));
Ok(())
})
}
fn register_language_server(
fn buffer_added(
&mut self,
worktree_abs_path: Arc<Path>,
language: &Arc<Language>,
worktree: ModelHandle<Worktree>,
buffer: ModelHandle<Buffer>,
cx: &mut ModelContext<Self>,
) -> Option<()> {
// Set the buffer's language
let full_path = buffer.read(cx).file()?.full_path();
let language = self.languages.select_language(&full_path)?.clone();
buffer.update(cx, |buffer, cx| {
buffer.set_language(Some(language.clone()), cx);
});
// For local worktrees, start a language server if needed.
let worktree = worktree.read(cx);
let worktree_id = worktree.id();
let worktree_abs_path = worktree.as_local()?.abs_path().clone();
let language_server = match self
.language_servers
.entry((worktree_id, language.name().to_string()))
{
hash_map::Entry::Occupied(e) => Some(e.get().clone()),
hash_map::Entry::Vacant(e) => Self::start_language_server(
self.client.clone(),
language,
worktree_id,
&worktree_abs_path,
cx,
)
.map(|server| e.insert(server).clone()),
};
buffer.update(cx, |buffer, cx| {
buffer.set_language_server(language_server, cx)
});
None
}
fn start_language_server(
rpc: Arc<Client>,
language: Arc<Language>,
worktree_id: WorktreeId,
worktree_path: &Path,
cx: &mut ModelContext<Self>,
) -> Option<Arc<LanguageServer>> {
if let Some(server) = self
.language_servers
.get(&(worktree_abs_path.clone(), language.name().to_string()))
{
return Some(server.clone());
}
if let Some(language_server) = language
.start_server(&worktree_abs_path, cx)
let language_server = language
.start_server(worktree_path, cx)
.log_err()
.flatten()
{
enum DiagnosticProgress {
Updating,
Updated,
}
.flatten()?;
let disk_based_sources = language
.disk_based_diagnostic_sources()
.cloned()
.unwrap_or_default();
let disk_based_diagnostics_progress_token =
language.disk_based_diagnostics_progress_token().cloned();
let (diagnostics_tx, diagnostics_rx) = smol::channel::unbounded();
let (disk_based_diagnostics_done_tx, disk_based_diagnostics_done_rx) =
smol::channel::unbounded();
language_server
.on_notification::<lsp::notification::PublishDiagnostics, _>(move |params| {
smol::block_on(diagnostics_tx.send(params)).ok();
})
.detach();
cx.spawn_weak(|this, mut cx| {
let has_disk_based_diagnostic_progress_token =
disk_based_diagnostics_progress_token.is_some();
let disk_based_diagnostics_done_tx = disk_based_diagnostics_done_tx.clone();
async move {
while let Ok(diagnostics) = diagnostics_rx.recv().await {
if let Some(handle) = cx.read(|cx| this.upgrade(cx)) {
handle.update(&mut cx, |this, cx| {
if !has_disk_based_diagnostic_progress_token {
smol::block_on(
disk_based_diagnostics_done_tx
.send(DiagnosticProgress::Updating),
)
.ok();
}
this.update_diagnostics(diagnostics, &disk_based_sources, cx)
.log_err();
if !has_disk_based_diagnostic_progress_token {
smol::block_on(
disk_based_diagnostics_done_tx
.send(DiagnosticProgress::Updated),
)
.ok();
}
})
} else {
break;
}
}
}
})
.detach();
let mut pending_disk_based_diagnostics: i32 = 0;
language_server
.on_notification::<lsp::notification::Progress, _>(move |params| {
let token = match params.token {
lsp::NumberOrString::Number(_) => None,
lsp::NumberOrString::String(token) => Some(token),
};
if token == disk_based_diagnostics_progress_token {
match params.value {
lsp::ProgressParamsValue::WorkDone(progress) => match progress {
lsp::WorkDoneProgress::Begin(_) => {
if pending_disk_based_diagnostics == 0 {
smol::block_on(
disk_based_diagnostics_done_tx
.send(DiagnosticProgress::Updating),
)
.ok();
}
pending_disk_based_diagnostics += 1;
}
lsp::WorkDoneProgress::End(_) => {
pending_disk_based_diagnostics -= 1;
if pending_disk_based_diagnostics == 0 {
smol::block_on(
disk_based_diagnostics_done_tx
.send(DiagnosticProgress::Updated),
)
.ok();
}
}
_ => {}
},
}
}
})
.detach();
let rpc = self.client.clone();
cx.spawn_weak(|this, mut cx| async move {
while let Ok(progress) = disk_based_diagnostics_done_rx.recv().await {
if let Some(handle) = cx.read(|cx| this.upgrade(cx)) {
match progress {
DiagnosticProgress::Updating => {
let message = handle.update(&mut cx, |this, cx| {
cx.emit(Event::DiskBasedDiagnosticsUpdating);
let this = this.as_local().unwrap();
this.share.as_ref().map(|share| {
proto::DiskBasedDiagnosticsUpdating {
project_id: share.project_id,
worktree_id: this.id().to_proto(),
}
})
});
if let Some(message) = message {
rpc.send(message).await.log_err();
}
}
DiagnosticProgress::Updated => {
let message = handle.update(&mut cx, |this, cx| {
cx.emit(Event::DiskBasedDiagnosticsUpdated);
let this = this.as_local().unwrap();
this.share.as_ref().map(|share| {
proto::DiskBasedDiagnosticsUpdated {
project_id: share.project_id,
worktree_id: this.id().to_proto(),
}
})
});
if let Some(message) = message {
rpc.send(message).await.log_err();
}
}
}
} else {
break;
}
}
})
.detach();
self.language_servers.insert(
(worktree_abs_path.clone(), language.name().to_string()),
language_server.clone(),
);
Some(language_server)
} else {
None
enum DiagnosticProgress {
Updating,
Publish(lsp::PublishDiagnosticsParams),
Updated,
}
let disk_based_sources = language
.disk_based_diagnostic_sources()
.cloned()
.unwrap_or_default();
let disk_based_diagnostics_progress_token =
language.disk_based_diagnostics_progress_token().cloned();
let has_disk_based_diagnostic_progress_token =
disk_based_diagnostics_progress_token.is_some();
let (diagnostics_tx, diagnostics_rx) = smol::channel::unbounded();
language_server
.on_notification::<lsp::notification::PublishDiagnostics, _>({
let diagnostics_tx = diagnostics_tx.clone();
move |params| {
if !has_disk_based_diagnostic_progress_token {
smol::block_on(diagnostics_tx.send(DiagnosticProgress::Updating)).ok();
}
smol::block_on(diagnostics_tx.send(DiagnosticProgress::Publish(params))).ok();
if !has_disk_based_diagnostic_progress_token {
smol::block_on(diagnostics_tx.send(DiagnosticProgress::Updated)).ok();
}
}
})
.detach();
let mut pending_disk_based_diagnostics: i32 = 0;
language_server
.on_notification::<lsp::notification::Progress, _>(move |params| {
let token = match params.token {
lsp::NumberOrString::Number(_) => None,
lsp::NumberOrString::String(token) => Some(token),
};
if token == disk_based_diagnostics_progress_token {
match params.value {
lsp::ProgressParamsValue::WorkDone(progress) => match progress {
lsp::WorkDoneProgress::Begin(_) => {
if pending_disk_based_diagnostics == 0 {
smol::block_on(
diagnostics_tx.send(DiagnosticProgress::Updating),
)
.ok();
}
pending_disk_based_diagnostics += 1;
}
lsp::WorkDoneProgress::End(_) => {
pending_disk_based_diagnostics -= 1;
if pending_disk_based_diagnostics == 0 {
smol::block_on(
diagnostics_tx.send(DiagnosticProgress::Updated),
)
.ok();
}
}
_ => {}
},
}
}
})
.detach();
cx.spawn_weak(|this, mut cx| async move {
while let Ok(message) = diagnostics_rx.recv().await {
let this = cx.read(|cx| this.upgrade(cx))?;
match message {
DiagnosticProgress::Updating => {
let project_id = this.update(&mut cx, |this, cx| {
cx.emit(Event::DiskBasedDiagnosticsStarted);
this.remote_id()
});
if let Some(project_id) = project_id {
rpc.send(proto::DiskBasedDiagnosticsUpdating {
project_id,
worktree_id: worktree_id.to_proto(),
})
.await
.log_err();
}
}
DiagnosticProgress::Publish(params) => {
this.update(&mut cx, |this, cx| {
this.update_diagnostics(params, &disk_based_sources, cx)
.log_err();
});
}
DiagnosticProgress::Updated => {
let project_id = this.update(&mut cx, |this, cx| {
cx.emit(Event::DiskBasedDiagnosticsFinished);
this.remote_id()
});
if let Some(project_id) = project_id {
rpc.send(proto::DiskBasedDiagnosticsUpdated {
project_id,
worktree_id: worktree_id.to_proto(),
})
.await
.log_err();
}
}
}
}
Some(())
})
.detach();
Some(language_server)
}
fn update_diagnostics(
&mut self,
diagnostics: lsp::PublishDiagnosticsParams,
disk_based_sources: &HashSet<String>,
cx: &mut ModelContext<Self>,
) -> Result<()> {
let path = diagnostics
.uri
.to_file_path()
.map_err(|_| anyhow!("URI is not a file"))?;
for tree in &self.worktrees {
let relative_path = tree.update(cx, |tree, _| {
path.strip_prefix(tree.as_local()?.abs_path()).ok()
});
if let Some(relative_path) = relative_path {
return tree.update(cx, |tree, cx| {
tree.as_local_mut().unwrap().update_diagnostics(
relative_path.into(),
diagnostics,
disk_based_sources,
cx,
)
});
}
}
todo!()
}
pub fn worktree_for_abs_path(
@ -726,12 +732,10 @@ impl Project {
let fs = self.fs.clone();
let client = self.client.clone();
let user_store = self.user_store.clone();
let languages = self.languages.clone();
let path = Arc::from(abs_path.as_ref());
cx.spawn(|project, mut cx| async move {
let worktree =
Worktree::open_local(client.clone(), user_store, path, fs, languages, &mut cx)
.await?;
Worktree::open_local(client.clone(), user_store, path, fs, &mut cx).await?;
let (remote_project_id, is_shared) = project.update(&mut cx, |project, cx| {
project.add_worktree(worktree.clone(), cx);
@ -936,13 +940,11 @@ impl Project {
.worktree
.ok_or_else(|| anyhow!("invalid worktree"))?;
let user_store = self.user_store.clone();
let languages = self.languages.clone();
cx.spawn(|this, mut cx| {
async move {
let worktree = Worktree::remote(
remote_id, replica_id, worktree, client, user_store, languages, &mut cx,
)
.await?;
let worktree =
Worktree::remote(remote_id, replica_id, worktree, client, user_store, &mut cx)
.await?;
this.update(&mut cx, |this, cx| this.add_worktree(worktree, cx));
Ok(())
}
@ -1248,6 +1250,25 @@ impl Entity for Project {
}
}
}
fn app_will_quit(
&mut self,
_: &mut MutableAppContext,
) -> Option<std::pin::Pin<Box<dyn 'static + Future<Output = ()>>>> {
use futures::FutureExt;
let shutdown_futures = self
.language_servers
.drain()
.filter_map(|(_, server)| server.shutdown())
.collect::<Vec<_>>();
Some(
async move {
futures::future::join_all(shutdown_futures).await;
}
.boxed(),
)
}
}
impl Collaborator {
@ -1275,8 +1296,12 @@ mod tests {
use super::*;
use client::test::FakeHttpClient;
use fs::RealFs;
use gpui::TestAppContext;
use language::LanguageRegistry;
use futures::StreamExt;
use gpui::{test::subscribe, TestAppContext};
use language::{
tree_sitter_rust, Diagnostic, LanguageConfig, LanguageRegistry, LanguageServerConfig, Point,
};
use lsp::Url;
use serde_json::json;
use std::{os::unix, path::PathBuf};
use util::test::temp_tree;
@ -1344,6 +1369,114 @@ mod tests {
);
}
#[gpui::test]
async fn test_language_server_diagnostics(mut cx: gpui::TestAppContext) {
let (language_server_config, mut fake_server) =
LanguageServerConfig::fake(cx.background()).await;
let progress_token = language_server_config
.disk_based_diagnostics_progress_token
.clone()
.unwrap();
let mut languages = LanguageRegistry::new();
languages.add(Arc::new(Language::new(
LanguageConfig {
name: "Rust".to_string(),
path_suffixes: vec!["rs".to_string()],
language_server: Some(language_server_config),
..Default::default()
},
Some(tree_sitter_rust::language()),
)));
let dir = temp_tree(json!({
"a.rs": "fn a() { A }",
"b.rs": "const y: i32 = 1",
}));
let http_client = FakeHttpClient::with_404_response();
let client = Client::new(http_client.clone());
let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http_client, cx));
let tree = Worktree::open_local(
client,
user_store,
dir.path(),
Arc::new(RealFs),
&mut cx.to_async(),
)
.await
.unwrap();
cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
.await;
// Cause worktree to start the fake language server
let _buffer = tree
.update(&mut cx, |tree, cx| tree.open_buffer("b.rs", cx))
.await
.unwrap();
let mut events = subscribe(&tree, &mut cx);
fake_server.start_progress(&progress_token).await;
assert_eq!(
events.next().await.unwrap(),
Event::DiskBasedDiagnosticsUpdating
);
fake_server.start_progress(&progress_token).await;
fake_server.end_progress(&progress_token).await;
fake_server.start_progress(&progress_token).await;
fake_server
.notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
uri: Url::from_file_path(dir.path().join("a.rs")).unwrap(),
version: None,
diagnostics: vec![lsp::Diagnostic {
range: lsp::Range::new(lsp::Position::new(0, 9), lsp::Position::new(0, 10)),
severity: Some(lsp::DiagnosticSeverity::ERROR),
message: "undefined variable 'A'".to_string(),
..Default::default()
}],
})
.await;
assert_eq!(
events.next().await.unwrap(),
Event::DiagnosticsUpdated(Arc::from(Path::new("a.rs")))
);
fake_server.end_progress(&progress_token).await;
fake_server.end_progress(&progress_token).await;
assert_eq!(
events.next().await.unwrap(),
Event::DiskBasedDiagnosticsUpdated
);
let (buffer, _) = tree
.update(&mut cx, |tree, cx| tree.open_buffer("a.rs", cx))
.await
.unwrap();
buffer.read_with(&cx, |buffer, _| {
let snapshot = buffer.snapshot();
let diagnostics = snapshot
.diagnostics_in_range::<_, Point>(0..buffer.len())
.collect::<Vec<_>>();
assert_eq!(
diagnostics,
&[DiagnosticEntry {
range: Point::new(0, 9)..Point::new(0, 10),
diagnostic: Diagnostic {
severity: lsp::DiagnosticSeverity::ERROR,
message: "undefined variable 'A'".to_string(),
group_id: 0,
is_primary: true,
..Default::default()
}
}]
)
});
}
#[gpui::test]
async fn test_search_worktree_without_files(mut cx: gpui::TestAppContext) {
let dir = temp_tree(json!({