Make language server initialization asynchronous
This commit is contained in:
parent
b8523509da
commit
1ca50d0134
4 changed files with 153 additions and 148 deletions
|
@ -39,6 +39,8 @@ pub struct Project {
|
|||
active_entry: Option<ProjectEntry>,
|
||||
languages: Arc<LanguageRegistry>,
|
||||
language_servers: HashMap<(WorktreeId, String), Arc<LanguageServer>>,
|
||||
loading_language_servers:
|
||||
HashMap<(WorktreeId, String), watch::Receiver<Option<Arc<LanguageServer>>>>,
|
||||
client: Arc<client::Client>,
|
||||
user_store: ModelHandle<UserStore>,
|
||||
fs: Arc<dyn Fs>,
|
||||
|
@ -258,6 +260,7 @@ impl Project {
|
|||
fs,
|
||||
language_servers_with_diagnostics_running: 0,
|
||||
language_servers: Default::default(),
|
||||
loading_language_servers: Default::default(),
|
||||
}
|
||||
})
|
||||
}
|
||||
|
@ -309,6 +312,7 @@ impl Project {
|
|||
},
|
||||
language_servers_with_diagnostics_running: 0,
|
||||
language_servers: Default::default(),
|
||||
loading_language_servers: Default::default(),
|
||||
};
|
||||
for worktree in worktrees {
|
||||
this.add_worktree(&worktree, cx);
|
||||
|
@ -776,7 +780,7 @@ impl Project {
|
|||
};
|
||||
|
||||
// If the buffer has a language, set it and start/assign the language server
|
||||
if let Some(language) = self.languages.select_language(&full_path) {
|
||||
if let Some(language) = self.languages.select_language(&full_path).cloned() {
|
||||
buffer.update(cx, |buffer, cx| {
|
||||
buffer.set_language(Some(language.clone()), cx);
|
||||
});
|
||||
|
@ -786,24 +790,20 @@ impl Project {
|
|||
if let Some(local_worktree) = worktree.and_then(|w| w.read(cx).as_local()) {
|
||||
let worktree_id = local_worktree.id();
|
||||
let worktree_abs_path = local_worktree.abs_path().clone();
|
||||
let buffer = buffer.downgrade();
|
||||
let language_server =
|
||||
self.start_language_server(worktree_id, worktree_abs_path, language, cx);
|
||||
|
||||
let language_server = match self
|
||||
.language_servers
|
||||
.entry((worktree_id, language.name().to_string()))
|
||||
{
|
||||
hash_map::Entry::Occupied(e) => Some(e.get().clone()),
|
||||
hash_map::Entry::Vacant(e) => Self::start_language_server(
|
||||
self.client.clone(),
|
||||
language.clone(),
|
||||
&worktree_abs_path,
|
||||
cx,
|
||||
)
|
||||
.map(|server| e.insert(server).clone()),
|
||||
};
|
||||
|
||||
buffer.update(cx, |buffer, cx| {
|
||||
buffer.set_language_server(language_server, cx);
|
||||
});
|
||||
cx.spawn_weak(|_, mut cx| async move {
|
||||
if let Some(language_server) = language_server.await {
|
||||
if let Some(buffer) = buffer.upgrade(&cx) {
|
||||
buffer.update(&mut cx, |buffer, cx| {
|
||||
buffer.set_language_server(Some(language_server), cx);
|
||||
});
|
||||
}
|
||||
}
|
||||
})
|
||||
.detach();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -819,116 +819,144 @@ impl Project {
|
|||
}
|
||||
|
||||
fn start_language_server(
|
||||
rpc: Arc<Client>,
|
||||
&mut self,
|
||||
worktree_id: WorktreeId,
|
||||
worktree_path: Arc<Path>,
|
||||
language: Arc<Language>,
|
||||
worktree_path: &Path,
|
||||
cx: &mut ModelContext<Self>,
|
||||
) -> Option<Arc<LanguageServer>> {
|
||||
) -> Task<Option<Arc<LanguageServer>>> {
|
||||
enum LspEvent {
|
||||
DiagnosticsStart,
|
||||
DiagnosticsUpdate(lsp::PublishDiagnosticsParams),
|
||||
DiagnosticsFinish,
|
||||
}
|
||||
|
||||
let language_server = language
|
||||
.start_server(worktree_path, cx)
|
||||
.log_err()
|
||||
.flatten()?;
|
||||
let disk_based_sources = language
|
||||
.disk_based_diagnostic_sources()
|
||||
.cloned()
|
||||
.unwrap_or_default();
|
||||
let disk_based_diagnostics_progress_token =
|
||||
language.disk_based_diagnostics_progress_token().cloned();
|
||||
let has_disk_based_diagnostic_progress_token =
|
||||
disk_based_diagnostics_progress_token.is_some();
|
||||
let (diagnostics_tx, diagnostics_rx) = smol::channel::unbounded();
|
||||
let key = (worktree_id, language.name().to_string());
|
||||
if let Some(language_server) = self.language_servers.get(&key) {
|
||||
return Task::ready(Some(language_server.clone()));
|
||||
} else if let Some(mut language_server) = self.loading_language_servers.get(&key).cloned() {
|
||||
return cx
|
||||
.foreground()
|
||||
.spawn(async move { language_server.recv().await.flatten() });
|
||||
}
|
||||
|
||||
// Listen for `PublishDiagnostics` notifications.
|
||||
language_server
|
||||
.on_notification::<lsp::notification::PublishDiagnostics, _>({
|
||||
let diagnostics_tx = diagnostics_tx.clone();
|
||||
move |params| {
|
||||
if !has_disk_based_diagnostic_progress_token {
|
||||
block_on(diagnostics_tx.send(LspEvent::DiagnosticsStart)).ok();
|
||||
}
|
||||
block_on(diagnostics_tx.send(LspEvent::DiagnosticsUpdate(params))).ok();
|
||||
if !has_disk_based_diagnostic_progress_token {
|
||||
block_on(diagnostics_tx.send(LspEvent::DiagnosticsFinish)).ok();
|
||||
}
|
||||
}
|
||||
})
|
||||
.detach();
|
||||
|
||||
// Listen for `Progress` notifications. Send an event when the language server
|
||||
// transitions between running jobs and not running any jobs.
|
||||
let mut running_jobs_for_this_server: i32 = 0;
|
||||
language_server
|
||||
.on_notification::<lsp::notification::Progress, _>(move |params| {
|
||||
let token = match params.token {
|
||||
lsp::NumberOrString::Number(_) => None,
|
||||
lsp::NumberOrString::String(token) => Some(token),
|
||||
};
|
||||
|
||||
if token == disk_based_diagnostics_progress_token {
|
||||
match params.value {
|
||||
lsp::ProgressParamsValue::WorkDone(progress) => match progress {
|
||||
lsp::WorkDoneProgress::Begin(_) => {
|
||||
running_jobs_for_this_server += 1;
|
||||
if running_jobs_for_this_server == 1 {
|
||||
block_on(diagnostics_tx.send(LspEvent::DiagnosticsStart)).ok();
|
||||
}
|
||||
}
|
||||
lsp::WorkDoneProgress::End(_) => {
|
||||
running_jobs_for_this_server -= 1;
|
||||
if running_jobs_for_this_server == 0 {
|
||||
block_on(diagnostics_tx.send(LspEvent::DiagnosticsFinish)).ok();
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
},
|
||||
}
|
||||
}
|
||||
})
|
||||
.detach();
|
||||
|
||||
// Process all the LSP events.
|
||||
let (mut language_server_tx, language_server_rx) = watch::channel();
|
||||
self.loading_language_servers
|
||||
.insert(key.clone(), language_server_rx);
|
||||
let language_server = language.start_server(worktree_path, cx);
|
||||
let rpc = self.client.clone();
|
||||
cx.spawn_weak(|this, mut cx| async move {
|
||||
while let Ok(message) = diagnostics_rx.recv().await {
|
||||
let this = this.upgrade(&cx)?;
|
||||
match message {
|
||||
LspEvent::DiagnosticsStart => {
|
||||
this.update(&mut cx, |this, cx| {
|
||||
this.disk_based_diagnostics_started(cx);
|
||||
if let Some(project_id) = this.remote_id() {
|
||||
rpc.send(proto::DiskBasedDiagnosticsUpdating { project_id })
|
||||
.log_err();
|
||||
}
|
||||
});
|
||||
let language_server = language_server.await.log_err().flatten();
|
||||
if let Some(this) = this.upgrade(&cx) {
|
||||
this.update(&mut cx, |this, _| {
|
||||
this.loading_language_servers.remove(&key);
|
||||
if let Some(language_server) = language_server.clone() {
|
||||
this.language_servers.insert(key, language_server);
|
||||
}
|
||||
LspEvent::DiagnosticsUpdate(mut params) => {
|
||||
language.process_diagnostics(&mut params);
|
||||
this.update(&mut cx, |this, cx| {
|
||||
this.update_diagnostics(params, &disk_based_sources, cx)
|
||||
.log_err();
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
let language_server = language_server?;
|
||||
*language_server_tx.borrow_mut() = Some(language_server.clone());
|
||||
|
||||
let disk_based_sources = language
|
||||
.disk_based_diagnostic_sources()
|
||||
.cloned()
|
||||
.unwrap_or_default();
|
||||
let disk_based_diagnostics_progress_token =
|
||||
language.disk_based_diagnostics_progress_token().cloned();
|
||||
let has_disk_based_diagnostic_progress_token =
|
||||
disk_based_diagnostics_progress_token.is_some();
|
||||
let (diagnostics_tx, diagnostics_rx) = smol::channel::unbounded();
|
||||
|
||||
// Listen for `PublishDiagnostics` notifications.
|
||||
language_server
|
||||
.on_notification::<lsp::notification::PublishDiagnostics, _>({
|
||||
let diagnostics_tx = diagnostics_tx.clone();
|
||||
move |params| {
|
||||
if !has_disk_based_diagnostic_progress_token {
|
||||
block_on(diagnostics_tx.send(LspEvent::DiagnosticsStart)).ok();
|
||||
}
|
||||
block_on(diagnostics_tx.send(LspEvent::DiagnosticsUpdate(params))).ok();
|
||||
if !has_disk_based_diagnostic_progress_token {
|
||||
block_on(diagnostics_tx.send(LspEvent::DiagnosticsFinish)).ok();
|
||||
}
|
||||
}
|
||||
LspEvent::DiagnosticsFinish => {
|
||||
this.update(&mut cx, |this, cx| {
|
||||
this.disk_based_diagnostics_finished(cx);
|
||||
if let Some(project_id) = this.remote_id() {
|
||||
rpc.send(proto::DiskBasedDiagnosticsUpdated { project_id })
|
||||
})
|
||||
.detach();
|
||||
|
||||
// Listen for `Progress` notifications. Send an event when the language server
|
||||
// transitions between running jobs and not running any jobs.
|
||||
let mut running_jobs_for_this_server: i32 = 0;
|
||||
language_server
|
||||
.on_notification::<lsp::notification::Progress, _>(move |params| {
|
||||
let token = match params.token {
|
||||
lsp::NumberOrString::Number(_) => None,
|
||||
lsp::NumberOrString::String(token) => Some(token),
|
||||
};
|
||||
|
||||
if token == disk_based_diagnostics_progress_token {
|
||||
match params.value {
|
||||
lsp::ProgressParamsValue::WorkDone(progress) => match progress {
|
||||
lsp::WorkDoneProgress::Begin(_) => {
|
||||
running_jobs_for_this_server += 1;
|
||||
if running_jobs_for_this_server == 1 {
|
||||
block_on(diagnostics_tx.send(LspEvent::DiagnosticsStart))
|
||||
.ok();
|
||||
}
|
||||
}
|
||||
lsp::WorkDoneProgress::End(_) => {
|
||||
running_jobs_for_this_server -= 1;
|
||||
if running_jobs_for_this_server == 0 {
|
||||
block_on(diagnostics_tx.send(LspEvent::DiagnosticsFinish))
|
||||
.ok();
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
},
|
||||
}
|
||||
}
|
||||
})
|
||||
.detach();
|
||||
|
||||
// Process all the LSP events.
|
||||
cx.spawn(|mut cx| async move {
|
||||
while let Ok(message) = diagnostics_rx.recv().await {
|
||||
let this = this.upgrade(&cx)?;
|
||||
match message {
|
||||
LspEvent::DiagnosticsStart => {
|
||||
this.update(&mut cx, |this, cx| {
|
||||
this.disk_based_diagnostics_started(cx);
|
||||
if let Some(project_id) = this.remote_id() {
|
||||
rpc.send(proto::DiskBasedDiagnosticsUpdating { project_id })
|
||||
.log_err();
|
||||
}
|
||||
});
|
||||
}
|
||||
LspEvent::DiagnosticsUpdate(mut params) => {
|
||||
language.process_diagnostics(&mut params);
|
||||
this.update(&mut cx, |this, cx| {
|
||||
this.update_diagnostics(params, &disk_based_sources, cx)
|
||||
.log_err();
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
LspEvent::DiagnosticsFinish => {
|
||||
this.update(&mut cx, |this, cx| {
|
||||
this.disk_based_diagnostics_finished(cx);
|
||||
if let Some(project_id) = this.remote_id() {
|
||||
rpc.send(proto::DiskBasedDiagnosticsUpdated { project_id })
|
||||
.log_err();
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Some(())
|
||||
})
|
||||
.detach();
|
||||
Some(())
|
||||
})
|
||||
.detach();
|
||||
|
||||
Some(language_server)
|
||||
Some(language_server)
|
||||
})
|
||||
}
|
||||
|
||||
pub fn update_diagnostics(
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue