ZIm/crates/repl/src/session.rs
Max Brunsfeld d209eab058
Combine excerpt footer and header into a single block (#19441)
This simplifies rendering of excerpt headers and footers, and removes
the need to store a `BlockDisposition` on these boundary blocks. It's a
step toward implementing "replace blocks", which we want to use in the
assistant panel.

We've also cleaned up the way heights are specified for headers and
footers and fixed some visual asymmetries between the "expand upward"
and "expand downward" buttons.

Release Notes:

- N/A

---------

Co-authored-by: Richard <richard@zed.dev>
2024-10-18 17:58:07 -07:00

765 lines
28 KiB
Rust

use crate::components::KernelListItem;
use crate::KernelStatus;
use crate::{
kernels::{Kernel, KernelSpecification, RunningKernel},
outputs::{ExecutionStatus, ExecutionView},
};
use client::telemetry::Telemetry;
use collections::{HashMap, HashSet};
use editor::{
display_map::{
BlockContext, BlockDisposition, BlockId, BlockProperties, BlockStyle, CustomBlockId,
RenderBlock,
},
scroll::Autoscroll,
Anchor, AnchorRangeExt as _, Editor, MultiBuffer, ToPoint,
};
use futures::io::BufReader;
use futures::{AsyncBufReadExt as _, FutureExt as _, StreamExt as _};
use gpui::{
div, prelude::*, EventEmitter, Model, Render, Subscription, Task, View, ViewContext, WeakView,
};
use language::Point;
use project::Fs;
use runtimelib::{
ExecuteRequest, ExecutionState, InterruptRequest, JupyterMessage, JupyterMessageContent,
ShutdownRequest,
};
use std::{env::temp_dir, ops::Range, sync::Arc, time::Duration};
use theme::ActiveTheme;
use ui::{prelude::*, IconButtonShape, Tooltip};
/// A REPL session attached to a single editor.
///
/// Holds the current kernel state, the output blocks that were inserted into
/// the editor for executed code, and the background tasks that service the
/// kernel while it is running.
pub struct Session {
/// File system handle, handed to `RunningKernel::new` when a kernel is started.
fs: Arc<dyn Fs>,
/// Weak handle to the editor this session inserts output blocks into.
editor: WeakView<Editor>,
/// Current state of the kernel (starting, running, errored, shutting down, ...).
pub kernel: Kernel,
/// Output blocks, keyed by the message id of the execute request that created them.
blocks: HashMap<String, EditorBlock>,
/// Task that forwards messages received from the kernel to `route`.
messaging_task: Option<Task<()>>,
/// Task that waits for the kernel process to exit and records a failure status.
process_status_task: Option<Task<()>>,
/// Specification of the kernel this session launches.
pub kernel_specification: KernelSpecification,
/// Telemetry sink used to report kernel status changes.
telemetry: Arc<Telemetry>,
/// Keeps the subscription to the editor's buffer events alive.
_buffer_subscription: Subscription,
}
/// One output area shown in the editor for a single execution.
struct EditorBlock {
/// Range of the code that was executed; used to detect overlapping re-executions.
code_range: Range<Anchor>,
/// Anchor placed at the start of the row following the code; the block is
/// removed once this anchor is no longer valid in the buffer.
invalidation_anchor: Anchor,
/// Id of the custom block that was inserted into the editor.
block_id: CustomBlockId,
/// View that accumulates and renders the outputs of the execution.
execution_view: View<ExecutionView>,
}
/// Shared callback invoked with the id of an output block when its close
/// button is clicked.
type CloseBlockFn =
Arc<dyn for<'a> Fn(CustomBlockId, &'a mut WindowContext) + Send + Sync + 'static>;
impl EditorBlock {
/// Creates the output block for one execution and inserts it into the editor.
///
/// A new `ExecutionView` is created with the given `status`, and a sticky
/// block rendering it is placed below the end of `code_range`. When the row
/// following the code would lie past the end of the buffer, a newline is
/// appended to the buffer first.
///
/// Returns an error if the editor can no longer be upgraded or if it has no
/// workspace.
fn new(
    editor: WeakView<Editor>,
    code_range: Range<Anchor>,
    status: ExecutionStatus,
    on_close: CloseBlockFn,
    cx: &mut ViewContext<Session>,
) -> anyhow::Result<Self> {
    let Some(editor) = editor.upgrade() else {
        return Err(anyhow::anyhow!("editor is not open"));
    };
    let Some(workspace) = editor.read(cx).workspace() else {
        return Err(anyhow::anyhow!("workspace dropped"));
    };

    let weak_workspace = workspace.downgrade();
    let execution_view = cx.new_view(|cx| ExecutionView::new(status, weak_workspace, cx));

    let (invalidation_anchor, block_id) = editor.update(cx, |editor, cx| {
        let multi_buffer = editor.buffer().clone();
        let snapshot = multi_buffer.read(cx).snapshot(cx);

        let row_after_code = code_range.end.to_point(&snapshot) + Point::new(1, 0);
        let buffer_end = snapshot.max_point();
        if row_after_code > buffer_end {
            // The code ends on the last row: append a newline so that there is
            // a following row to anchor on.
            multi_buffer.update(cx, |multi_buffer, cx| {
                multi_buffer.edit([(buffer_end..buffer_end, "\n")], None, cx)
            });
        }

        let invalidation_anchor = multi_buffer
            .read(cx)
            .read(cx)
            .anchor_before(row_after_code);

        let output_block = BlockProperties {
            position: code_range.end,
            // Reserve at least one row for the status; the editor determines
            // the real height from the rendered content.
            height: 1,
            style: BlockStyle::Sticky,
            render: Self::create_output_area_renderer(execution_view.clone(), on_close.clone()),
            disposition: BlockDisposition::Below,
            priority: 0,
        };
        let block_id = editor.insert_blocks([output_block], None, cx)[0];

        (invalidation_anchor, block_id)
    });

    Ok(Self {
        code_range,
        invalidation_anchor,
        block_id,
        execution_view,
    })
}
/// Forwards the content of a kernel message to this block's execution view.
fn handle_message(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Session>) {
    let content = &message.content;
    self.execution_view.update(cx, |view, cx| {
        view.push_message(content, cx);
    });
}
/// Builds the render callback for an output block.
///
/// The rendered element has a gutter-wide column on the left holding a close
/// button, followed by the execution view. Clicking the close button calls
/// `on_close` with the block's id when the block is a custom block.
fn create_output_area_renderer(
    execution_view: View<ExecutionView>,
    on_close: CloseBlockFn,
) -> RenderBlock {
    Box::new(move |cx: &mut BlockContext| {
        let output_view = execution_view.clone();
        let close_handler = on_close.clone();
        let this_block = cx.block_id;
        let gutter_dimensions = cx.gutter_dimensions;

        let line_height =
            crate::outputs::plain::text_style(cx).line_height_in_pixels(cx.rem_size());
        let half_line = line_height / 2.;

        let close_icon = IconButton::new("close_output_area", IconName::Close)
            .icon_size(IconSize::Small)
            .icon_color(Color::Muted)
            .size(ButtonSize::Compact)
            .shape(IconButtonShape::Square)
            .tooltip(|cx| Tooltip::text("Close output area", cx))
            .on_click(move |_, cx| {
                if let BlockId::Custom(custom_id) = this_block {
                    (close_handler)(custom_id, cx)
                }
            });

        let close_button = h_flex()
            .flex_none()
            .items_center()
            .justify_center()
            .absolute()
            .top(half_line)
            .right(
                // The extra 2px nudges the button slightly closer to where the
                // line numbers start.
                gutter_dimensions.full_width() / 2.0 - half_line - px(2.),
            )
            .w(line_height)
            .h(line_height)
            .child(close_icon);

        let gutter_column = div()
            .relative()
            .w(gutter_dimensions.full_width())
            .h(line_height * 2)
            .child(close_button);

        let output_column = div()
            .flex_1()
            .size_full()
            .py(half_line)
            .mr(gutter_dimensions.width)
            .child(output_view);

        div()
            .id(cx.block_id)
            .flex()
            .items_start()
            .min_h(line_height)
            .w_full()
            .border_y_1()
            .border_color(cx.theme().colors().border)
            .bg(cx.theme().colors().background)
            .child(gutter_column)
            .child(output_column)
            .into_any_element()
    })
}
}
impl Session {
/// Creates a session for `editor` and immediately starts launching the kernel
/// described by `kernel_specification`.
///
/// While the editor is still alive, the session subscribes to its buffer so
/// that `on_buffer_event` is called for buffer events; otherwise a no-op
/// subscription is stored.
pub fn new(
    editor: WeakView<Editor>,
    fs: Arc<dyn Fs>,
    telemetry: Arc<Telemetry>,
    kernel_specification: KernelSpecification,
    cx: &mut ViewContext<Self>,
) -> Self {
    let buffer_subscription = if let Some(strong_editor) = editor.upgrade() {
        let multi_buffer = strong_editor.read(cx).buffer().clone();
        cx.subscribe(&multi_buffer, Self::on_buffer_event)
    } else {
        Subscription::new(|| {})
    };

    let mut this = Self {
        fs,
        editor,
        // Placeholder state; `start_kernel` below replaces it with the real
        // pending launch task.
        kernel: Kernel::StartingKernel(Task::ready(()).shared()),
        blocks: HashMap::default(),
        messaging_task: None,
        process_status_task: None,
        kernel_specification,
        telemetry,
        _buffer_subscription: buffer_subscription,
    };
    this.start_kernel(cx);
    this
}
fn start_kernel(&mut self, cx: &mut ViewContext<Self>) {
let kernel_language = self.kernel_specification.kernelspec.language.clone();
let entity_id = self.editor.entity_id();
let working_directory = self
.editor
.upgrade()
.and_then(|editor| editor.read(cx).working_directory(cx))
.unwrap_or_else(temp_dir);
self.telemetry.report_repl_event(
kernel_language.clone(),
KernelStatus::Starting.to_string(),
cx.entity_id().to_string(),
);
let kernel = RunningKernel::new(
self.kernel_specification.clone(),
entity_id,
working_directory,
self.fs.clone(),
cx,
);
let pending_kernel = cx
.spawn(|this, mut cx| async move {
let kernel = kernel.await;
match kernel {
Ok((mut kernel, mut messages_rx)) => {
this.update(&mut cx, |session, cx| {
let stderr = kernel.process.stderr.take();
cx.spawn(|_session, mut _cx| async move {
if stderr.is_none() {
return;
}
let reader = BufReader::new(stderr.unwrap());
let mut lines = reader.lines();
while let Some(Ok(line)) = lines.next().await {
// todo!(): Log stdout and stderr to something the session can show
log::error!("kernel: {}", line);
}
})
.detach();
let stdout = kernel.process.stdout.take();
cx.spawn(|_session, mut _cx| async move {
if stdout.is_none() {
return;
}
let reader = BufReader::new(stdout.unwrap());
let mut lines = reader.lines();
while let Some(Ok(line)) = lines.next().await {
log::info!("kernel: {}", line);
}
})
.detach();
let status = kernel.process.status();
session.kernel(Kernel::RunningKernel(kernel), cx);
let process_status_task = cx.spawn(|session, mut cx| async move {
let error_message = match status.await {
Ok(status) => {
if status.success() {
log::info!("kernel process exited successfully");
return;
}
format!("kernel process exited with status: {:?}", status)
}
Err(err) => {
format!("kernel process exited with error: {:?}", err)
}
};
log::error!("{}", error_message);
session
.update(&mut cx, |session, cx| {
session.kernel(
Kernel::ErroredLaunch(error_message.clone()),
cx,
);
session.blocks.values().for_each(|block| {
block.execution_view.update(
cx,
|execution_view, cx| {
match execution_view.status {
ExecutionStatus::Finished => {
// Do nothing when the output was good
}
_ => {
// All other cases, set the status to errored
execution_view.status =
ExecutionStatus::KernelErrored(
error_message.clone(),
)
}
}
cx.notify();
},
);
});
cx.notify();
})
.ok();
});
session.process_status_task = Some(process_status_task);
session.messaging_task = Some(cx.spawn(|session, mut cx| async move {
while let Some(message) = messages_rx.next().await {
session
.update(&mut cx, |session, cx| {
session.route(&message, cx);
})
.ok();
}
}));
// todo!(@rgbkrk): send KernelInfoRequest once our shell channel read/writes are split
// cx.spawn(|this, mut cx| async move {
// cx.background_executor()
// .timer(Duration::from_millis(120))
// .await;
// this.update(&mut cx, |this, cx| {
// this.send(KernelInfoRequest {}.into(), cx).ok();
// })
// .ok();
// })
// .detach();
})
.ok();
}
Err(err) => {
this.update(&mut cx, |session, cx| {
session.kernel(Kernel::ErroredLaunch(err.to_string()), cx);
})
.ok();
}
}
})
.shared();
self.kernel(Kernel::StartingKernel(pending_kernel), cx);
cx.notify();
}
/// Reacts to buffer edits by discarding output blocks whose invalidation
/// anchor is no longer valid in the current buffer snapshot.
///
/// Events other than `Edited` are ignored.
fn on_buffer_event(
    &mut self,
    buffer: Model<MultiBuffer>,
    event: &multi_buffer::Event,
    cx: &mut ViewContext<Self>,
) {
    let multi_buffer::Event::Edited { .. } = event else {
        return;
    };

    let snapshot = buffer.read(cx).snapshot(cx);
    let mut stale_block_ids: HashSet<CustomBlockId> = HashSet::default();

    self.blocks.retain(|_, block| {
        let still_valid = block.invalidation_anchor.is_valid(&snapshot);
        if !still_valid {
            stale_block_ids.insert(block.block_id);
        }
        still_valid
    });

    if stale_block_ids.is_empty() {
        return;
    }

    self.editor
        .update(cx, |editor, cx| {
            editor.remove_blocks(stale_block_ids, None, cx);
        })
        .ok();
    cx.notify();
}
fn send(&mut self, message: JupyterMessage, _cx: &mut ViewContext<Self>) -> anyhow::Result<()> {
if let Kernel::RunningKernel(kernel) = &mut self.kernel {
kernel.request_tx.try_send(message).ok();
}
anyhow::Ok(())
}
pub fn clear_outputs(&mut self, cx: &mut ViewContext<Self>) {
let blocks_to_remove: HashSet<CustomBlockId> =
self.blocks.values().map(|block| block.block_id).collect();
self.editor
.update(cx, |editor, cx| {
editor.remove_blocks(blocks_to_remove, None, cx);
})
.ok();
self.blocks.clear();
}
/// Executes `code` on the kernel and shows its output below `anchor_range`.
///
/// Existing output blocks whose code range overlaps `anchor_range` are
/// removed first. A new output block is then inserted with an initial status
/// derived from the current kernel state. The execute request is sent right
/// away when the kernel is running, or once the pending launch task has
/// completed when the kernel is still starting; in any other kernel state no
/// request is sent.
///
/// When `move_down` is set, the cursor is moved to `next_cell` if given, or
/// otherwise to the start of the row following the executed code.
///
/// Does nothing when `code` is empty or the editor is gone.
pub fn execute(
    &mut self,
    code: String,
    anchor_range: Range<Anchor>,
    next_cell: Option<Anchor>,
    move_down: bool,
    cx: &mut ViewContext<Self>,
) {
    if code.is_empty() {
        return;
    }
    let Some(editor) = self.editor.upgrade() else {
        return;
    };

    let message: JupyterMessage = ExecuteRequest {
        code,
        ..ExecuteRequest::default()
    }
    .into();

    // Drop the outputs of any earlier execution that overlaps this range.
    let snapshot = editor.read(cx).buffer().read(cx).snapshot(cx);
    let mut superseded_block_ids: HashSet<CustomBlockId> = HashSet::default();
    self.blocks.retain(|_, block| {
        let overlaps = anchor_range.overlaps(&block.code_range, &snapshot);
        if overlaps {
            superseded_block_ids.insert(block.block_id);
        }
        !overlaps
    });
    self.editor
        .update(cx, |editor, cx| {
            editor.remove_blocks(superseded_block_ids, None, cx);
        })
        .ok();

    let initial_status = match &self.kernel {
        Kernel::RunningKernel(_) => ExecutionStatus::Queued,
        Kernel::StartingKernel(_) => ExecutionStatus::ConnectingToKernel,
        Kernel::Restarting => ExecutionStatus::Restarting,
        Kernel::ShuttingDown => ExecutionStatus::ShuttingDown,
        Kernel::Shutdown => ExecutionStatus::Shutdown,
        Kernel::ErroredLaunch(error) => ExecutionStatus::KernelErrored(error.clone()),
    };

    // Closing the output area forgets the block in the session and removes
    // it from the editor.
    let message_id = message.header.msg_id.clone();
    let weak_session = cx.view().downgrade();
    let weak_editor = self.editor.clone();
    let on_close: CloseBlockFn =
        Arc::new(move |block_id: CustomBlockId, cx: &mut WindowContext| {
            if let Some(session) = weak_session.upgrade() {
                session.update(cx, |session, cx| {
                    session.blocks.remove(&message_id);
                    cx.notify();
                });
            }

            if let Some(editor) = weak_editor.upgrade() {
                editor.update(cx, |editor, cx| {
                    let mut closed_ids = HashSet::default();
                    closed_ids.insert(block_id);
                    editor.remove_blocks(closed_ids, None, cx);
                });
            }
        });

    let Ok(editor_block) = EditorBlock::new(
        self.editor.clone(),
        anchor_range,
        initial_status,
        on_close,
        cx,
    ) else {
        return;
    };

    let new_cursor_pos = next_cell.unwrap_or(editor_block.invalidation_anchor);

    self.blocks
        .insert(message.header.msg_id.clone(), editor_block);

    match &self.kernel {
        Kernel::RunningKernel(_) => {
            self.send(message, cx).ok();
        }
        Kernel::StartingKernel(pending_launch) => {
            // Defer the request until the pending launch task has completed.
            let pending_launch = pending_launch.clone();
            let queued_message = message.clone();
            cx.spawn(|this, mut cx| async move {
                pending_launch.await;
                this.update(&mut cx, |session, cx| {
                    session.send(queued_message, cx).ok();
                })
                .ok();
            })
            .detach();
        }
        _ => {}
    }

    if move_down {
        editor.update(cx, move |editor, cx| {
            editor.change_selections(Some(Autoscroll::top_relative(8)), cx, |selections| {
                selections.select_ranges([new_cursor_pos..new_cursor_pos]);
            });
        });
    }
}
/// Dispatches a message received from the kernel.
///
/// Messages without a parent header are ignored. Status and kernel-info
/// messages update the kernel state, display-data updates are offered to every
/// output block, and, except for display-data updates, the message is finally
/// handed to the block created by the request it replies to, if that block
/// still exists.
fn route(&mut self, message: &JupyterMessage, cx: &mut ViewContext<Self>) {
    let Some(parent_header) = message.parent_header.as_ref() else {
        return;
    };
    let parent_message_id = &parent_header.msg_id;

    match &message.content {
        JupyterMessageContent::Status(status) => {
            self.kernel.set_execution_state(&status.execution_state);

            let language = self.kernel_specification.kernelspec.language.clone();
            let kernel_status = KernelStatus::from(&self.kernel).to_string();
            self.telemetry
                .report_repl_event(language, kernel_status, cx.entity_id().to_string());

            cx.notify();
        }
        JupyterMessageContent::KernelInfoReply(reply) => {
            self.kernel.set_kernel_info(reply);
            cx.notify();
        }
        JupyterMessageContent::UpdateDisplayData(update) => {
            let Some(display_id) = update.transient.display_id.clone() else {
                return;
            };

            // The update is matched by display id rather than by parent
            // message, so every block gets a chance to apply it.
            for block in self.blocks.values_mut() {
                block.execution_view.update(cx, |execution_view, cx| {
                    execution_view.update_display_data(&update.data, &display_id, cx);
                });
            }
            return;
        }
        _ => {}
    }

    if let Some(block) = self.blocks.get_mut(parent_message_id) {
        block.handle_message(message, cx);
    }
}
/// Asks a running kernel to interrupt the current execution.
///
/// Nothing happens in any other kernel state.
pub fn interrupt(&mut self, cx: &mut ViewContext<Self>) {
    if matches!(self.kernel, Kernel::RunningKernel(_)) {
        self.send(InterruptRequest {}.into(), cx).ok();
    }
    // NOTE: While the kernel is still starting nothing is done here. If we
    // switch to a literal queue instead of chaining on to the task, clear all
    // queued executions.
}
/// Replaces the session's kernel state with `kernel`.
///
/// Emits `SessionEvent::Shutdown` when the new state is `Kernel::Shutdown`
/// and reports the new status to telemetry.
pub fn kernel(&mut self, kernel: Kernel, cx: &mut ViewContext<Self>) {
    if matches!(kernel, Kernel::Shutdown) {
        cx.emit(SessionEvent::Shutdown(self.editor.clone()));
    }

    let status = KernelStatus::from(&kernel).to_string();
    let language = self.kernel_specification.kernelspec.language.clone();
    self.telemetry
        .report_repl_event(language, status, cx.entity_id().to_string());

    self.kernel = kernel;
}
/// Shuts the kernel down.
///
/// The session enters `Kernel::ShuttingDown` immediately. A running kernel is
/// sent a shutdown request and given three seconds before its process is
/// killed; afterwards all outputs are cleared and the session becomes
/// `Kernel::Shutdown`. In any other state the background tasks are dropped
/// and the session becomes `Kernel::Shutdown` right away.
pub fn shutdown(&mut self, cx: &mut ViewContext<Self>) {
    let previous = std::mem::replace(&mut self.kernel, Kernel::ShuttingDown);

    if let Kernel::RunningKernel(mut running) = previous {
        let mut request_tx = running.request_tx.clone();

        cx.spawn(|this, mut cx| async move {
            let shutdown_request: JupyterMessage = ShutdownRequest { restart: false }.into();
            request_tx.try_send(shutdown_request).ok();

            // Give the kernel a bit of time to clean up
            cx.background_executor().timer(Duration::from_secs(3)).await;

            this.update(&mut cx, |session, _cx| {
                session.messaging_task.take();
                session.process_status_task.take();
            })
            .ok();

            running.process.kill().ok();

            this.update(&mut cx, |session, cx| {
                session.clear_outputs(cx);
                session.kernel(Kernel::Shutdown, cx);
                cx.notify();
            })
            .ok();
        })
        .detach();
    } else {
        self.messaging_task.take();
        self.process_status_task.take();
        self.kernel(Kernel::Shutdown, cx);
    }

    cx.notify();
}
/// Restarts the kernel.
///
/// The session enters `Kernel::Restarting` immediately. A running kernel is
/// sent a shutdown request with the restart flag, killed after one second,
/// and then a new kernel is started with all outputs cleared. If a restart is
/// already in progress nothing further is done; in any other state the
/// background tasks are dropped, outputs are cleared and a new kernel is
/// started right away.
pub fn restart(&mut self, cx: &mut ViewContext<Self>) {
    let previous = std::mem::replace(&mut self.kernel, Kernel::Restarting);

    match previous {
        Kernel::RunningKernel(mut running) => {
            let mut request_tx = running.request_tx.clone();

            cx.spawn(|this, mut cx| async move {
                // Ask the kernel to shut down, flagging it as a restart.
                log::debug!("restarting kernel");
                let restart_request: JupyterMessage = ShutdownRequest { restart: true }.into();
                request_tx.try_send(restart_request).ok();

                this.update(&mut cx, |session, _cx| {
                    session.messaging_task.take();
                    session.process_status_task.take();
                })
                .ok();

                // Allow the kernel a moment to shut down by itself...
                cx.background_executor().timer(Duration::from_secs(1)).await;

                // ...then kill the process unconditionally.
                running.process.kill().ok();

                // Bring up the replacement kernel.
                this.update(&mut cx, |session, cx| {
                    // todo!(): Differentiate between restart and restart+clear-outputs
                    session.clear_outputs(cx);
                    session.start_kernel(cx);
                })
                .ok();
            })
            .detach();
        }
        Kernel::Restarting => {
            // A restart is already under way; leave it alone.
        }
        _ => {
            // No running kernel to stop: clean up and start a new kernel.
            self.messaging_task.take();
            self.process_status_task.take();
            self.clear_outputs(cx);
            self.start_kernel(cx);
        }
    }

    cx.notify();
}
}
/// Events emitted by a `Session`.
pub enum SessionEvent {
/// Emitted when the session's kernel state is set to `Kernel::Shutdown`;
/// carries the weak handle of the session's editor.
Shutdown(WeakView<Editor>),
}
// Lets `Session` emit `SessionEvent`s through `cx.emit`.
impl EventEmitter<SessionEvent> for Session {}
impl Render for Session {
fn render(&mut self, cx: &mut ViewContext<Self>) -> impl IntoElement {
let (status_text, interrupt_button) = match &self.kernel {
Kernel::RunningKernel(kernel) => (
kernel
.kernel_info
.as_ref()
.map(|info| info.language_info.name.clone()),
Some(
Button::new("interrupt", "Interrupt")
.style(ButtonStyle::Subtle)
.on_click(cx.listener(move |session, _, cx| {
session.interrupt(cx);
})),
),
),
Kernel::StartingKernel(_) => (Some("Starting".into()), None),
Kernel::ErroredLaunch(err) => (Some(format!("Error: {err}")), None),
Kernel::ShuttingDown => (Some("Shutting Down".into()), None),
Kernel::Shutdown => (Some("Shutdown".into()), None),
Kernel::Restarting => (Some("Restarting".into()), None),
};
KernelListItem::new(self.kernel_specification.clone())
.status_color(match &self.kernel {
Kernel::RunningKernel(kernel) => match kernel.execution_state {
ExecutionState::Idle => Color::Success,
ExecutionState::Busy => Color::Modified,
},
Kernel::StartingKernel(_) => Color::Modified,
Kernel::ErroredLaunch(_) => Color::Error,
Kernel::ShuttingDown => Color::Modified,
Kernel::Shutdown => Color::Disabled,
Kernel::Restarting => Color::Modified,
})
.child(Label::new(self.kernel_specification.name.clone()))
.children(status_text.map(|status_text| Label::new(format!("({status_text})"))))
.button(
Button::new("shutdown", "Shutdown")
.style(ButtonStyle::Subtle)
.disabled(self.kernel.is_shutting_down())
.on_click(cx.listener(move |session, _, cx| {
session.shutdown(cx);
})),
)
.buttons(interrupt_button)
}
}