Implement RunningKernel trait for native and remote kernels (#20934)
This PR introduces a unified interface for both native and remote kernels through the `RunningKernel` trait. When either the native kernel or the remote kernels are started, they return a `Box<dyn RunningKernel>` to make it easier to work with the session. As a bonus of this refactor, I've dropped some of the mpsc channels to instead opt for passing messages directly to `session.route(message)`. There was a lot of simplification of `Session` by moving responsibilities to `NativeRunningKernel`. No release notes yet until this is finalized. * [x] Detect remote kernelspecs from configured remote servers * [x] Launch kernel on demand For now, this allows you to set env vars `JUPYTER_SERVER` and `JUPYTER_TOKEN` to access a remote server. `JUPYTER_SERVER` should be a base path like `http://localhost:8888` or `https://notebooks.gesis.org/binder/jupyter/user/rubydata-binder-w6igpy4l/` Release Notes: - N/A
This commit is contained in:
parent
f74f670865
commit
72613b7668
8 changed files with 478 additions and 230 deletions
|
@ -1,4 +1,5 @@
|
|||
use crate::components::KernelListItem;
|
||||
use crate::kernels::RemoteRunningKernel;
|
||||
use crate::setup_editor_session_actions;
|
||||
use crate::{
|
||||
kernels::{Kernel, KernelSpecification, NativeRunningKernel},
|
||||
|
@ -15,8 +16,7 @@ use editor::{
|
|||
scroll::Autoscroll,
|
||||
Anchor, AnchorRangeExt as _, Editor, MultiBuffer, ToPoint,
|
||||
};
|
||||
use futures::io::BufReader;
|
||||
use futures::{AsyncBufReadExt as _, FutureExt as _, StreamExt as _};
|
||||
use futures::FutureExt as _;
|
||||
use gpui::{
|
||||
div, prelude::*, EventEmitter, Model, Render, Subscription, Task, View, ViewContext, WeakView,
|
||||
};
|
||||
|
@ -29,14 +29,13 @@ use runtimelib::{
|
|||
use std::{env::temp_dir, ops::Range, sync::Arc, time::Duration};
|
||||
use theme::ActiveTheme;
|
||||
use ui::{prelude::*, IconButtonShape, Tooltip};
|
||||
use util::ResultExt as _;
|
||||
|
||||
pub struct Session {
|
||||
fs: Arc<dyn Fs>,
|
||||
editor: WeakView<Editor>,
|
||||
pub kernel: Kernel,
|
||||
blocks: HashMap<String, EditorBlock>,
|
||||
messaging_task: Option<Task<()>>,
|
||||
process_status_task: Option<Task<()>>,
|
||||
pub kernel_specification: KernelSpecification,
|
||||
telemetry: Arc<Telemetry>,
|
||||
_buffer_subscription: Subscription,
|
||||
|
@ -219,8 +218,6 @@ impl Session {
|
|||
fs,
|
||||
editor,
|
||||
kernel: Kernel::StartingKernel(Task::ready(()).shared()),
|
||||
messaging_task: None,
|
||||
process_status_task: None,
|
||||
blocks: HashMap::default(),
|
||||
kernel_specification,
|
||||
_buffer_subscription: subscription,
|
||||
|
@ -246,6 +243,8 @@ impl Session {
|
|||
cx.entity_id().to_string(),
|
||||
);
|
||||
|
||||
let session_view = cx.view().clone();
|
||||
|
||||
let kernel = match self.kernel_specification.clone() {
|
||||
KernelSpecification::Jupyter(kernel_specification)
|
||||
| KernelSpecification::PythonEnv(kernel_specification) => NativeRunningKernel::new(
|
||||
|
@ -253,11 +252,15 @@ impl Session {
|
|||
entity_id,
|
||||
working_directory,
|
||||
self.fs.clone(),
|
||||
session_view,
|
||||
cx,
|
||||
),
|
||||
KernelSpecification::Remote(remote_kernel_specification) => RemoteRunningKernel::new(
|
||||
remote_kernel_specification,
|
||||
working_directory,
|
||||
session_view,
|
||||
cx,
|
||||
),
|
||||
KernelSpecification::Remote(_remote_kernel_specification) => {
|
||||
unimplemented!()
|
||||
}
|
||||
};
|
||||
|
||||
let pending_kernel = cx
|
||||
|
@ -265,119 +268,15 @@ impl Session {
|
|||
let kernel = kernel.await;
|
||||
|
||||
match kernel {
|
||||
Ok((mut kernel, mut messages_rx)) => {
|
||||
Ok(kernel) => {
|
||||
this.update(&mut cx, |session, cx| {
|
||||
let stderr = kernel.process.stderr.take();
|
||||
|
||||
cx.spawn(|_session, mut _cx| async move {
|
||||
if stderr.is_none() {
|
||||
return;
|
||||
}
|
||||
let reader = BufReader::new(stderr.unwrap());
|
||||
let mut lines = reader.lines();
|
||||
while let Some(Ok(line)) = lines.next().await {
|
||||
// todo!(): Log stdout and stderr to something the session can show
|
||||
log::error!("kernel: {}", line);
|
||||
}
|
||||
})
|
||||
.detach();
|
||||
|
||||
let stdout = kernel.process.stdout.take();
|
||||
|
||||
cx.spawn(|_session, mut _cx| async move {
|
||||
if stdout.is_none() {
|
||||
return;
|
||||
}
|
||||
let reader = BufReader::new(stdout.unwrap());
|
||||
let mut lines = reader.lines();
|
||||
while let Some(Ok(line)) = lines.next().await {
|
||||
log::info!("kernel: {}", line);
|
||||
}
|
||||
})
|
||||
.detach();
|
||||
|
||||
let status = kernel.process.status();
|
||||
session.kernel(Kernel::RunningKernel(Box::new(kernel)), cx);
|
||||
|
||||
let process_status_task = cx.spawn(|session, mut cx| async move {
|
||||
let error_message = match status.await {
|
||||
Ok(status) => {
|
||||
if status.success() {
|
||||
log::info!("kernel process exited successfully");
|
||||
return;
|
||||
}
|
||||
|
||||
format!("kernel process exited with status: {:?}", status)
|
||||
}
|
||||
Err(err) => {
|
||||
format!("kernel process exited with error: {:?}", err)
|
||||
}
|
||||
};
|
||||
|
||||
log::error!("{}", error_message);
|
||||
|
||||
session
|
||||
.update(&mut cx, |session, cx| {
|
||||
session.kernel(
|
||||
Kernel::ErroredLaunch(error_message.clone()),
|
||||
cx,
|
||||
);
|
||||
|
||||
session.blocks.values().for_each(|block| {
|
||||
block.execution_view.update(
|
||||
cx,
|
||||
|execution_view, cx| {
|
||||
match execution_view.status {
|
||||
ExecutionStatus::Finished => {
|
||||
// Do nothing when the output was good
|
||||
}
|
||||
_ => {
|
||||
// All other cases, set the status to errored
|
||||
execution_view.status =
|
||||
ExecutionStatus::KernelErrored(
|
||||
error_message.clone(),
|
||||
)
|
||||
}
|
||||
}
|
||||
cx.notify();
|
||||
},
|
||||
);
|
||||
});
|
||||
|
||||
cx.notify();
|
||||
})
|
||||
.ok();
|
||||
});
|
||||
|
||||
session.process_status_task = Some(process_status_task);
|
||||
|
||||
session.messaging_task = Some(cx.spawn(|session, mut cx| async move {
|
||||
while let Some(message) = messages_rx.next().await {
|
||||
session
|
||||
.update(&mut cx, |session, cx| {
|
||||
session.route(&message, cx);
|
||||
})
|
||||
.ok();
|
||||
}
|
||||
}));
|
||||
|
||||
// todo!(@rgbkrk): send KernelInfoRequest once our shell channel read/writes are split
|
||||
// cx.spawn(|this, mut cx| async move {
|
||||
// cx.background_executor()
|
||||
// .timer(Duration::from_millis(120))
|
||||
// .await;
|
||||
// this.update(&mut cx, |this, cx| {
|
||||
// this.send(KernelInfoRequest {}.into(), cx).ok();
|
||||
// })
|
||||
// .ok();
|
||||
// })
|
||||
// .detach();
|
||||
session.kernel(Kernel::RunningKernel(kernel), cx);
|
||||
})
|
||||
.ok();
|
||||
}
|
||||
Err(err) => {
|
||||
this.update(&mut cx, |session, cx| {
|
||||
session.kernel(Kernel::ErroredLaunch(err.to_string()), cx);
|
||||
session.kernel_errored(err.to_string(), cx);
|
||||
})
|
||||
.ok();
|
||||
}
|
||||
|
@ -389,6 +288,26 @@ impl Session {
|
|||
cx.notify();
|
||||
}
|
||||
|
||||
pub fn kernel_errored(&mut self, error_message: String, cx: &mut ViewContext<Self>) {
|
||||
self.kernel(Kernel::ErroredLaunch(error_message.clone()), cx);
|
||||
|
||||
self.blocks.values().for_each(|block| {
|
||||
block.execution_view.update(cx, |execution_view, cx| {
|
||||
match execution_view.status {
|
||||
ExecutionStatus::Finished => {
|
||||
// Do nothing when the output was good
|
||||
}
|
||||
_ => {
|
||||
// All other cases, set the status to errored
|
||||
execution_view.status =
|
||||
ExecutionStatus::KernelErrored(error_message.clone())
|
||||
}
|
||||
}
|
||||
cx.notify();
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
fn on_buffer_event(
|
||||
&mut self,
|
||||
buffer: Model<MultiBuffer>,
|
||||
|
@ -559,7 +478,7 @@ impl Session {
|
|||
}
|
||||
}
|
||||
|
||||
fn route(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Self>) {
|
||||
pub fn route(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Self>) {
|
||||
let parent_message_id = match message.parent_header.as_ref() {
|
||||
Some(header) => &header.msg_id,
|
||||
None => return,
|
||||
|
@ -639,21 +558,17 @@ impl Session {
|
|||
Kernel::RunningKernel(mut kernel) => {
|
||||
let mut request_tx = kernel.request_tx().clone();
|
||||
|
||||
let forced = kernel.force_shutdown(cx);
|
||||
|
||||
cx.spawn(|this, mut cx| async move {
|
||||
let message: JupyterMessage = ShutdownRequest { restart: false }.into();
|
||||
request_tx.try_send(message).ok();
|
||||
|
||||
forced.await.log_err();
|
||||
|
||||
// Give the kernel a bit of time to clean up
|
||||
cx.background_executor().timer(Duration::from_secs(3)).await;
|
||||
|
||||
this.update(&mut cx, |session, _cx| {
|
||||
session.messaging_task.take();
|
||||
session.process_status_task.take();
|
||||
})
|
||||
.ok();
|
||||
|
||||
kernel.force_shutdown().ok();
|
||||
|
||||
this.update(&mut cx, |session, cx| {
|
||||
session.clear_outputs(cx);
|
||||
session.kernel(Kernel::Shutdown, cx);
|
||||
|
@ -664,8 +579,6 @@ impl Session {
|
|||
.detach();
|
||||
}
|
||||
_ => {
|
||||
self.messaging_task.take();
|
||||
self.process_status_task.take();
|
||||
self.kernel(Kernel::Shutdown, cx);
|
||||
}
|
||||
}
|
||||
|
@ -682,23 +595,19 @@ impl Session {
|
|||
Kernel::RunningKernel(mut kernel) => {
|
||||
let mut request_tx = kernel.request_tx().clone();
|
||||
|
||||
let forced = kernel.force_shutdown(cx);
|
||||
|
||||
cx.spawn(|this, mut cx| async move {
|
||||
// Send shutdown request with restart flag
|
||||
log::debug!("restarting kernel");
|
||||
let message: JupyterMessage = ShutdownRequest { restart: true }.into();
|
||||
request_tx.try_send(message).ok();
|
||||
|
||||
this.update(&mut cx, |session, _cx| {
|
||||
session.messaging_task.take();
|
||||
session.process_status_task.take();
|
||||
})
|
||||
.ok();
|
||||
|
||||
// Wait for kernel to shutdown
|
||||
cx.background_executor().timer(Duration::from_secs(1)).await;
|
||||
|
||||
// Force kill the kernel if it hasn't shut down
|
||||
kernel.force_shutdown().ok();
|
||||
forced.await.log_err();
|
||||
|
||||
// Start a new kernel
|
||||
this.update(&mut cx, |session, cx| {
|
||||
|
@ -711,9 +620,6 @@ impl Session {
|
|||
.detach();
|
||||
}
|
||||
_ => {
|
||||
// If it's not already running, we can just clean up and start a new kernel
|
||||
self.messaging_task.take();
|
||||
self.process_status_task.take();
|
||||
self.clear_outputs(cx);
|
||||
self.start_kernel(cx);
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue