Compare commits

...
Sign in to create a new pull request.

1 commit

Author SHA1 Message Date
Mikayla Maki
ef86176bd3 WIP: Add a linux implementation of the fsevents API 2024-03-05 15:04:54 -08:00
8 changed files with 156 additions and 85 deletions

2
Cargo.lock generated
View file

@ -3982,7 +3982,6 @@ dependencies = [
"lazy_static",
"libc",
"log",
"notify",
"parking_lot 0.11.2",
"rope",
"serde",
@ -4014,6 +4013,7 @@ version = "0.1.0"
dependencies = [
"bitflags 2.4.2",
"fsevent-sys 3.1.0",
"notify",
"parking_lot 0.11.2",
"tempfile",
]

View file

@ -37,9 +37,6 @@ time.workspace = true
gpui = { workspace = true, optional = true }
[target.'cfg(not(target_os = "macos"))'.dependencies]
notify = "6.1.1"
[target.'cfg(target_os = "windows")'.dependencies]
windows-sys = { version = "0.52", features = [
"Win32_Foundation",

View file

@ -2,15 +2,8 @@ pub mod repository;
use anyhow::{anyhow, Result};
pub use fsevent::Event;
#[cfg(target_os = "macos")]
use fsevent::EventStream;
#[cfg(not(target_os = "macos"))]
use fsevent::StreamFlags;
#[cfg(not(target_os = "macos"))]
use notify::{Config, EventKind, Watcher};
#[cfg(unix)]
use std::os::unix::fs::MetadataExt;
@ -309,7 +302,6 @@ impl Fs for RealFs {
Ok(Box::pin(result))
}
#[cfg(target_os = "macos")]
async fn watch(
&self,
path: &Path,
@ -317,64 +309,13 @@ impl Fs for RealFs {
) -> Pin<Box<dyn Send + Stream<Item = Vec<Event>>>> {
let (tx, rx) = smol::channel::unbounded();
let (stream, handle) = EventStream::new(&[path], latency);
std::thread::spawn(move || {
stream.run(move |events| smol::block_on(tx.send(events)).is_ok());
});
Box::pin(rx.chain(futures::stream::once(async move {
drop(handle);
vec![]
})))
}
#[cfg(not(target_os = "macos"))]
async fn watch(
&self,
path: &Path,
latency: Duration,
) -> Pin<Box<dyn Send + Stream<Item = Vec<Event>>>> {
let (tx, rx) = smol::channel::unbounded();
if !path.exists() {
log::error!("watch path does not exist: {}", path.display());
return Box::pin(rx);
}
let mut watcher =
notify::recommended_watcher(move |res: Result<notify::Event, _>| match res {
Ok(event) => {
let flags = match event.kind {
// ITEM_REMOVED is currently the only flag we care about
EventKind::Remove(_) => StreamFlags::ITEM_REMOVED,
_ => StreamFlags::NONE,
};
let events = event
.paths
.into_iter()
.map(|path| Event {
event_id: 0,
flags,
path,
})
.collect::<Vec<_>>();
let _ = tx.try_send(events);
}
Err(err) => {
log::error!("watch error: {}", err);
}
})
.unwrap();
watcher
.configure(Config::default().with_poll_interval(latency))
.unwrap();
watcher
.watch(path, notify::RecursiveMode::Recursive)
.unwrap();
Box::pin(rx)
}
fn open_repo(&self, dotgit_path: &Path) -> Option<Arc<Mutex<dyn GitRepository>>> {
LibGitRepository::open(dotgit_path)
.log_err()

View file

@ -19,6 +19,9 @@ parking_lot.workspace = true
[target.'cfg(target_os = "macos")'.dependencies]
fsevent-sys = "3.0.2"
[target.'cfg(target_os = "linux")'.dependencies]
notify = "6.1.1"
[dev-dependencies]
tempfile.workspace = true

View file

@ -1,12 +1,20 @@
#[cfg(target_os = "macos")]
pub use mac_impl::*;
mod mac;
#[cfg(target_os = "macos")]
pub use mac::*;
#[cfg(target_os = "linux")]
mod linux;
#[cfg(target_os = "linux")]
pub use linux::*;
use bitflags::bitflags;
use std::path::PathBuf;
#[cfg(target_os = "macos")]
mod mac_impl;
#[derive(Clone, Debug)]
pub struct Event {
pub event_id: u64,

120
crates/fsevent/src/linux.rs Normal file
View file

@ -0,0 +1,120 @@
use std::{
path::{Path, PathBuf},
sync::{Arc, OnceLock},
time::Duration,
};
use notify::{Config, RecommendedWatcher, Watcher};
use parking_lot::Mutex;
use crate::{Event, StreamFlags};
pub struct EventStream {
watcher: Arc<Mutex<RecommendedWatcher>>,
paths: Vec<PathBuf>,
event_fn: Arc<OnceLock<Box<dyn Fn(Vec<Event>) -> bool + 'static + Send + Sync>>>,
}
pub struct Handle(Arc<Mutex<RecommendedWatcher>>);
impl EventStream {
pub fn new(paths: &[&Path], latency: Duration) -> (Self, Handle) {
let event_fn: Arc<OnceLock<Box<dyn Fn(Vec<Event>) -> bool + 'static + Send + Sync>>> =
Arc::new(OnceLock::new());
let paths: Vec<_> = paths.iter().map(|path| path.to_path_buf()).collect();
let mut watcher = notify::recommended_watcher({
let event_fn = event_fn.clone();
let paths = paths.clone();
move |res: Result<notify::Event, _>| {
if let Ok(event) = res {
let flags = StreamFlags::empty();
// match event.kind {
// EventKind::Access(access) => match access {
// notify::event::AccessKind::Any => todo!(),
// notify::event::AccessKind::Read => todo!(),
// notify::event::AccessKind::Open(_) => todo!(),
// notify::event::AccessKind::Close(_) => todo!(),
// notify::event::AccessKind::Other => todo!(),
// },
// EventKind::Create(create) => match create {
// notify::event::CreateKind::Any => todo!(),
// notify::event::CreateKind::File => todo!(),
// notify::event::CreateKind::Folder => todo!(),
// notify::event::CreateKind::Other => todo!(),
// },
// EventKind::Modify(modify) => match modify {
// notify::event::ModifyKind::Any => todo!(),
// notify::event::ModifyKind::Data(_) => todo!(),
// notify::event::ModifyKind::Metadata(_) => todo!(),
// notify::event::ModifyKind::Name(_) => todo!(),
// notify::event::ModifyKind::Other => todo!(),
// },
// EventKind::Remove(remove) => match remove {
// notify::event::RemoveKind::Any => todo!(),
// notify::event::RemoveKind::File => todo!(),
// notify::event::RemoveKind::Folder => todo!(),
// notify::event::RemoveKind::Other => todo!(),
// },
// EventKind::Other => todo!(),
// EventKind::Any => todo!(),
// };
let events = event
.paths
.iter()
.filter(|evt_path| {
paths
.iter()
.any(|requested_path| evt_path.starts_with(requested_path))
})
.map(|path| Event {
event_id: 0,
flags,
path: path.to_path_buf(),
})
.collect::<Vec<_>>();
if !events.is_empty() {
event_fn
.get()
.expect("Watcher cannot produce events until paths are provided")(
events,
);
}
}
}
})
.expect("Failed to watch requested path");
watcher
.configure(Config::default().with_poll_interval(latency))
.expect("Failed to watch requested path");
let watcher = Arc::new(Mutex::new(watcher));
(
Self {
watcher: watcher.clone(),
event_fn: event_fn.clone(),
paths,
},
Handle(watcher),
)
}
pub fn run(self, f: impl Fn(Vec<Event>) -> bool + 'static + Send + Sync) {
self.event_fn.get_or_init(|| Box::new(f));
let mut watcher = self.watcher.lock();
for path in self.paths {
watcher
.watch(
dbg!(path.parent().unwrap_or(&path)),
notify::RecursiveMode::Recursive,
)
.expect("Failed to watch requested path");
}
}
}

View file

@ -108,8 +108,9 @@ impl EventStream {
pub fn run<F>(mut self, f: F)
where
F: FnMut(Vec<Event>) -> bool + 'static,
F: Fn(Vec<Event>) -> bool + 'static + Send + Sync,
{
std::thread::spawn(move || {
self.state.callback = Some(Box::new(f));
unsafe {
let run_loop = cf::CFRunLoopGetCurrent();
@ -129,6 +130,7 @@ impl EventStream {
fs::FSEventStreamStart(self.state.stream);
cf::CFRunLoopRun();
}
});
}
extern "C" fn trampoline(

0
crates/zed/src/languages Normal file
View file