Stop extensions' servers and message loops before removing their files (#34208)

Fixes an issue that caused Windows to fail when removing extension's
directories, as Zed had never stop any related processes.

Now:

* Zed shuts down and waits until the end when the language servers are
shut down

* Adds `impl Drop for WasmExtension` where does
`self.tx.close_channel();` to stop a receiver loop that holds the "lock"
on the extension's work dir.
The extension was dropped, but the channel was not closed for some
reason.

* Does more unregistration to ensure `Arc<WasmExtension>` with the `tx`
does not leak further

* Tidies up the related errors which had never reported a problematic
path before

Release Notes:

- N/A

---------

Co-authored-by: Smit Barmase <heysmitbarmase@gmail.com>
Co-authored-by: Smit <smit@zed.dev>
This commit is contained in:
Kirill Bulatov 2025-07-10 22:25:10 +03:00 committed by GitHub
parent c549b712fd
commit c6603e4fba
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
18 changed files with 273 additions and 84 deletions

1
Cargo.lock generated
View file

@ -8978,6 +8978,7 @@ dependencies = [
"gpui",
"language",
"lsp",
"project",
"serde",
"serde_json",
"util",

View file

@ -34,6 +34,11 @@ impl ExtensionSlashCommandProxy for SlashCommandRegistryProxy {
self.slash_command_registry
.register_command(ExtensionSlashCommand::new(extension, command), false)
}
fn unregister_slash_command(&self, command_name: Arc<str>) {
self.slash_command_registry
.unregister_command_by_name(&command_name)
}
}
/// An adapter that allows an [`LspAdapterDelegate`] to be used as a [`WorktreeDelegate`].

View file

