Capture language servers' stderr into server logs
This commit is contained in:
parent
b94955910c
commit
54e7e2f59d
2 changed files with 112 additions and 47 deletions
|
@ -12,6 +12,7 @@ use gpui::{
|
||||||
ViewHandle, WeakModelHandle,
|
ViewHandle, WeakModelHandle,
|
||||||
};
|
};
|
||||||
use language::{Buffer, LanguageServerId, LanguageServerName};
|
use language::{Buffer, LanguageServerId, LanguageServerName};
|
||||||
|
use lsp::IoKind;
|
||||||
use project::{Project, Worktree};
|
use project::{Project, Worktree};
|
||||||
use std::{borrow::Cow, sync::Arc};
|
use std::{borrow::Cow, sync::Arc};
|
||||||
use theme::{ui, Theme};
|
use theme::{ui, Theme};
|
||||||
|
@ -26,7 +27,7 @@ const RECEIVE_LINE: &str = "// Receive:\n";
|
||||||
|
|
||||||
pub struct LogStore {
|
pub struct LogStore {
|
||||||
projects: HashMap<WeakModelHandle<Project>, ProjectState>,
|
projects: HashMap<WeakModelHandle<Project>, ProjectState>,
|
||||||
io_tx: mpsc::UnboundedSender<(WeakModelHandle<Project>, LanguageServerId, bool, String)>,
|
io_tx: mpsc::UnboundedSender<(WeakModelHandle<Project>, LanguageServerId, IoKind, String)>,
|
||||||
}
|
}
|
||||||
|
|
||||||
struct ProjectState {
|
struct ProjectState {
|
||||||
|
@ -37,12 +38,12 @@ struct ProjectState {
|
||||||
struct LanguageServerState {
|
struct LanguageServerState {
|
||||||
log_buffer: ModelHandle<Buffer>,
|
log_buffer: ModelHandle<Buffer>,
|
||||||
rpc_state: Option<LanguageServerRpcState>,
|
rpc_state: Option<LanguageServerRpcState>,
|
||||||
|
_subscription: Option<lsp::Subscription>,
|
||||||
}
|
}
|
||||||
|
|
||||||
struct LanguageServerRpcState {
|
struct LanguageServerRpcState {
|
||||||
buffer: ModelHandle<Buffer>,
|
buffer: ModelHandle<Buffer>,
|
||||||
last_message_kind: Option<MessageKind>,
|
last_message_kind: Option<MessageKind>,
|
||||||
_subscription: lsp::Subscription,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct LspLogView {
|
pub struct LspLogView {
|
||||||
|
@ -118,11 +119,11 @@ impl LogStore {
|
||||||
io_tx,
|
io_tx,
|
||||||
};
|
};
|
||||||
cx.spawn_weak(|this, mut cx| async move {
|
cx.spawn_weak(|this, mut cx| async move {
|
||||||
while let Some((project, server_id, is_output, mut message)) = io_rx.next().await {
|
while let Some((project, server_id, io_kind, mut message)) = io_rx.next().await {
|
||||||
if let Some(this) = this.upgrade(&cx) {
|
if let Some(this) = this.upgrade(&cx) {
|
||||||
this.update(&mut cx, |this, cx| {
|
this.update(&mut cx, |this, cx| {
|
||||||
message.push('\n');
|
message.push('\n');
|
||||||
this.on_io(project, server_id, is_output, &message, cx);
|
this.on_io(project, server_id, io_kind, &message, cx);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -168,22 +169,29 @@ impl LogStore {
|
||||||
cx: &mut ModelContext<Self>,
|
cx: &mut ModelContext<Self>,
|
||||||
) -> Option<ModelHandle<Buffer>> {
|
) -> Option<ModelHandle<Buffer>> {
|
||||||
let project_state = self.projects.get_mut(&project.downgrade())?;
|
let project_state = self.projects.get_mut(&project.downgrade())?;
|
||||||
Some(
|
let server_state = project_state.servers.entry(id).or_insert_with(|| {
|
||||||
project_state
|
cx.notify();
|
||||||
.servers
|
LanguageServerState {
|
||||||
.entry(id)
|
rpc_state: None,
|
||||||
.or_insert_with(|| {
|
log_buffer: cx
|
||||||
cx.notify();
|
.add_model(|cx| Buffer::new(0, cx.model_id() as u64, ""))
|
||||||
LanguageServerState {
|
.clone(),
|
||||||
rpc_state: None,
|
_subscription: None,
|
||||||
log_buffer: cx
|
}
|
||||||
.add_model(|cx| Buffer::new(0, cx.model_id() as u64, ""))
|
});
|
||||||
.clone(),
|
|
||||||
}
|
let server = project.read(cx).language_server_for_id(id);
|
||||||
})
|
let weak_project = project.downgrade();
|
||||||
.log_buffer
|
let io_tx = self.io_tx.clone();
|
||||||
.clone(),
|
server_state._subscription = server.map(|server| {
|
||||||
)
|
server.on_io(move |io_kind, message| {
|
||||||
|
io_tx
|
||||||
|
.unbounded_send((weak_project, id, io_kind, message.to_string()))
|
||||||
|
.ok();
|
||||||
|
})
|
||||||
|
});
|
||||||
|
|
||||||
|
Some(server_state.log_buffer.clone())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn add_language_server_log(
|
fn add_language_server_log(
|
||||||
|
@ -230,7 +238,7 @@ impl LogStore {
|
||||||
Some(server_state.log_buffer.clone())
|
Some(server_state.log_buffer.clone())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn enable_rpc_trace_for_language_server(
|
fn enable_rpc_trace_for_language_server(
|
||||||
&mut self,
|
&mut self,
|
||||||
project: &ModelHandle<Project>,
|
project: &ModelHandle<Project>,
|
||||||
server_id: LanguageServerId,
|
server_id: LanguageServerId,
|
||||||
|
@ -239,9 +247,7 @@ impl LogStore {
|
||||||
let weak_project = project.downgrade();
|
let weak_project = project.downgrade();
|
||||||
let project_state = self.projects.get_mut(&weak_project)?;
|
let project_state = self.projects.get_mut(&weak_project)?;
|
||||||
let server_state = project_state.servers.get_mut(&server_id)?;
|
let server_state = project_state.servers.get_mut(&server_id)?;
|
||||||
let server = project.read(cx).language_server_for_id(server_id)?;
|
|
||||||
let rpc_state = server_state.rpc_state.get_or_insert_with(|| {
|
let rpc_state = server_state.rpc_state.get_or_insert_with(|| {
|
||||||
let io_tx = self.io_tx.clone();
|
|
||||||
let language = project.read(cx).languages().language_for_name("JSON");
|
let language = project.read(cx).languages().language_for_name("JSON");
|
||||||
let buffer = cx.add_model(|cx| Buffer::new(0, cx.model_id() as u64, ""));
|
let buffer = cx.add_model(|cx| Buffer::new(0, cx.model_id() as u64, ""));
|
||||||
cx.spawn_weak({
|
cx.spawn_weak({
|
||||||
|
@ -258,11 +264,6 @@ impl LogStore {
|
||||||
LanguageServerRpcState {
|
LanguageServerRpcState {
|
||||||
buffer,
|
buffer,
|
||||||
last_message_kind: None,
|
last_message_kind: None,
|
||||||
_subscription: server.on_io(move |is_received, json| {
|
|
||||||
io_tx
|
|
||||||
.unbounded_send((weak_project, server_id, is_received, json.to_string()))
|
|
||||||
.ok();
|
|
||||||
}),
|
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
Some(rpc_state.buffer.clone())
|
Some(rpc_state.buffer.clone())
|
||||||
|
@ -285,10 +286,25 @@ impl LogStore {
|
||||||
&mut self,
|
&mut self,
|
||||||
project: WeakModelHandle<Project>,
|
project: WeakModelHandle<Project>,
|
||||||
language_server_id: LanguageServerId,
|
language_server_id: LanguageServerId,
|
||||||
is_received: bool,
|
io_kind: IoKind,
|
||||||
message: &str,
|
message: &str,
|
||||||
cx: &mut AppContext,
|
cx: &mut AppContext,
|
||||||
) -> Option<()> {
|
) -> Option<()> {
|
||||||
|
let is_received = match io_kind {
|
||||||
|
IoKind::StdOut => true,
|
||||||
|
IoKind::StdIn => false,
|
||||||
|
IoKind::StdErr => {
|
||||||
|
let project = project.upgrade(cx)?;
|
||||||
|
project.update(cx, |_, cx| {
|
||||||
|
cx.emit(project::Event::LanguageServerLog(
|
||||||
|
language_server_id,
|
||||||
|
format!("stderr: {}\n", message.trim()),
|
||||||
|
))
|
||||||
|
});
|
||||||
|
return Some(());
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
let state = self
|
let state = self
|
||||||
.projects
|
.projects
|
||||||
.get_mut(&project)?
|
.get_mut(&project)?
|
||||||
|
|
|
@ -35,7 +35,14 @@ const CONTENT_LEN_HEADER: &str = "Content-Length: ";
|
||||||
|
|
||||||
type NotificationHandler = Box<dyn Send + FnMut(Option<usize>, &str, AsyncAppContext)>;
|
type NotificationHandler = Box<dyn Send + FnMut(Option<usize>, &str, AsyncAppContext)>;
|
||||||
type ResponseHandler = Box<dyn Send + FnOnce(Result<String, Error>)>;
|
type ResponseHandler = Box<dyn Send + FnOnce(Result<String, Error>)>;
|
||||||
type IoHandler = Box<dyn Send + FnMut(bool, &str)>;
|
type IoHandler = Box<dyn Send + FnMut(IoKind, &str)>;
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Copy)]
|
||||||
|
pub enum IoKind {
|
||||||
|
StdOut,
|
||||||
|
StdIn,
|
||||||
|
StdErr,
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Deserialize)]
|
#[derive(Debug, Clone, Deserialize)]
|
||||||
pub struct LanguageServerBinary {
|
pub struct LanguageServerBinary {
|
||||||
|
@ -144,16 +151,18 @@ impl LanguageServer {
|
||||||
.args(binary.arguments)
|
.args(binary.arguments)
|
||||||
.stdin(Stdio::piped())
|
.stdin(Stdio::piped())
|
||||||
.stdout(Stdio::piped())
|
.stdout(Stdio::piped())
|
||||||
.stderr(Stdio::inherit())
|
.stderr(Stdio::piped())
|
||||||
.kill_on_drop(true)
|
.kill_on_drop(true)
|
||||||
.spawn()?;
|
.spawn()?;
|
||||||
|
|
||||||
let stdin = server.stdin.take().unwrap();
|
let stdin = server.stdin.take().unwrap();
|
||||||
let stout = server.stdout.take().unwrap();
|
let stdout = server.stdout.take().unwrap();
|
||||||
|
let stderr = server.stderr.take().unwrap();
|
||||||
let mut server = Self::new_internal(
|
let mut server = Self::new_internal(
|
||||||
server_id.clone(),
|
server_id.clone(),
|
||||||
stdin,
|
stdin,
|
||||||
stout,
|
stdout,
|
||||||
|
stderr,
|
||||||
Some(server),
|
Some(server),
|
||||||
root_path,
|
root_path,
|
||||||
code_action_kinds,
|
code_action_kinds,
|
||||||
|
@ -181,10 +190,11 @@ impl LanguageServer {
|
||||||
Ok(server)
|
Ok(server)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn new_internal<Stdin, Stdout, F>(
|
fn new_internal<Stdin, Stdout, Stderr, F>(
|
||||||
server_id: LanguageServerId,
|
server_id: LanguageServerId,
|
||||||
stdin: Stdin,
|
stdin: Stdin,
|
||||||
stdout: Stdout,
|
stdout: Stdout,
|
||||||
|
stderr: Stderr,
|
||||||
server: Option<Child>,
|
server: Option<Child>,
|
||||||
root_path: &Path,
|
root_path: &Path,
|
||||||
code_action_kinds: Option<Vec<CodeActionKind>>,
|
code_action_kinds: Option<Vec<CodeActionKind>>,
|
||||||
|
@ -194,7 +204,8 @@ impl LanguageServer {
|
||||||
where
|
where
|
||||||
Stdin: AsyncWrite + Unpin + Send + 'static,
|
Stdin: AsyncWrite + Unpin + Send + 'static,
|
||||||
Stdout: AsyncRead + Unpin + Send + 'static,
|
Stdout: AsyncRead + Unpin + Send + 'static,
|
||||||
F: FnMut(AnyNotification) + 'static + Send,
|
Stderr: AsyncRead + Unpin + Send + 'static,
|
||||||
|
F: FnMut(AnyNotification) + 'static + Send + Clone,
|
||||||
{
|
{
|
||||||
let (outbound_tx, outbound_rx) = channel::unbounded::<String>();
|
let (outbound_tx, outbound_rx) = channel::unbounded::<String>();
|
||||||
let (output_done_tx, output_done_rx) = barrier::channel();
|
let (output_done_tx, output_done_rx) = barrier::channel();
|
||||||
|
@ -203,17 +214,26 @@ impl LanguageServer {
|
||||||
let response_handlers =
|
let response_handlers =
|
||||||
Arc::new(Mutex::new(Some(HashMap::<_, ResponseHandler>::default())));
|
Arc::new(Mutex::new(Some(HashMap::<_, ResponseHandler>::default())));
|
||||||
let io_handlers = Arc::new(Mutex::new(HashMap::default()));
|
let io_handlers = Arc::new(Mutex::new(HashMap::default()));
|
||||||
let input_task = cx.spawn(|cx| {
|
|
||||||
Self::handle_input(
|
let stdout_input_task = cx.spawn(|cx| {
|
||||||
stdout,
|
{
|
||||||
on_unhandled_notification,
|
Self::handle_input(
|
||||||
notification_handlers.clone(),
|
stdout,
|
||||||
response_handlers.clone(),
|
on_unhandled_notification.clone(),
|
||||||
io_handlers.clone(),
|
notification_handlers.clone(),
|
||||||
cx,
|
response_handlers.clone(),
|
||||||
)
|
io_handlers.clone(),
|
||||||
|
cx,
|
||||||
|
)
|
||||||
|
}
|
||||||
.log_err()
|
.log_err()
|
||||||
});
|
});
|
||||||
|
let stderr_input_task =
|
||||||
|
cx.spawn(|_| Self::handle_stderr(stderr, io_handlers.clone()).log_err());
|
||||||
|
let input_task = cx.spawn(|_| async move {
|
||||||
|
let (stdout, stderr) = futures::join!(stdout_input_task, stderr_input_task);
|
||||||
|
stdout.or(stderr)
|
||||||
|
});
|
||||||
let output_task = cx.background().spawn({
|
let output_task = cx.background().spawn({
|
||||||
Self::handle_output(
|
Self::handle_output(
|
||||||
stdin,
|
stdin,
|
||||||
|
@ -284,7 +304,7 @@ impl LanguageServer {
|
||||||
if let Ok(message) = str::from_utf8(&buffer) {
|
if let Ok(message) = str::from_utf8(&buffer) {
|
||||||
log::trace!("incoming message:{}", message);
|
log::trace!("incoming message:{}", message);
|
||||||
for handler in io_handlers.lock().values_mut() {
|
for handler in io_handlers.lock().values_mut() {
|
||||||
handler(true, message);
|
handler(IoKind::StdOut, message);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -327,6 +347,30 @@ impl LanguageServer {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn handle_stderr<Stderr>(
|
||||||
|
stderr: Stderr,
|
||||||
|
io_handlers: Arc<Mutex<HashMap<usize, IoHandler>>>,
|
||||||
|
) -> anyhow::Result<()>
|
||||||
|
where
|
||||||
|
Stderr: AsyncRead + Unpin + Send + 'static,
|
||||||
|
{
|
||||||
|
let mut stderr = BufReader::new(stderr);
|
||||||
|
let mut buffer = Vec::new();
|
||||||
|
loop {
|
||||||
|
buffer.clear();
|
||||||
|
stderr.read_until(b'\n', &mut buffer).await?;
|
||||||
|
if let Ok(message) = str::from_utf8(&buffer) {
|
||||||
|
log::trace!("incoming stderr message:{message}");
|
||||||
|
for handler in io_handlers.lock().values_mut() {
|
||||||
|
handler(IoKind::StdErr, message);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Don't starve the main thread when receiving lots of messages at once.
|
||||||
|
smol::future::yield_now().await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
async fn handle_output<Stdin>(
|
async fn handle_output<Stdin>(
|
||||||
stdin: Stdin,
|
stdin: Stdin,
|
||||||
outbound_rx: channel::Receiver<String>,
|
outbound_rx: channel::Receiver<String>,
|
||||||
|
@ -348,7 +392,7 @@ impl LanguageServer {
|
||||||
while let Ok(message) = outbound_rx.recv().await {
|
while let Ok(message) = outbound_rx.recv().await {
|
||||||
log::trace!("outgoing message:{}", message);
|
log::trace!("outgoing message:{}", message);
|
||||||
for handler in io_handlers.lock().values_mut() {
|
for handler in io_handlers.lock().values_mut() {
|
||||||
handler(false, &message);
|
handler(IoKind::StdIn, &message);
|
||||||
}
|
}
|
||||||
|
|
||||||
content_len_buffer.clear();
|
content_len_buffer.clear();
|
||||||
|
@ -532,7 +576,7 @@ impl LanguageServer {
|
||||||
#[must_use]
|
#[must_use]
|
||||||
pub fn on_io<F>(&self, f: F) -> Subscription
|
pub fn on_io<F>(&self, f: F) -> Subscription
|
||||||
where
|
where
|
||||||
F: 'static + Send + FnMut(bool, &str),
|
F: 'static + Send + FnMut(IoKind, &str),
|
||||||
{
|
{
|
||||||
let id = self.next_id.fetch_add(1, SeqCst);
|
let id = self.next_id.fetch_add(1, SeqCst);
|
||||||
self.io_handlers.lock().insert(id, Box::new(f));
|
self.io_handlers.lock().insert(id, Box::new(f));
|
||||||
|
@ -845,12 +889,16 @@ impl LanguageServer {
|
||||||
) -> (Self, FakeLanguageServer) {
|
) -> (Self, FakeLanguageServer) {
|
||||||
let (stdin_writer, stdin_reader) = async_pipe::pipe();
|
let (stdin_writer, stdin_reader) = async_pipe::pipe();
|
||||||
let (stdout_writer, stdout_reader) = async_pipe::pipe();
|
let (stdout_writer, stdout_reader) = async_pipe::pipe();
|
||||||
|
// writers will be dropped after we exit, so readers will also be noop for the fake servers
|
||||||
|
let (_stderr_writer, stderr_reader) = async_pipe::pipe();
|
||||||
|
let (_stderr_writer_2, stderr_reader_2) = async_pipe::pipe();
|
||||||
let (notifications_tx, notifications_rx) = channel::unbounded();
|
let (notifications_tx, notifications_rx) = channel::unbounded();
|
||||||
|
|
||||||
let server = Self::new_internal(
|
let server = Self::new_internal(
|
||||||
LanguageServerId(0),
|
LanguageServerId(0),
|
||||||
stdin_writer,
|
stdin_writer,
|
||||||
stdout_reader,
|
stdout_reader,
|
||||||
|
stderr_reader,
|
||||||
None,
|
None,
|
||||||
Path::new("/"),
|
Path::new("/"),
|
||||||
None,
|
None,
|
||||||
|
@ -862,6 +910,7 @@ impl LanguageServer {
|
||||||
LanguageServerId(0),
|
LanguageServerId(0),
|
||||||
stdout_writer,
|
stdout_writer,
|
||||||
stdin_reader,
|
stdin_reader,
|
||||||
|
stderr_reader_2,
|
||||||
None,
|
None,
|
||||||
Path::new("/"),
|
Path::new("/"),
|
||||||
None,
|
None,
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue