Use unbounded channel(s) for LSP binary status messaging

Co-Authored-By: Antonio Scandurra <antonio@zed.dev>
This commit is contained in:
Julia 2023-09-12 08:35:58 -04:00
parent c545788168
commit b0facf8e1e

View file

@ -13,7 +13,7 @@ use anyhow::{anyhow, Context, Result};
use async_trait::async_trait; use async_trait::async_trait;
use collections::{HashMap, HashSet}; use collections::{HashMap, HashSet};
use futures::{ use futures::{
channel::oneshot, channel::{mpsc, oneshot},
future::{BoxFuture, Shared}, future::{BoxFuture, Shared},
FutureExt, TryFutureExt as _, FutureExt, TryFutureExt as _,
}; };
@ -48,9 +48,6 @@ use unicase::UniCase;
use util::{http::HttpClient, paths::PathExt}; use util::{http::HttpClient, paths::PathExt};
use util::{post_inc, ResultExt, TryFutureExt as _, UnwrapFuture}; use util::{post_inc, ResultExt, TryFutureExt as _, UnwrapFuture};
#[cfg(any(test, feature = "test-support"))]
use futures::channel::mpsc;
pub use buffer::Operation; pub use buffer::Operation;
pub use buffer::*; pub use buffer::*;
pub use diagnostic_set::DiagnosticEntry; pub use diagnostic_set::DiagnosticEntry;
@ -64,6 +61,27 @@ pub fn init(cx: &mut AppContext) {
language_settings::init(cx); language_settings::init(cx);
} }
#[derive(Clone, Default)]
struct LspBinaryStatusSender {
txs: Arc<Mutex<Vec<mpsc::UnboundedSender<(Arc<Language>, LanguageServerBinaryStatus)>>>>,
}
impl LspBinaryStatusSender {
fn subscribe(&self) -> mpsc::UnboundedReceiver<(Arc<Language>, LanguageServerBinaryStatus)> {
let (tx, rx) = mpsc::unbounded();
self.txs.lock().push(tx);
rx
}
fn send(&self, language: Arc<Language>, status: LanguageServerBinaryStatus) {
let mut txs = self.txs.lock();
txs.retain(|tx| {
tx.unbounded_send((language.clone(), status.clone()))
.is_ok()
});
}
}
thread_local! { thread_local! {
static PARSER: RefCell<Parser> = RefCell::new(Parser::new()); static PARSER: RefCell<Parser> = RefCell::new(Parser::new());
} }
@ -594,14 +612,13 @@ struct AvailableLanguage {
pub struct LanguageRegistry { pub struct LanguageRegistry {
state: RwLock<LanguageRegistryState>, state: RwLock<LanguageRegistryState>,
language_server_download_dir: Option<Arc<Path>>, language_server_download_dir: Option<Arc<Path>>,
lsp_binary_statuses_tx: async_broadcast::Sender<(Arc<Language>, LanguageServerBinaryStatus)>,
lsp_binary_statuses_rx: async_broadcast::Receiver<(Arc<Language>, LanguageServerBinaryStatus)>,
login_shell_env_loaded: Shared<Task<()>>, login_shell_env_loaded: Shared<Task<()>>,
#[allow(clippy::type_complexity)] #[allow(clippy::type_complexity)]
lsp_binary_paths: Mutex< lsp_binary_paths: Mutex<
HashMap<LanguageServerName, Shared<Task<Result<LanguageServerBinary, Arc<anyhow::Error>>>>>, HashMap<LanguageServerName, Shared<Task<Result<LanguageServerBinary, Arc<anyhow::Error>>>>>,
>, >,
executor: Option<Arc<Background>>, executor: Option<Arc<Background>>,
lsp_binary_status_tx: LspBinaryStatusSender,
} }
struct LanguageRegistryState { struct LanguageRegistryState {
@ -624,7 +641,6 @@ pub struct PendingLanguageServer {
impl LanguageRegistry { impl LanguageRegistry {
pub fn new(login_shell_env_loaded: Task<()>) -> Self { pub fn new(login_shell_env_loaded: Task<()>) -> Self {
let (lsp_binary_statuses_tx, lsp_binary_statuses_rx) = async_broadcast::broadcast(16);
Self { Self {
state: RwLock::new(LanguageRegistryState { state: RwLock::new(LanguageRegistryState {
next_language_server_id: 0, next_language_server_id: 0,
@ -638,11 +654,10 @@ impl LanguageRegistry {
reload_count: 0, reload_count: 0,
}), }),
language_server_download_dir: None, language_server_download_dir: None,
lsp_binary_statuses_tx,
lsp_binary_statuses_rx,
login_shell_env_loaded: login_shell_env_loaded.shared(), login_shell_env_loaded: login_shell_env_loaded.shared(),
lsp_binary_paths: Default::default(), lsp_binary_paths: Default::default(),
executor: None, executor: None,
lsp_binary_status_tx: Default::default(),
} }
} }
@ -918,8 +933,8 @@ impl LanguageRegistry {
let container_dir: Arc<Path> = Arc::from(download_dir.join(adapter.name.0.as_ref())); let container_dir: Arc<Path> = Arc::from(download_dir.join(adapter.name.0.as_ref()));
let root_path = root_path.clone(); let root_path = root_path.clone();
let adapter = adapter.clone(); let adapter = adapter.clone();
let lsp_binary_statuses = self.lsp_binary_statuses_tx.clone();
let login_shell_env_loaded = self.login_shell_env_loaded.clone(); let login_shell_env_loaded = self.login_shell_env_loaded.clone();
let lsp_binary_statuses = self.lsp_binary_status_tx.clone();
let task = { let task = {
let container_dir = container_dir.clone(); let container_dir = container_dir.clone();
@ -976,8 +991,8 @@ impl LanguageRegistry {
pub fn language_server_binary_statuses( pub fn language_server_binary_statuses(
&self, &self,
) -> async_broadcast::Receiver<(Arc<Language>, LanguageServerBinaryStatus)> { ) -> mpsc::UnboundedReceiver<(Arc<Language>, LanguageServerBinaryStatus)> {
self.lsp_binary_statuses_rx.clone() self.lsp_binary_status_tx.subscribe()
} }
pub fn delete_server_container( pub fn delete_server_container(
@ -1054,7 +1069,7 @@ async fn get_binary(
language: Arc<Language>, language: Arc<Language>,
delegate: Arc<dyn LspAdapterDelegate>, delegate: Arc<dyn LspAdapterDelegate>,
container_dir: Arc<Path>, container_dir: Arc<Path>,
statuses: async_broadcast::Sender<(Arc<Language>, LanguageServerBinaryStatus)>, statuses: LspBinaryStatusSender,
mut cx: AsyncAppContext, mut cx: AsyncAppContext,
) -> Result<LanguageServerBinary> { ) -> Result<LanguageServerBinary> {
if !container_dir.exists() { if !container_dir.exists() {
@ -1081,19 +1096,15 @@ async fn get_binary(
.cached_server_binary(container_dir.to_path_buf(), delegate.as_ref()) .cached_server_binary(container_dir.to_path_buf(), delegate.as_ref())
.await .await
{ {
statuses statuses.send(language.clone(), LanguageServerBinaryStatus::Cached);
.broadcast((language.clone(), LanguageServerBinaryStatus::Cached))
.await?;
return Ok(binary); return Ok(binary);
} else { } else {
statuses statuses.send(
.broadcast(( language.clone(),
language.clone(), LanguageServerBinaryStatus::Failed {
LanguageServerBinaryStatus::Failed { error: format!("{:?}", error),
error: format!("{:?}", error), },
}, );
))
.await?;
} }
} }
@ -1105,27 +1116,21 @@ async fn fetch_latest_binary(
language: Arc<Language>, language: Arc<Language>,
delegate: &dyn LspAdapterDelegate, delegate: &dyn LspAdapterDelegate,
container_dir: &Path, container_dir: &Path,
lsp_binary_statuses_tx: async_broadcast::Sender<(Arc<Language>, LanguageServerBinaryStatus)>, lsp_binary_statuses_tx: LspBinaryStatusSender,
) -> Result<LanguageServerBinary> { ) -> Result<LanguageServerBinary> {
let container_dir: Arc<Path> = container_dir.into(); let container_dir: Arc<Path> = container_dir.into();
lsp_binary_statuses_tx lsp_binary_statuses_tx.send(
.broadcast(( language.clone(),
language.clone(), LanguageServerBinaryStatus::CheckingForUpdate,
LanguageServerBinaryStatus::CheckingForUpdate, );
))
.await?;
let version_info = adapter.fetch_latest_server_version(delegate).await?; let version_info = adapter.fetch_latest_server_version(delegate).await?;
lsp_binary_statuses_tx lsp_binary_statuses_tx.send(language.clone(), LanguageServerBinaryStatus::Downloading);
.broadcast((language.clone(), LanguageServerBinaryStatus::Downloading))
.await?;
let binary = adapter let binary = adapter
.fetch_server_binary(version_info, container_dir.to_path_buf(), delegate) .fetch_server_binary(version_info, container_dir.to_path_buf(), delegate)
.await?; .await?;
lsp_binary_statuses_tx lsp_binary_statuses_tx.send(language.clone(), LanguageServerBinaryStatus::Downloaded);
.broadcast((language.clone(), LanguageServerBinaryStatus::Downloaded))
.await?;
Ok(binary) Ok(binary)
} }