Fix terminal memory leak by deduping alacritty events on background thread (#23593)

Closes #23008

Release Notes:

- Fixed case where the terminal can leak memory when it produces events
at a faster rate than could be processed.
This commit is contained in:
Michael Sloan 2025-01-24 03:25:03 -07:00 committed by GitHub
parent dd8ee76b2e
commit 813bbecd5c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 134 additions and 66 deletions

View file

@ -16,6 +16,7 @@
//! //!
use alacritty_terminal::{ use alacritty_terminal::{
event::VoidListener,
grid::Dimensions as _, grid::Dimensions as _,
index::{Column, Line, Point}, index::{Column, Line, Point},
term::Config, term::Config,
@ -24,8 +25,6 @@ use alacritty_terminal::{
use gpui::{canvas, size, ClipboardItem, FontStyle, Model, TextStyle, WhiteSpace}; use gpui::{canvas, size, ClipboardItem, FontStyle, Model, TextStyle, WhiteSpace};
use language::Buffer; use language::Buffer;
use settings::Settings as _; use settings::Settings as _;
use std::mem;
use terminal::ZedListener;
use terminal_view::terminal_element::TerminalElement; use terminal_view::terminal_element::TerminalElement;
use theme::ThemeSettings; use theme::ThemeSettings;
use ui::{prelude::*, IntoElement}; use ui::{prelude::*, IntoElement};
@ -50,7 +49,7 @@ pub struct TerminalOutput {
/// ANSI escape sequence processor for parsing input text. /// ANSI escape sequence processor for parsing input text.
parser: Processor, parser: Processor,
/// Alacritty terminal instance that manages the terminal state and content. /// Alacritty terminal instance that manages the terminal state and content.
handler: alacritty_terminal::Term<ZedListener>, handler: alacritty_terminal::Term<VoidListener>,
} }
const DEFAULT_NUM_LINES: usize = 32; const DEFAULT_NUM_LINES: usize = 32;
@ -124,14 +123,9 @@ impl TerminalOutput {
/// and sets up the necessary components for handling terminal events and rendering. /// and sets up the necessary components for handling terminal events and rendering.
/// ///
pub fn new(cx: &mut WindowContext) -> Self { pub fn new(cx: &mut WindowContext) -> Self {
let (events_tx, events_rx) = futures::channel::mpsc::unbounded(); let term =
let term = alacritty_terminal::Term::new( alacritty_terminal::Term::new(Config::default(), &terminal_size(cx), VoidListener);
Config::default(),
&terminal_size(cx),
terminal::ZedListener(events_tx.clone()),
);
mem::forget(events_rx);
Self { Self {
parser: Processor::new(), parser: Processor::new(),
handler: term, handler: term,

View file

@ -28,7 +28,7 @@ use anyhow::{bail, Result};
use futures::{ use futures::{
channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender}, channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender},
FutureExt, FutureExt, SinkExt,
}; };
use mappings::mouse::{ use mappings::mouse::{
@ -53,7 +53,7 @@ use std::{
ops::{Deref, Index, RangeInclusive}, ops::{Deref, Index, RangeInclusive},
path::PathBuf, path::PathBuf,
sync::Arc, sync::Arc,
time::Duration, time::{Duration, Instant},
}; };
use thiserror::Error; use thiserror::Error;
@ -482,61 +482,59 @@ impl TerminalBuilder {
}) })
} }
pub fn subscribe(mut self, cx: &ModelContext<Terminal>) -> Terminal { pub fn subscribe(self, cx: &ModelContext<Terminal>) -> Terminal {
//Event loop // Accumulate the effects of events on a background thread in order to keep up with the
cx.spawn(|terminal, mut cx| async move { // events from alacritty even when it is emitting events rapidly.
while let Some(event) = self.events_rx.next().await { let (mut accumulated_events_tx, mut accumulated_events_rx) = unbounded();
terminal.update(&mut cx, |terminal, cx| { let mut events_rx = self.events_rx;
//Process the first event immediately for lowered latency let background_executor = cx.background_executor().clone();
terminal.process_event(&event, cx); cx.background_executor()
})?; .spawn(async move {
while let Some(event) = events_rx.next().await {
'outer: loop { // Process the first event immediately to reduce latency
let mut events = Vec::new(); accumulated_events_tx
let mut timer = cx .feed(EventOrAccumulator::Event(event))
.background_executor() .await?;
.timer(Duration::from_millis(4)) 'outer: loop {
.fuse(); let start_time = Instant::now();
let mut wakeup = false; let mut timer = background_executor.timer(Duration::from_millis(4)).fuse();
loop { let mut event_accumulator = EventAccumulator::new();
futures::select_biased! { loop {
_ = timer => break, futures::select_biased! {
event = self.events_rx.next() => { // Events are no longer coming in at a high rate, so go back to just
if let Some(event) = event { // awaiting the next event.
if matches!(event, AlacTermEvent::Wakeup) { _ = timer => break 'outer,
wakeup = true; event = events_rx.next() => {
} else { let Some(event) = event else {
events.push(event); break;
} };
event_accumulator.add(event);
if events.len() > 100 { if event_accumulator.events.len() > 100 {
break; break;
} }
} else { let elapsed = Instant::now().duration_since(start_time);
break; if elapsed > Duration::from_millis(20) {
} break;
}, }
},
}
} }
accumulated_events_tx
.feed(EventOrAccumulator::Accumulator(event_accumulator))
.await?;
} }
if events.is_empty() && !wakeup {
smol::future::yield_now().await;
break 'outer;
}
terminal.update(&mut cx, |this, cx| {
if wakeup {
this.process_event(&AlacTermEvent::Wakeup, cx);
}
for event in events {
this.process_event(&event, cx);
}
})?;
smol::future::yield_now().await;
} }
} anyhow::Ok(())
})
.detach();
// On the foreground thread, process the accumulated effects of events.
cx.spawn(|terminal, mut cx| async move {
while let Some(event_or_accumulator) = accumulated_events_rx.next().await {
terminal.update(&mut cx, |terminal, cx| {
event_or_accumulator.process_events(terminal, cx)
})?;
}
anyhow::Ok(()) anyhow::Ok(())
}) })
.detach(); .detach();
@ -545,6 +543,83 @@ impl TerminalBuilder {
} }
} }
enum EventOrAccumulator {
Event(AlacTermEvent),
Accumulator(EventAccumulator),
}
impl EventOrAccumulator {
fn process_events(self, terminal: &mut Terminal, cx: &mut ModelContext<Terminal>) {
match self {
EventOrAccumulator::Event(event) => terminal.process_event(event, cx),
EventOrAccumulator::Accumulator(accumulator) => {
accumulator.process_events(terminal, cx)
}
}
}
}
struct EventAccumulator {
wakeup: bool,
cursor_blinking_changed: bool,
bell: bool,
title: Option<String>,
/// Events that can't be deduplicated.
events: Vec<AlacTermEvent>,
}
impl EventAccumulator {
fn new() -> Self {
EventAccumulator {
wakeup: false,
cursor_blinking_changed: false,
bell: false,
title: None,
events: Vec::new(),
}
}
fn add(&mut self, event: AlacTermEvent) {
match event {
// Events that can have their effects deduplicated.
AlacTermEvent::Title(title) => self.title = Some(title),
AlacTermEvent::ResetTitle => self.title = Some(String::new()),
AlacTermEvent::CursorBlinkingChange => self.cursor_blinking_changed = true,
AlacTermEvent::Wakeup => self.wakeup = true,
AlacTermEvent::Bell => self.bell = true,
// Events that have handlers involving writing text to the terminal or interacting with
// clipboard, and so must be kept in order.
AlacTermEvent::ClipboardStore(_, _) => self.events.push(event),
AlacTermEvent::ClipboardLoad(_, _) => self.events.push(event),
AlacTermEvent::PtyWrite(_) => self.events.push(event),
AlacTermEvent::TextAreaSizeRequest(_) => self.events.push(event),
AlacTermEvent::ColorRequest(_, _) => self.events.push(event),
AlacTermEvent::Exit => self.events.push(event),
AlacTermEvent::ChildExit(_) => self.events.push(event),
// Handled in render so no need to handle here.
AlacTermEvent::MouseCursorDirty => {}
}
}
fn process_events(self, terminal: &mut Terminal, cx: &mut ModelContext<Terminal>) {
if self.wakeup {
terminal.process_event(AlacTermEvent::Wakeup, cx);
}
if self.cursor_blinking_changed {
terminal.process_event(AlacTermEvent::CursorBlinkingChange, cx);
}
if self.bell {
terminal.process_event(AlacTermEvent::Bell, cx);
}
if let Some(title) = self.title {
terminal.process_event(AlacTermEvent::Title(title), cx);
}
for event in self.events {
terminal.process_event(event, cx);
}
}
}
#[derive(Debug, Clone, Deserialize, Serialize)] #[derive(Debug, Clone, Deserialize, Serialize)]
pub struct IndexedCell { pub struct IndexedCell {
pub point: AlacPoint, pub point: AlacPoint,
@ -674,7 +749,7 @@ impl TaskStatus {
} }
impl Terminal { impl Terminal {
fn process_event(&mut self, event: &AlacTermEvent, cx: &mut ModelContext<Self>) { fn process_event(&mut self, event: AlacTermEvent, cx: &mut ModelContext<Self>) {
match event { match event {
AlacTermEvent::Title(title) => { AlacTermEvent::Title(title) => {
self.breadcrumb_text = title.to_string(); self.breadcrumb_text = title.to_string();
@ -728,13 +803,12 @@ impl Terminal {
// Instead of locking, we could store the colors in `self.last_content`. But then // Instead of locking, we could store the colors in `self.last_content`. But then
// we might respond with out of date value if a "set color" sequence is immediately // we might respond with out of date value if a "set color" sequence is immediately
// followed by a color request sequence. // followed by a color request sequence.
let color = self.term.lock().colors()[*index].unwrap_or_else(|| { let color = self.term.lock().colors()[index]
to_alac_rgb(get_color_at_index(*index, cx.theme().as_ref())) .unwrap_or_else(|| to_alac_rgb(get_color_at_index(index, cx.theme().as_ref())));
});
self.write_to_pty(format(color)); self.write_to_pty(format(color));
} }
AlacTermEvent::ChildExit(error_code) => { AlacTermEvent::ChildExit(error_code) => {
self.register_task_finished(Some(*error_code), cx); self.register_task_finished(Some(error_code), cx);
} }
} }
} }