diff --git a/Cargo.lock b/Cargo.lock index 34bccbc8b4..d950324d2f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9926,6 +9926,7 @@ dependencies = [ "editor", "env_logger 0.11.5", "feature_flags", + "file_icons", "futures 0.3.31", "gpui", "http_client", diff --git a/crates/repl/Cargo.toml b/crates/repl/Cargo.toml index 60e8734771..293a58e762 100644 --- a/crates/repl/Cargo.toml +++ b/crates/repl/Cargo.toml @@ -22,8 +22,10 @@ collections.workspace = true command_palette_hooks.workspace = true editor.workspace = true feature_flags.workspace = true +file_icons.workspace = true futures.workspace = true gpui.workspace = true +http_client.workspace = true image.workspace = true jupyter-websocket-client.workspace = true jupyter-protocol.workspace = true diff --git a/crates/repl/src/components/kernel_options.rs b/crates/repl/src/components/kernel_options.rs index fc0213e54e..8fd9b412ea 100644 --- a/crates/repl/src/components/kernel_options.rs +++ b/crates/repl/src/components/kernel_options.rs @@ -34,6 +34,16 @@ pub struct KernelPickerDelegate { on_select: OnSelect, } +// Helper function to truncate long paths +fn truncate_path(path: &SharedString, max_length: usize) -> SharedString { + if path.len() <= max_length { + path.to_string().into() + } else { + let truncated = path.chars().rev().take(max_length - 3).collect::(); + format!("...{}", truncated.chars().rev().collect::()).into() + } +} + impl KernelSelector { pub fn new(on_select: OnSelect, worktree_id: WorktreeId, trigger: T) -> Self { KernelSelector { @@ -116,11 +126,25 @@ impl PickerDelegate for KernelPickerDelegate { &self, ix: usize, selected: bool, - _cx: &mut ViewContext>, + cx: &mut ViewContext>, ) -> Option { let kernelspec = self.filtered_kernels.get(ix)?; - let is_selected = self.selected_kernelspec.as_ref() == Some(kernelspec); + let icon = kernelspec.icon(cx); + + let (name, kernel_type, path_or_url) = match kernelspec { + KernelSpecification::Jupyter(_) => (kernelspec.name(), "Jupyter", None), + KernelSpecification::PythonEnv(_) => ( + kernelspec.name(), + "Python Env", + Some(truncate_path(&kernelspec.path(), 42)), + ), + KernelSpecification::Remote(_) => ( + kernelspec.name(), + "Remote", + Some(truncate_path(&kernelspec.path(), 42)), + ), + }; Some( ListItem::new(ix) @@ -128,25 +152,46 @@ impl PickerDelegate for KernelPickerDelegate { .spacing(ListItemSpacing::Sparse) .selected(selected) .child( - v_flex() - .min_w(px(600.)) + h_flex() .w_full() - .gap_0p5() + .gap_3() + .child(icon.color(Color::Default).size(IconSize::Medium)) .child( - h_flex() - .w_full() - .gap_1() - .child(Label::new(kernelspec.name()).weight(FontWeight::MEDIUM)) + v_flex() + .flex_grow() + .gap_0p5() .child( - Label::new(kernelspec.language()) - .size(LabelSize::Small) - .color(Color::Muted), + h_flex() + .justify_between() + .child( + div().w_48().text_ellipsis().child( + Label::new(name) + .weight(FontWeight::MEDIUM) + .size(LabelSize::Default), + ), + ) + .when_some(path_or_url.clone(), |flex, path| { + flex.text_ellipsis().child( + Label::new(path) + .size(LabelSize::Small) + .color(Color::Muted), + ) + }), + ) + .child( + h_flex() + .gap_1() + .child( + Label::new(kernelspec.language()) + .size(LabelSize::Small) + .color(Color::Muted), + ) + .child( + Label::new(kernel_type) + .size(LabelSize::Small) + .color(Color::Muted), + ), ), - ) - .child( - Label::new(kernelspec.path()) - .size(LabelSize::XSmall) - .color(Color::Muted), ), ) .when(is_selected, |item| { @@ -199,7 +244,9 @@ impl RenderOnce for KernelSelector { }; let picker_view = cx.new_view(|cx| { - let picker = Picker::uniform_list(delegate, cx).max_height(Some(rems(20.).into())); + let picker = Picker::uniform_list(delegate, cx) + .width(rems(30.)) + .max_height(Some(rems(20.).into())); picker }); diff --git a/crates/repl/src/kernels/mod.rs b/crates/repl/src/kernels/mod.rs index 3fe4c3c12d..47fde97154 100644 --- a/crates/repl/src/kernels/mod.rs +++ b/crates/repl/src/kernels/mod.rs @@ -6,7 +6,7 @@ use futures::{ future::Shared, stream, }; -use gpui::{AppContext, Model, Task}; +use gpui::{AppContext, Model, Task, WindowContext}; use language::LanguageName; pub use native_kernel::*; @@ -16,7 +16,7 @@ pub use remote_kernels::*; use anyhow::Result; use runtimelib::{ExecutionState, JupyterKernelspec, JupyterMessage, KernelInfoReply}; -use ui::SharedString; +use ui::{Icon, IconName, SharedString}; pub type JupyterMessageChannel = stream::SelectAll>; @@ -59,6 +59,19 @@ impl KernelSpecification { Self::Remote(spec) => spec.kernelspec.language.clone(), }) } + + pub fn icon(&self, cx: &AppContext) -> Icon { + let lang_name = match self { + Self::Jupyter(spec) => spec.kernelspec.language.clone(), + Self::PythonEnv(spec) => spec.kernelspec.language.clone(), + Self::Remote(spec) => spec.kernelspec.language.clone(), + }; + + file_icons::FileIcons::get(cx) + .get_type_icon(&lang_name.to_lowercase()) + .map(Icon::from_path) + .unwrap_or(Icon::new(IconName::ReplNeutral)) + } } pub fn python_env_kernel_specifications( @@ -134,7 +147,7 @@ pub trait RunningKernel: Send + Debug { fn set_execution_state(&mut self, state: ExecutionState); fn kernel_info(&self) -> Option<&KernelInfoReply>; fn set_kernel_info(&mut self, info: KernelInfoReply); - fn force_shutdown(&mut self) -> anyhow::Result<()>; + fn force_shutdown(&mut self, cx: &mut WindowContext) -> Task>; } #[derive(Debug, Clone)] diff --git a/crates/repl/src/kernels/native_kernel.rs b/crates/repl/src/kernels/native_kernel.rs index 03a57b34ef..6f7c5d92ee 100644 --- a/crates/repl/src/kernels/native_kernel.rs +++ b/crates/repl/src/kernels/native_kernel.rs @@ -1,10 +1,11 @@ use anyhow::{Context as _, Result}; use futures::{ channel::mpsc::{self}, + io::BufReader, stream::{SelectAll, StreamExt}, - SinkExt as _, + AsyncBufReadExt as _, SinkExt as _, }; -use gpui::{AppContext, EntityId, Task}; +use gpui::{EntityId, Task, View, WindowContext}; use jupyter_protocol::{JupyterMessage, JupyterMessageContent, KernelInfoReply}; use project::Fs; use runtimelib::{dirs, ConnectionInfo, ExecutionState, JupyterKernelspec}; @@ -18,7 +19,9 @@ use std::{ }; use uuid::Uuid; -use super::{JupyterMessageChannel, RunningKernel}; +use crate::Session; + +use super::RunningKernel; #[derive(Debug, Clone)] pub struct LocalKernelSpecification { @@ -83,10 +86,10 @@ async fn peek_ports(ip: IpAddr) -> Result<[u16; 5]> { pub struct NativeRunningKernel { pub process: smol::process::Child, _shell_task: Task>, - _iopub_task: Task>, _control_task: Task>, _routing_task: Task>, connection_path: PathBuf, + _process_status_task: Option>, pub working_directory: PathBuf, pub request_tx: mpsc::Sender, pub execution_state: ExecutionState, @@ -107,8 +110,10 @@ impl NativeRunningKernel { entity_id: EntityId, working_directory: PathBuf, fs: Arc, - cx: &mut AppContext, - ) -> Task> { + // todo: convert to weak view + session: View, + cx: &mut WindowContext, + ) -> Task>> { cx.spawn(|cx| async move { let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)); let ports = peek_ports(ip).await?; @@ -136,7 +141,7 @@ impl NativeRunningKernel { let mut cmd = kernel_specification.command(&connection_path)?; - let process = cmd + let mut process = cmd .current_dir(&working_directory) .stdout(std::process::Stdio::piped()) .stderr(std::process::Stdio::piped()) @@ -155,8 +160,6 @@ impl NativeRunningKernel { let mut control_socket = runtimelib::create_client_control_connection(&connection_info, &session_id).await?; - let (mut iopub, iosub) = futures::channel::mpsc::channel(100); - let (request_tx, mut request_rx) = futures::channel::mpsc::channel::(100); @@ -164,18 +167,41 @@ impl NativeRunningKernel { let (mut shell_reply_tx, shell_reply_rx) = futures::channel::mpsc::channel(100); let mut messages_rx = SelectAll::new(); - messages_rx.push(iosub); messages_rx.push(control_reply_rx); messages_rx.push(shell_reply_rx); - let iopub_task = cx.background_executor().spawn({ - async move { - while let Ok(message) = iopub_socket.read().await { - iopub.send(message).await?; + cx.spawn({ + let session = session.clone(); + + |mut cx| async move { + while let Some(message) = messages_rx.next().await { + session + .update(&mut cx, |session, cx| { + session.route(&message, cx); + }) + .ok(); } anyhow::Ok(()) } - }); + }) + .detach(); + + // iopub task + cx.spawn({ + let session = session.clone(); + + |mut cx| async move { + while let Ok(message) = iopub_socket.read().await { + session + .update(&mut cx, |session, cx| { + session.route(&message, cx); + }) + .ok(); + } + anyhow::Ok(()) + } + }) + .detach(); let (mut control_request_tx, mut control_request_rx) = futures::channel::mpsc::channel(100); @@ -221,21 +247,74 @@ impl NativeRunningKernel { } }); - anyhow::Ok(( - Self { - process, - request_tx, - working_directory, - _shell_task: shell_task, - _iopub_task: iopub_task, - _control_task: control_task, - _routing_task: routing_task, - connection_path, - execution_state: ExecutionState::Idle, - kernel_info: None, - }, - messages_rx, - )) + let stderr = process.stderr.take(); + + cx.spawn(|mut _cx| async move { + if stderr.is_none() { + return; + } + let reader = BufReader::new(stderr.unwrap()); + let mut lines = reader.lines(); + while let Some(Ok(line)) = lines.next().await { + log::error!("kernel: {}", line); + } + }) + .detach(); + + let stdout = process.stdout.take(); + + cx.spawn(|mut _cx| async move { + if stdout.is_none() { + return; + } + let reader = BufReader::new(stdout.unwrap()); + let mut lines = reader.lines(); + while let Some(Ok(line)) = lines.next().await { + log::info!("kernel: {}", line); + } + }) + .detach(); + + let status = process.status(); + + let process_status_task = cx.spawn(|mut cx| async move { + let error_message = match status.await { + Ok(status) => { + if status.success() { + log::info!("kernel process exited successfully"); + return; + } + + format!("kernel process exited with status: {:?}", status) + } + Err(err) => { + format!("kernel process exited with error: {:?}", err) + } + }; + + log::error!("{}", error_message); + + session + .update(&mut cx, |session, cx| { + session.kernel_errored(error_message, cx); + + cx.notify(); + }) + .ok(); + }); + + anyhow::Ok(Box::new(Self { + process, + request_tx, + working_directory, + _process_status_task: Some(process_status_task), + _shell_task: shell_task, + _control_task: control_task, + _routing_task: routing_task, + connection_path, + execution_state: ExecutionState::Idle, + kernel_info: None, + }) as Box) }) } } @@ -265,14 +344,17 @@ impl RunningKernel for NativeRunningKernel { self.kernel_info = Some(info); } - fn force_shutdown(&mut self) -> anyhow::Result<()> { - match self.process.kill() { + fn force_shutdown(&mut self, _cx: &mut WindowContext) -> Task> { + self._process_status_task.take(); + self.request_tx.close_channel(); + + Task::ready(match self.process.kill() { Ok(_) => Ok(()), Err(error) => Err(anyhow::anyhow!( "Failed to kill the kernel process: {}", error )), - } + }) } } diff --git a/crates/repl/src/kernels/remote_kernels.rs b/crates/repl/src/kernels/remote_kernels.rs index 9d2d5f2810..808a7dbf02 100644 --- a/crates/repl/src/kernels/remote_kernels.rs +++ b/crates/repl/src/kernels/remote_kernels.rs @@ -1,12 +1,21 @@ -use futures::{channel::mpsc, StreamExt as _}; -use gpui::AppContext; +use futures::{channel::mpsc, SinkExt as _}; +use gpui::{Task, View, WindowContext}; +use http_client::{AsyncBody, HttpClient, Request}; use jupyter_protocol::{ExecutionState, JupyterMessage, KernelInfoReply}; -// todo(kyle): figure out if this needs to be different use runtimelib::JupyterKernelspec; +use futures::StreamExt; +use smol::io::AsyncReadExt as _; + +use crate::Session; + use super::RunningKernel; -use jupyter_websocket_client::RemoteServer; -use std::fmt::Debug; +use anyhow::Result; +use jupyter_websocket_client::{ + JupyterWebSocketReader, JupyterWebSocketWriter, KernelLaunchRequest, KernelSpecsResponse, + RemoteServer, +}; +use std::{fmt::Debug, sync::Arc}; #[derive(Debug, Clone)] pub struct RemoteKernelSpecification { @@ -16,6 +25,101 @@ pub struct RemoteKernelSpecification { pub kernelspec: JupyterKernelspec, } +pub async fn launch_remote_kernel( + remote_server: &RemoteServer, + http_client: Arc, + kernel_name: &str, + _path: &str, +) -> Result { + // + let kernel_launch_request = KernelLaunchRequest { + name: kernel_name.to_string(), + // todo: add path to runtimelib + // path, + }; + + let kernel_launch_request = serde_json::to_string(&kernel_launch_request)?; + + let request = Request::builder() + .method("POST") + .uri(&remote_server.api_url("/kernels")) + .header("Authorization", format!("token {}", remote_server.token)) + .body(AsyncBody::from(kernel_launch_request))?; + + let response = http_client.send(request).await?; + + if !response.status().is_success() { + let mut body = String::new(); + response.into_body().read_to_string(&mut body).await?; + return Err(anyhow::anyhow!("Failed to launch kernel: {}", body)); + } + + let mut body = String::new(); + response.into_body().read_to_string(&mut body).await?; + + let response: jupyter_websocket_client::Kernel = serde_json::from_str(&body)?; + + Ok(response.id) +} + +pub async fn list_remote_kernelspecs( + remote_server: RemoteServer, + http_client: Arc, +) -> Result> { + let url = remote_server.api_url("/kernelspecs"); + + let request = Request::builder() + .method("GET") + .uri(&url) + .header("Authorization", format!("token {}", remote_server.token)) + .body(AsyncBody::default())?; + + let response = http_client.send(request).await?; + + if response.status().is_success() { + let mut body = response.into_body(); + + let mut body_bytes = Vec::new(); + body.read_to_end(&mut body_bytes).await?; + + let kernel_specs: KernelSpecsResponse = serde_json::from_slice(&body_bytes)?; + + let remote_kernelspecs = kernel_specs + .kernelspecs + .into_iter() + .map(|(name, spec)| RemoteKernelSpecification { + name: name.clone(), + url: remote_server.base_url.clone(), + token: remote_server.token.clone(), + // todo: line up the jupyter kernelspec from runtimelib with + // the kernelspec pulled from the API + // + // There are _small_ differences, so we may just want a impl `From` + kernelspec: JupyterKernelspec { + argv: spec.spec.argv, + display_name: spec.spec.display_name, + language: spec.spec.language, + // todo: fix up mismatch in types here + metadata: None, + interrupt_mode: None, + env: None, + }, + }) + .collect::>(); + + if remote_kernelspecs.is_empty() { + Err(anyhow::anyhow!("No kernel specs found")) + } else { + Ok(remote_kernelspecs.clone()) + } + } else { + Err(anyhow::anyhow!( + "Failed to fetch kernel specs: {}", + response.status() + )) + } +} + impl PartialEq for RemoteKernelSpecification { fn eq(&self, other: &Self) -> bool { self.name == other.name && self.url == other.url @@ -26,55 +130,91 @@ impl Eq for RemoteKernelSpecification {} pub struct RemoteRunningKernel { remote_server: RemoteServer, + _receiving_task: Task>, + _routing_task: Task>, + http_client: Arc, pub working_directory: std::path::PathBuf, pub request_tx: mpsc::Sender, pub execution_state: ExecutionState, pub kernel_info: Option, + pub kernel_id: String, } impl RemoteRunningKernel { - pub async fn new( + pub fn new( kernelspec: RemoteKernelSpecification, working_directory: std::path::PathBuf, - request_tx: mpsc::Sender, - _cx: &mut AppContext, - ) -> anyhow::Result<( - Self, - (), // Stream - )> { + session: View, + cx: &mut WindowContext, + ) -> Task>> { let remote_server = RemoteServer { base_url: kernelspec.url, token: kernelspec.token, }; - // todo: launch a kernel to get a kernel ID - let kernel_id = "not-implemented"; + let http_client = cx.http_client(); - let kernel_socket = remote_server.connect_to_kernel(kernel_id).await?; + cx.spawn(|cx| async move { + let kernel_id = launch_remote_kernel( + &remote_server, + http_client.clone(), + &kernelspec.name, + working_directory.to_str().unwrap_or_default(), + ) + .await?; - let (mut _w, mut _r) = kernel_socket.split(); + let kernel_socket = remote_server.connect_to_kernel(&kernel_id).await?; - let (_messages_tx, _messages_rx) = mpsc::channel::(100); + let (mut w, mut r): (JupyterWebSocketWriter, JupyterWebSocketReader) = + kernel_socket.split(); - // let routing_task = cx.background_executor().spawn({ - // async move { - // while let Some(message) = request_rx.next().await { - // w.send(message).await; - // } - // } - // }); - // let messages_rx = r.into(); + let (request_tx, mut request_rx) = + futures::channel::mpsc::channel::(100); - anyhow::Ok(( - Self { + let routing_task = cx.background_executor().spawn({ + async move { + while let Some(message) = request_rx.next().await { + w.send(message).await.ok(); + } + Ok(()) + } + }); + + let receiving_task = cx.spawn({ + let session = session.clone(); + + |mut cx| async move { + while let Some(message) = r.next().await { + match message { + Ok(message) => { + session + .update(&mut cx, |session, cx| { + session.route(&message, cx); + }) + .ok(); + } + Err(e) => { + log::error!("Error receiving message: {:?}", e); + } + } + } + Ok(()) + } + }); + + anyhow::Ok(Box::new(Self { + _routing_task: routing_task, + _receiving_task: receiving_task, remote_server, working_directory, request_tx, + // todo(kyle): pull this from the kernel API to start with execution_state: ExecutionState::Idle, kernel_info: None, - }, - (), - )) + kernel_id, + http_client: http_client.clone(), + }) as Box) + }) } } @@ -116,7 +256,30 @@ impl RunningKernel for RemoteRunningKernel { self.kernel_info = Some(info); } - fn force_shutdown(&mut self) -> anyhow::Result<()> { - unimplemented!("force_shutdown") + fn force_shutdown(&mut self, cx: &mut WindowContext) -> Task> { + let url = self + .remote_server + .api_url(&format!("/kernels/{}", self.kernel_id)); + let token = self.remote_server.token.clone(); + let http_client = self.http_client.clone(); + + cx.spawn(|_| async move { + let request = Request::builder() + .method("DELETE") + .uri(&url) + .header("Authorization", format!("token {}", token)) + .body(AsyncBody::default())?; + + let response = http_client.send(request).await?; + + if response.status().is_success() { + Ok(()) + } else { + Err(anyhow::anyhow!( + "Failed to shutdown kernel: {}", + response.status() + )) + } + }) } } diff --git a/crates/repl/src/repl_store.rs b/crates/repl/src/repl_store.rs index a4863b809b..27854c0eee 100644 --- a/crates/repl/src/repl_store.rs +++ b/crates/repl/src/repl_store.rs @@ -7,11 +7,14 @@ use command_palette_hooks::CommandPaletteFilter; use gpui::{ prelude::*, AppContext, EntityId, Global, Model, ModelContext, Subscription, Task, View, }; +use jupyter_websocket_client::RemoteServer; use language::Language; use project::{Fs, Project, WorktreeId}; use settings::{Settings, SettingsStore}; -use crate::kernels::{local_kernel_specifications, python_env_kernel_specifications}; +use crate::kernels::{ + list_remote_kernelspecs, local_kernel_specifications, python_env_kernel_specifications, +}; use crate::{JupyterSettings, KernelSpecification, Session}; struct GlobalReplStore(Model); @@ -141,19 +144,50 @@ impl ReplStore { }) } + fn get_remote_kernel_specifications( + &self, + cx: &mut ModelContext, + ) -> Option>>> { + match ( + std::env::var("JUPYTER_SERVER"), + std::env::var("JUPYTER_TOKEN"), + ) { + (Ok(server), Ok(token)) => { + let remote_server = RemoteServer { + base_url: server, + token, + }; + let http_client = cx.http_client(); + Some(cx.spawn(|_, _| async move { + list_remote_kernelspecs(remote_server, http_client) + .await + .map(|specs| specs.into_iter().map(KernelSpecification::Remote).collect()) + })) + } + _ => None, + } + } + pub fn refresh_kernelspecs(&mut self, cx: &mut ModelContext) -> Task> { let local_kernel_specifications = local_kernel_specifications(self.fs.clone()); - cx.spawn(|this, mut cx| async move { - let local_kernel_specifications = local_kernel_specifications.await?; + let remote_kernel_specifications = self.get_remote_kernel_specifications(cx); - let mut kernel_options = Vec::new(); - for kernel_specification in local_kernel_specifications { - kernel_options.push(KernelSpecification::Jupyter(kernel_specification)); + cx.spawn(|this, mut cx| async move { + let mut all_specs = local_kernel_specifications + .await? + .into_iter() + .map(KernelSpecification::Jupyter) + .collect::>(); + + if let Some(remote_task) = remote_kernel_specifications { + if let Ok(remote_specs) = remote_task.await { + all_specs.extend(remote_specs); + } } this.update(&mut cx, |this, cx| { - this.kernel_specifications = kernel_options; + this.kernel_specifications = all_specs; cx.notify(); }) }) diff --git a/crates/repl/src/session.rs b/crates/repl/src/session.rs index 513e85719d..0c1dc287ed 100644 --- a/crates/repl/src/session.rs +++ b/crates/repl/src/session.rs @@ -1,4 +1,5 @@ use crate::components::KernelListItem; +use crate::kernels::RemoteRunningKernel; use crate::setup_editor_session_actions; use crate::{ kernels::{Kernel, KernelSpecification, NativeRunningKernel}, @@ -15,8 +16,7 @@ use editor::{ scroll::Autoscroll, Anchor, AnchorRangeExt as _, Editor, MultiBuffer, ToPoint, }; -use futures::io::BufReader; -use futures::{AsyncBufReadExt as _, FutureExt as _, StreamExt as _}; +use futures::FutureExt as _; use gpui::{ div, prelude::*, EventEmitter, Model, Render, Subscription, Task, View, ViewContext, WeakView, }; @@ -29,14 +29,13 @@ use runtimelib::{ use std::{env::temp_dir, ops::Range, sync::Arc, time::Duration}; use theme::ActiveTheme; use ui::{prelude::*, IconButtonShape, Tooltip}; +use util::ResultExt as _; pub struct Session { fs: Arc, editor: WeakView, pub kernel: Kernel, blocks: HashMap, - messaging_task: Option>, - process_status_task: Option>, pub kernel_specification: KernelSpecification, telemetry: Arc, _buffer_subscription: Subscription, @@ -219,8 +218,6 @@ impl Session { fs, editor, kernel: Kernel::StartingKernel(Task::ready(()).shared()), - messaging_task: None, - process_status_task: None, blocks: HashMap::default(), kernel_specification, _buffer_subscription: subscription, @@ -246,6 +243,8 @@ impl Session { cx.entity_id().to_string(), ); + let session_view = cx.view().clone(); + let kernel = match self.kernel_specification.clone() { KernelSpecification::Jupyter(kernel_specification) | KernelSpecification::PythonEnv(kernel_specification) => NativeRunningKernel::new( @@ -253,11 +252,15 @@ impl Session { entity_id, working_directory, self.fs.clone(), + session_view, + cx, + ), + KernelSpecification::Remote(remote_kernel_specification) => RemoteRunningKernel::new( + remote_kernel_specification, + working_directory, + session_view, cx, ), - KernelSpecification::Remote(_remote_kernel_specification) => { - unimplemented!() - } }; let pending_kernel = cx @@ -265,119 +268,15 @@ impl Session { let kernel = kernel.await; match kernel { - Ok((mut kernel, mut messages_rx)) => { + Ok(kernel) => { this.update(&mut cx, |session, cx| { - let stderr = kernel.process.stderr.take(); - - cx.spawn(|_session, mut _cx| async move { - if stderr.is_none() { - return; - } - let reader = BufReader::new(stderr.unwrap()); - let mut lines = reader.lines(); - while let Some(Ok(line)) = lines.next().await { - // todo!(): Log stdout and stderr to something the session can show - log::error!("kernel: {}", line); - } - }) - .detach(); - - let stdout = kernel.process.stdout.take(); - - cx.spawn(|_session, mut _cx| async move { - if stdout.is_none() { - return; - } - let reader = BufReader::new(stdout.unwrap()); - let mut lines = reader.lines(); - while let Some(Ok(line)) = lines.next().await { - log::info!("kernel: {}", line); - } - }) - .detach(); - - let status = kernel.process.status(); - session.kernel(Kernel::RunningKernel(Box::new(kernel)), cx); - - let process_status_task = cx.spawn(|session, mut cx| async move { - let error_message = match status.await { - Ok(status) => { - if status.success() { - log::info!("kernel process exited successfully"); - return; - } - - format!("kernel process exited with status: {:?}", status) - } - Err(err) => { - format!("kernel process exited with error: {:?}", err) - } - }; - - log::error!("{}", error_message); - - session - .update(&mut cx, |session, cx| { - session.kernel( - Kernel::ErroredLaunch(error_message.clone()), - cx, - ); - - session.blocks.values().for_each(|block| { - block.execution_view.update( - cx, - |execution_view, cx| { - match execution_view.status { - ExecutionStatus::Finished => { - // Do nothing when the output was good - } - _ => { - // All other cases, set the status to errored - execution_view.status = - ExecutionStatus::KernelErrored( - error_message.clone(), - ) - } - } - cx.notify(); - }, - ); - }); - - cx.notify(); - }) - .ok(); - }); - - session.process_status_task = Some(process_status_task); - - session.messaging_task = Some(cx.spawn(|session, mut cx| async move { - while let Some(message) = messages_rx.next().await { - session - .update(&mut cx, |session, cx| { - session.route(&message, cx); - }) - .ok(); - } - })); - - // todo!(@rgbkrk): send KernelInfoRequest once our shell channel read/writes are split - // cx.spawn(|this, mut cx| async move { - // cx.background_executor() - // .timer(Duration::from_millis(120)) - // .await; - // this.update(&mut cx, |this, cx| { - // this.send(KernelInfoRequest {}.into(), cx).ok(); - // }) - // .ok(); - // }) - // .detach(); + session.kernel(Kernel::RunningKernel(kernel), cx); }) .ok(); } Err(err) => { this.update(&mut cx, |session, cx| { - session.kernel(Kernel::ErroredLaunch(err.to_string()), cx); + session.kernel_errored(err.to_string(), cx); }) .ok(); } @@ -389,6 +288,26 @@ impl Session { cx.notify(); } + pub fn kernel_errored(&mut self, error_message: String, cx: &mut ViewContext) { + self.kernel(Kernel::ErroredLaunch(error_message.clone()), cx); + + self.blocks.values().for_each(|block| { + block.execution_view.update(cx, |execution_view, cx| { + match execution_view.status { + ExecutionStatus::Finished => { + // Do nothing when the output was good + } + _ => { + // All other cases, set the status to errored + execution_view.status = + ExecutionStatus::KernelErrored(error_message.clone()) + } + } + cx.notify(); + }); + }); + } + fn on_buffer_event( &mut self, buffer: Model, @@ -559,7 +478,7 @@ impl Session { } } - fn route(&mut self, message: &JupyterMessage, cx: &mut ViewContext) { + pub fn route(&mut self, message: &JupyterMessage, cx: &mut ViewContext) { let parent_message_id = match message.parent_header.as_ref() { Some(header) => &header.msg_id, None => return, @@ -639,21 +558,17 @@ impl Session { Kernel::RunningKernel(mut kernel) => { let mut request_tx = kernel.request_tx().clone(); + let forced = kernel.force_shutdown(cx); + cx.spawn(|this, mut cx| async move { let message: JupyterMessage = ShutdownRequest { restart: false }.into(); request_tx.try_send(message).ok(); + forced.await.log_err(); + // Give the kernel a bit of time to clean up cx.background_executor().timer(Duration::from_secs(3)).await; - this.update(&mut cx, |session, _cx| { - session.messaging_task.take(); - session.process_status_task.take(); - }) - .ok(); - - kernel.force_shutdown().ok(); - this.update(&mut cx, |session, cx| { session.clear_outputs(cx); session.kernel(Kernel::Shutdown, cx); @@ -664,8 +579,6 @@ impl Session { .detach(); } _ => { - self.messaging_task.take(); - self.process_status_task.take(); self.kernel(Kernel::Shutdown, cx); } } @@ -682,23 +595,19 @@ impl Session { Kernel::RunningKernel(mut kernel) => { let mut request_tx = kernel.request_tx().clone(); + let forced = kernel.force_shutdown(cx); + cx.spawn(|this, mut cx| async move { // Send shutdown request with restart flag log::debug!("restarting kernel"); let message: JupyterMessage = ShutdownRequest { restart: true }.into(); request_tx.try_send(message).ok(); - this.update(&mut cx, |session, _cx| { - session.messaging_task.take(); - session.process_status_task.take(); - }) - .ok(); - // Wait for kernel to shutdown cx.background_executor().timer(Duration::from_secs(1)).await; // Force kill the kernel if it hasn't shut down - kernel.force_shutdown().ok(); + forced.await.log_err(); // Start a new kernel this.update(&mut cx, |session, cx| { @@ -711,9 +620,6 @@ impl Session { .detach(); } _ => { - // If it's not already running, we can just clean up and start a new kernel - self.messaging_task.take(); - self.process_status_task.take(); self.clear_outputs(cx); self.start_kernel(cx); }