@ -8,6 +8,7 @@ mod tool_metrics;
use assertions::{AssertionsReport, display_error_row};
use instance::{ExampleInstance, JudgeOutput, RunOutput, run_git};
use language_extension::LspAccess;
pub(crate) use tool_metrics::*;
use ::fs::RealFs;
@ -415,7 +416,11 @@ pub fn init(cx: &mut App) -> Arc<AgentAppState> {
language::init(cx);
debug_adapter_extension::init(extension_host_proxy.clone(), cx);
language_extension::init(extension_host_proxy.clone(), languages.clone());
language_extension::init(
LspAccess::Noop,
extension_host_proxy.clone(),
languages.clone(),
);
language_model::init(client.clone(), cx);
language_models::init(user_store.clone(), client.clone(), cx);
languages::init(languages.clone(), node_runtime.clone(), cx);

View file

@ -286,7 +286,8 @@ pub trait ExtensionLanguageServerProxy: Send + Sync + 'static {
&self,
language: &LanguageName,
language_server_id: &LanguageServerName,
);
cx: &mut App,
) -> Task<Result<()>>;
fn update_language_server_status(
&self,
@ -313,12 +314,13 @@ impl ExtensionLanguageServerProxy for ExtensionHostProxy {
&self,
language: &LanguageName,
language_server_id: &LanguageServerName,
) {
cx: &mut App,
) -> Task<Result<()>> {
let Some(proxy) = self.language_server_proxy.read().clone() else {
return;
return Task::ready(Ok(()));
};
proxy.remove_language_server(language, language_server_id)
proxy.remove_language_server(language, language_server_id, cx)
}
fn update_language_server_status(
@ -350,6 +352,8 @@ impl ExtensionSnippetProxy for ExtensionHostProxy {
pub trait ExtensionSlashCommandProxy: Send + Sync + 'static {
fn register_slash_command(&self, extension: Arc<dyn Extension>, command: SlashCommand);
fn unregister_slash_command(&self, command_name: Arc<str>);
}
impl ExtensionSlashCommandProxy for ExtensionHostProxy {
@ -360,6 +364,14 @@ impl ExtensionSlashCommandProxy for ExtensionHostProxy {
proxy.register_slash_command(extension, command)
}
fn unregister_slash_command(&self, command_name: Arc<str>) {
let Some(proxy) = self.slash_command_proxy.read().clone() else {
return;
};
proxy.unregister_slash_command(command_name)
}
}
pub trait ExtensionContextServerProxy: Send + Sync + 'static {
@ -398,6 +410,8 @@ impl ExtensionContextServerProxy for ExtensionHostProxy {
pub trait ExtensionIndexedDocsProviderProxy: Send + Sync + 'static {
fn register_indexed_docs_provider(&self, extension: Arc<dyn Extension>, provider_id: Arc<str>);
fn unregister_indexed_docs_provider(&self, provider_id: Arc<str>);
}
impl ExtensionIndexedDocsProviderProxy for ExtensionHostProxy {
@ -408,6 +422,14 @@ impl ExtensionIndexedDocsProviderProxy for ExtensionHostProxy {
proxy.register_indexed_docs_provider(extension, provider_id)
}
fn unregister_indexed_docs_provider(&self, provider_id: Arc<str>) {
let Some(proxy) = self.indexed_docs_provider_proxy.read().clone() else {
return;
};
proxy.unregister_indexed_docs_provider(provider_id)
}
}
pub trait ExtensionDebugAdapterProviderProxy: Send + Sync + 'static {

View file

@ -20,6 +20,7 @@ use extension::{
ExtensionSnippetProxy, ExtensionThemeProxy,
};
use fs::{Fs, RemoveOptions};
use futures::future::join_all;
use futures::{
AsyncReadExt as _, Future, FutureExt as _, StreamExt as _,
channel::{
@ -860,8 +861,8 @@ impl ExtensionStore {
btree_map::Entry::Vacant(e) => e.insert(ExtensionOperation::Remove),
};
cx.spawn(async move |this, cx| {
let _finish = cx.on_drop(&this, {
cx.spawn(async move |extension_store, cx| {
let _finish = cx.on_drop(&extension_store, {
let extension_id = extension_id.clone();
move |this, cx| {
this.outstanding_operations.remove(extension_id.as_ref());
@ -876,22 +877,39 @@ impl ExtensionStore {
ignore_if_not_exists: true,
},
)
.await?;
.await
.with_context(|| format!("Removing extension dir {extension_dir:?}"))?;
// todo(windows)
// Stop the server here.
this.update(cx, |this, cx| this.reload(None, cx))?.await;
extension_store
.update(cx, |extension_store, cx| extension_store.reload(None, cx))?
.await;
fs.remove_dir(
// There's a race between wasm extension fully stopping and the directory removal.
// On Windows, it's impossible to remove a directory that has a process running in it.
for i in 0..3 {
cx.background_executor()
.timer(Duration::from_millis(i * 100))
.await;
let removal_result = fs
.remove_dir(
&work_dir,
RemoveOptions {
recursive: true,
ignore_if_not_exists: true,
},
)
.await?;
.await;
match removal_result {
Ok(()) => break,
Err(e) => {
if i == 2 {
log::error!("Failed to remove extension work dir {work_dir:?} : {e}");
}
}
}
}
this.update(cx, |_, cx| {
extension_store.update(cx, |_, cx| {
cx.emit(Event::ExtensionUninstalled(extension_id.clone()));
if let Some(events) = ExtensionEvents::try_global(cx) {
if let Some(manifest) = extension_manifest {
@ -1143,27 +1161,38 @@ impl ExtensionStore {
})
.collect::<Vec<_>>();
let mut grammars_to_remove = Vec::new();
let mut server_removal_tasks = Vec::with_capacity(extensions_to_unload.len());
for extension_id in &extensions_to_unload {
let Some(extension) = old_index.extensions.get(extension_id) else {
continue;
};
grammars_to_remove.extend(extension.manifest.grammars.keys().cloned());
for (language_server_name, config) in extension.manifest.language_servers.iter() {
for (language_server_name, config) in &extension.manifest.language_servers {
for language in config.languages() {
self.proxy
.remove_language_server(&language, language_server_name);
server_removal_tasks.push(self.proxy.remove_language_server(
&language,
language_server_name,
cx,
));
}
}
for (server_id, _) in extension.manifest.context_servers.iter() {
for (server_id, _) in &extension.manifest.context_servers {
self.proxy.unregister_context_server(server_id.clone(), cx);
}
for (adapter, _) in extension.manifest.debug_adapters.iter() {
for (adapter, _) in &extension.manifest.debug_adapters {
self.proxy.unregister_debug_adapter(adapter.clone());
}
for (locator, _) in extension.manifest.debug_locators.iter() {
for (locator, _) in &extension.manifest.debug_locators {
self.proxy.unregister_debug_locator(locator.clone());
}
for (command_name, _) in &extension.manifest.slash_commands {
self.proxy.unregister_slash_command(command_name.clone());
}
for (provider_id, _) in &extension.manifest.indexed_docs_providers {
self.proxy
.unregister_indexed_docs_provider(provider_id.clone());
}
}
self.wasm_extensions
@ -1268,14 +1297,15 @@ impl ExtensionStore {
cx.background_spawn({
let fs = fs.clone();
async move {
for theme_path in themes_to_add.into_iter() {
let _ = join_all(server_removal_tasks).await;
for theme_path in themes_to_add {
proxy
.load_user_theme(theme_path, fs.clone())
.await
.log_err();
}
for (icon_theme_path, icons_root_path) in icon_themes_to_add.into_iter() {
for (icon_theme_path, icons_root_path) in icon_themes_to_add {
proxy
.load_icon_theme(icon_theme_path, icons_root_path, fs.clone())
.await

View file

@ -11,6 +11,7 @@ use futures::{AsyncReadExt, StreamExt, io::BufReader};
use gpui::{AppContext as _, SemanticVersion, TestAppContext};
use http_client::{FakeHttpClient, Response};
use language::{BinaryStatus, LanguageMatcher, LanguageRegistry};
use language_extension::LspAccess;
use lsp::LanguageServerName;
use node_runtime::NodeRuntime;
use parking_lot::Mutex;
@ -271,7 +272,7 @@ async fn test_extension_store(cx: &mut TestAppContext) {
let theme_registry = Arc::new(ThemeRegistry::new(Box::new(())));
theme_extension::init(proxy.clone(), theme_registry.clone(), cx.executor());
let language_registry = Arc::new(LanguageRegistry::test(cx.executor()));
language_extension::init(proxy.clone(), language_registry.clone());
language_extension::init(LspAccess::Noop, proxy.clone(), language_registry.clone());
let node_runtime = NodeRuntime::unavailable();
let store = cx.new(|cx| {
@ -554,7 +555,11 @@ async fn test_extension_store_with_test_extension(cx: &mut TestAppContext) {
let theme_registry = Arc::new(ThemeRegistry::new(Box::new(())));
theme_extension::init(proxy.clone(), theme_registry.clone(), cx.executor());
let language_registry = project.read_with(cx, |project, _cx| project.languages().clone());
language_extension::init(proxy.clone(), language_registry.clone());
language_extension::init(
LspAccess::ViaLspStore(project.update(cx, |project, _| project.lsp_store())),
proxy.clone(),
language_registry.clone(),
);
let node_runtime = NodeRuntime::unavailable();
let mut status_updates = language_registry.language_server_binary_statuses();
@ -815,7 +820,6 @@ async fn test_extension_store_with_test_extension(cx: &mut TestAppContext) {
extension_store
.update(cx, |store, cx| store.reload(Some("gleam".into()), cx))
.await;
cx.executor().run_until_parked();
project.update(cx, |project, cx| {
project.restart_language_servers_for_buffers(vec![buffer.clone()], HashSet::default(), cx)

View file

@ -11,6 +11,7 @@ use extension::{
ExtensionLanguageServerProxy, ExtensionManifest,
};
use fs::{Fs, RemoveOptions, RenameOptions};
use futures::future::join_all;
use gpui::{App, AppContext as _, AsyncApp, Context, Entity, Task, WeakEntity};
use http_client::HttpClient;
use language::{LanguageConfig, LanguageName, LanguageQueries, LoadedLanguage};
@ -230,18 +231,27 @@ impl HeadlessExtensionStore {
.unwrap_or_default();
self.proxy.remove_languages(&languages_to_remove, &[]);
for (language_server_name, language) in self
let servers_to_remove = self
.loaded_language_servers
.remove(extension_id)
.unwrap_or_default()
{
self.proxy
.remove_language_server(&language, &language_server_name);
}
.unwrap_or_default();
let proxy = self.proxy.clone();
let path = self.extension_dir.join(&extension_id.to_string());
let fs = self.fs.clone();
cx.spawn(async move |_, _| {
cx.spawn(async move |_, cx| {
let mut removal_tasks = Vec::with_capacity(servers_to_remove.len());
cx.update(|cx| {
for (language_server_name, language) in servers_to_remove {
removal_tasks.push(proxy.remove_language_server(
&language,
&language_server_name,
cx,
));
}
})
.ok();
let _ = join_all(removal_tasks).await;
fs.remove_dir(
&path,
RemoveOptions {
@ -250,6 +260,7 @@ impl HeadlessExtensionStore {
},
)
.await
.with_context(|| format!("Removing directory {path:?}"))
})
}

View file

@ -54,7 +54,7 @@ pub struct WasmHost {
main_thread_message_tx: mpsc::UnboundedSender<MainThreadCall>,
}
#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct WasmExtension {
tx: UnboundedSender<ExtensionCall>,
pub manifest: Arc<ExtensionManifest>,
@ -63,6 +63,12 @@ pub struct WasmExtension {
pub zed_api_version: SemanticVersion,
}
impl Drop for WasmExtension {
fn drop(&mut self) {
self.tx.close_channel();
}
}
#[async_trait]
impl extension::Extension for WasmExtension {
fn manifest(&self) -> Arc<ExtensionManifest> {
@ -742,7 +748,6 @@ impl WasmExtension {
{
let (return_tx, return_rx) = oneshot::channel();
self.tx
.clone()
.unbounded_send(Box::new(move |extension, store| {
async {
let result = f(extension, store).await;

View file

@ -29,6 +29,11 @@ impl ExtensionIndexedDocsProviderProxy for IndexedDocsRegistryProxy {
ProviderId(provider_id),
)));
}
fn unregister_indexed_docs_provider(&self, provider_id: Arc<str>) {
self.indexed_docs_registry
.unregister_provider(&ProviderId(provider_id));
}
}
pub struct ExtensionIndexedDocsProvider {

View file

@ -52,6 +52,10 @@ impl IndexedDocsRegistry {
);
}
pub fn unregister_provider(&self, provider_id: &ProviderId) {
self.stores_by_provider.write().remove(provider_id);
}
pub fn get_provider_store(&self, provider_id: ProviderId) -> Option<Arc<IndexedDocsStore>> {
self.stores_by_provider.read().get(&provider_id).cloned()
}

View file

@ -21,6 +21,7 @@ fs.workspace = true
gpui.workspace = true
language.workspace = true
lsp.workspace = true
project.workspace = true
serde.workspace = true
serde_json.workspace = true
util.workspace = true

View file

@ -6,21 +6,24 @@ use std::sync::Arc;
use anyhow::{Context as _, Result};
use async_trait::async_trait;
use collections::HashMap;
use collections::{HashMap, HashSet};
use extension::{Extension, ExtensionLanguageServerProxy, WorktreeDelegate};
use fs::Fs;
use futures::{Future, FutureExt};
use gpui::AsyncApp;
use futures::{Future, FutureExt, future::join_all};
use gpui::{App, AppContext, AsyncApp, Task};
use language::{
BinaryStatus, CodeLabel, HighlightId, Language, LanguageName, LanguageToolchainStore,
LspAdapter, LspAdapterDelegate,
};
use lsp::{CodeActionKind, LanguageServerBinary, LanguageServerBinaryOptions, LanguageServerName};
use lsp::{
CodeActionKind, LanguageServerBinary, LanguageServerBinaryOptions, LanguageServerName,
LanguageServerSelector,
};
use serde::Serialize;
use serde_json::Value;
use util::{ResultExt, fs::make_file_executable, maybe};
use crate::LanguageServerRegistryProxy;
use crate::{LanguageServerRegistryProxy, LspAccess};
/// An adapter that allows an [`LspAdapterDelegate`] to be used as a [`WorktreeDelegate`].
struct WorktreeDelegateAdapter(pub Arc<dyn LspAdapterDelegate>);
@ -71,10 +74,50 @@ impl ExtensionLanguageServerProxy for LanguageServerRegistryProxy {
fn remove_language_server(
&self,
language: &LanguageName,
language_server_id: &LanguageServerName,
) {
language_server_name: &LanguageServerName,
cx: &mut App,
) -> Task<Result<()>> {
self.language_registry
.remove_lsp_adapter(language, language_server_id);
.remove_lsp_adapter(language, language_server_name);
let mut tasks = Vec::new();
match &self.lsp_access {
LspAccess::ViaLspStore(lsp_store) => lsp_store.update(cx, |lsp_store, cx| {
let stop_task = lsp_store.stop_language_servers_for_buffers(
Vec::new(),
HashSet::from_iter([LanguageServerSelector::Name(
language_server_name.clone(),
)]),
cx,
);
tasks.push(stop_task);
}),
LspAccess::ViaWorkspaces(lsp_store_provider) => {
if let Ok(lsp_stores) = lsp_store_provider(cx) {
for lsp_store in lsp_stores {
lsp_store.update(cx, |lsp_store, cx| {
let stop_task = lsp_store.stop_language_servers_for_buffers(
Vec::new(),
HashSet::from_iter([LanguageServerSelector::Name(
language_server_name.clone(),
)]),
cx,
);
tasks.push(stop_task);
});
}
}
}
LspAccess::Noop => {}
}
cx.background_spawn(async move {
let results = join_all(tasks).await;
for result in results {
result?;
}
Ok(())
})
}
fn update_language_server_status(

View file

@ -5,13 +5,26 @@ use std::sync::Arc;
use anyhow::Result;
use extension::{ExtensionGrammarProxy, ExtensionHostProxy, ExtensionLanguageProxy};
use gpui::{App, Entity};
use language::{LanguageMatcher, LanguageName, LanguageRegistry, LoadedLanguage};
use project::LspStore;
#[derive(Clone)]
pub enum LspAccess {
ViaLspStore(Entity<LspStore>),
ViaWorkspaces(Arc<dyn Fn(&mut App) -> Result<Vec<Entity<LspStore>>> + Send + Sync + 'static>),
Noop,
}
pub fn init(
lsp_access: LspAccess,
extension_host_proxy: Arc<ExtensionHostProxy>,
language_registry: Arc<LanguageRegistry>,
) {
let language_server_registry_proxy = LanguageServerRegistryProxy { language_registry };
let language_server_registry_proxy = LanguageServerRegistryProxy {
language_registry,
lsp_access,
};
extension_host_proxy.register_grammar_proxy(language_server_registry_proxy.clone());
extension_host_proxy.register_language_proxy(language_server_registry_proxy.clone());
extension_host_proxy.register_language_server_proxy(language_server_registry_proxy);
@ -20,6 +33,7 @@ pub fn init(
#[derive(Clone)]
struct LanguageServerRegistryProxy {
language_registry: Arc<LanguageRegistry>,
lsp_access: LspAccess,
}
impl ExtensionGrammarProxy for LanguageServerRegistryProxy {

View file

@ -9712,7 +9712,8 @@ impl LspStore {
} else {
let buffers =
lsp_store.buffer_ids_to_buffers(envelope.payload.buffer_ids.into_iter(), cx);
lsp_store.stop_language_servers_for_buffers(
lsp_store
.stop_language_servers_for_buffers(
buffers,
envelope
.payload
@ -9720,11 +9721,11 @@ impl LspStore {
.into_iter()
.filter_map(|selector| {
Some(match selector.selector? {
proto::language_server_selector::Selector::ServerId(server_id) => {
LanguageServerSelector::Id(LanguageServerId::from_proto(
proto::language_server_selector::Selector::ServerId(
server_id,
))
}
) => LanguageServerSelector::Id(LanguageServerId::from_proto(
server_id,
)),
proto::language_server_selector::Selector::Name(name) => {
LanguageServerSelector::Name(LanguageServerName(
SharedString::from(name),
@ -9734,7 +9735,8 @@ impl LspStore {
})
.collect(),
cx,
);
)
.detach_and_log_err(cx);
}
})?;
@ -10290,9 +10292,9 @@ impl LspStore {
pub fn stop_language_servers_for_buffers(
&mut self,
buffers: Vec<Entity<Buffer>>,
also_restart_servers: HashSet<LanguageServerSelector>,
also_stop_servers: HashSet<LanguageServerSelector>,
cx: &mut Context<Self>,
) {
) -> Task<Result<()>> {
if let Some((client, project_id)) = self.upstream_client() {
let request = client.request(proto::StopLanguageServers {
project_id,
@ -10300,7 +10302,7 @@ impl LspStore {
.into_iter()
.map(|b| b.read(cx).remote_id().to_proto())
.collect(),
also_servers: also_restart_servers
also_servers: also_stop_servers
.into_iter()
.map(|selector| {
let selector = match selector {
@ -10322,24 +10324,31 @@ impl LspStore {
.collect(),
all: false,
});
cx.background_spawn(request).detach_and_log_err(cx);
cx.background_spawn(async move {
let _ = request.await?;
Ok(())
})
} else {
self.stop_local_language_servers_for_buffers(&buffers, also_restart_servers, cx)
.detach();
let task =
self.stop_local_language_servers_for_buffers(&buffers, also_stop_servers, cx);
cx.background_spawn(async move {
task.await;
Ok(())
})
}
}
fn stop_local_language_servers_for_buffers(
&mut self,
buffers: &[Entity<Buffer>],
also_restart_servers: HashSet<LanguageServerSelector>,
also_stop_servers: HashSet<LanguageServerSelector>,
cx: &mut Context<Self>,
) -> Task<()> {
let Some(local) = self.as_local_mut() else {
return Task::ready(());
};
let mut language_server_names_to_stop = BTreeSet::default();
let mut language_servers_to_stop = also_restart_servers
let mut language_servers_to_stop = also_stop_servers
.into_iter()
.flat_map(|selector| match selector {
LanguageServerSelector::Id(id) => Some(id),

View file

@ -3217,9 +3217,11 @@ impl Project {
also_restart_servers: HashSet<LanguageServerSelector>,
cx: &mut Context<Self>,
) {
self.lsp_store.update(cx, |lsp_store, cx| {
self.lsp_store
.update(cx, |lsp_store, cx| {
lsp_store.stop_language_servers_for_buffers(buffers, also_restart_servers, cx)
})
.detach_and_log_err(cx);
}
pub fn cancel_language_server_work_for_buffers(

View file

@ -77,7 +77,6 @@ impl HeadlessProject {
cx: &mut Context<Self>,
) -> Self {
debug_adapter_extension::init(proxy.clone(), cx);
language_extension::init(proxy.clone(), languages.clone());
languages::init(languages.clone(), node_runtime.clone(), cx);
let worktree_store = cx.new(|cx| {
@ -185,6 +184,11 @@ impl HeadlessProject {
});
cx.subscribe(&lsp_store, Self::on_lsp_store_event).detach();
language_extension::init(
language_extension::LspAccess::ViaLspStore(lsp_store.clone()),
proxy.clone(),
languages.clone(),
);
cx.subscribe(
&buffer_store,

View file

@ -6657,6 +6657,10 @@ impl WorkspaceStore {
Ok(())
})?
}
pub fn workspaces(&self) -> &HashSet<WindowHandle<Workspace>> {
&self.workspaces
}
}
impl ViewId {

View file

@ -441,11 +441,31 @@ pub fn main() {
debug_adapter_extension::init(extension_host_proxy.clone(), cx);
language::init(cx);
language_extension::init(extension_host_proxy.clone(), languages.clone());
languages::init(languages.clone(), node_runtime.clone(), cx);
let user_store = cx.new(|cx| UserStore::new(client.clone(), cx));
let workspace_store = cx.new(|cx| WorkspaceStore::new(client.clone(), cx));
language_extension::init(
language_extension::LspAccess::ViaWorkspaces({
let workspace_store = workspace_store.clone();
Arc::new(move |cx: &mut App| {
workspace_store.update(cx, |workspace_store, cx| {
workspace_store
.workspaces()
.iter()
.map(|workspace| {
workspace.update(cx, |workspace, _, cx| {
workspace.project().read(cx).lsp_store()
})
})
.collect()
})
})
}),
extension_host_proxy.clone(),
languages.clone(),
);
Client::set_global(client.clone(), cx);
zed::init(cx);