Extract an Fs trait in Worktree

This commit is contained in:
Antonio Scandurra 2021-07-09 10:58:07 +02:00
parent 60ef74a18f
commit 4dae17a4cf
3 changed files with 120 additions and 73 deletions

1
Cargo.lock generated
View file

@ -4457,6 +4457,7 @@ version = "0.1.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"arrayvec", "arrayvec",
"async-trait",
"async-tungstenite", "async-tungstenite",
"cargo-bundle", "cargo-bundle",
"crossbeam-channel", "crossbeam-channel",

View file

@ -19,6 +19,7 @@ test-support = ["tempdir", "serde_json", "zed-rpc/test-support"]
[dependencies] [dependencies]
anyhow = "1.0.38" anyhow = "1.0.38"
arrayvec = "0.5.2" arrayvec = "0.5.2"
async-trait = "0.1"
async-tungstenite = { version="0.14", features=["async-tls"] } async-tungstenite = { version="0.14", features=["async-tls"] }
crossbeam-channel = "0.5.0" crossbeam-channel = "0.5.0"
ctor = "0.1.20" ctor = "0.1.20"

View file

@ -63,6 +63,84 @@ pub fn init(cx: &mut MutableAppContext, rpc: rpc::Client) {
rpc.on_message(remote::save_buffer, cx); rpc.on_message(remote::save_buffer, cx);
} }
#[async_trait::async_trait]
trait Fs: Send + Sync {
async fn entry(
&self,
root_char_bag: CharBag,
next_entry_id: &AtomicUsize,
path: Arc<Path>,
abs_path: &Path,
) -> Result<Option<Entry>>;
async fn load(&self, path: &Path) -> Result<String>;
async fn save(&self, path: &Path, text: &Rope) -> Result<()>;
}
struct OsFs;
#[async_trait::async_trait]
impl Fs for OsFs {
async fn entry(
&self,
root_char_bag: CharBag,
next_entry_id: &AtomicUsize,
path: Arc<Path>,
abs_path: &Path,
) -> Result<Option<Entry>> {
let metadata = match smol::fs::metadata(&abs_path).await {
Err(err) => {
return match (err.kind(), err.raw_os_error()) {
(io::ErrorKind::NotFound, _) => Ok(None),
(io::ErrorKind::Other, Some(libc::ENOTDIR)) => Ok(None),
_ => Err(anyhow::Error::new(err)),
}
}
Ok(metadata) => metadata,
};
let inode = metadata.ino();
let mtime = metadata.modified()?;
let is_symlink = smol::fs::symlink_metadata(&abs_path)
.await
.context("failed to read symlink metadata")?
.file_type()
.is_symlink();
let entry = Entry {
id: next_entry_id.fetch_add(1, SeqCst),
kind: if metadata.file_type().is_dir() {
EntryKind::PendingDir
} else {
EntryKind::File(char_bag_for_path(root_char_bag, &path))
},
path: Arc::from(path),
inode,
mtime,
is_symlink,
is_ignored: false,
};
Ok(Some(entry))
}
async fn load(&self, path: &Path) -> Result<String> {
let mut file = smol::fs::File::open(path).await?;
let mut text = String::new();
file.read_to_string(&mut text).await?;
Ok(text)
}
async fn save(&self, path: &Path, text: &Rope) -> Result<()> {
let buffer_size = text.summary().bytes.min(10 * 1024);
let file = smol::fs::File::create(path).await?;
let mut writer = smol::io::BufWriter::with_capacity(buffer_size, file);
for chunk in text.chunks() {
writer.write_all(chunk.as_bytes()).await?;
}
writer.flush().await?;
Ok(())
}
}
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
enum ScanState { enum ScanState {
Idle, Idle,
@ -106,7 +184,20 @@ impl Worktree {
languages: Arc<LanguageRegistry>, languages: Arc<LanguageRegistry>,
cx: &mut ModelContext<Worktree>, cx: &mut ModelContext<Worktree>,
) -> Self { ) -> Self {
Worktree::Local(LocalWorktree::new(path, languages, cx)) let fs = Arc::new(OsFs);
let (mut tree, scan_states_tx) = LocalWorktree::new(path, languages, fs, cx);
let (event_stream, event_stream_handle) = fsevent::EventStream::new(
&[tree.snapshot.abs_path.as_ref()],
Duration::from_millis(100),
);
let background_snapshot = tree.background_snapshot.clone();
let id = tree.id;
std::thread::spawn(move || {
let scanner = BackgroundScanner::new(background_snapshot, scan_states_tx, id);
scanner.run(event_stream);
});
tree._event_stream_handle = Some(event_stream_handle);
Worktree::Local(tree)
} }
pub async fn open_remote( pub async fn open_remote(
@ -527,21 +618,23 @@ pub struct LocalWorktree {
background_snapshot: Arc<Mutex<Snapshot>>, background_snapshot: Arc<Mutex<Snapshot>>,
snapshots_to_send_tx: Option<Sender<Snapshot>>, snapshots_to_send_tx: Option<Sender<Snapshot>>,
last_scan_state_rx: watch::Receiver<ScanState>, last_scan_state_rx: watch::Receiver<ScanState>,
_event_stream_handle: fsevent::Handle, _event_stream_handle: Option<fsevent::Handle>,
poll_scheduled: bool, poll_scheduled: bool,
rpc: Option<(rpc::Client, u64)>, rpc: Option<(rpc::Client, u64)>,
open_buffers: HashMap<usize, WeakModelHandle<Buffer>>, open_buffers: HashMap<usize, WeakModelHandle<Buffer>>,
shared_buffers: HashMap<PeerId, HashMap<u64, ModelHandle<Buffer>>>, shared_buffers: HashMap<PeerId, HashMap<u64, ModelHandle<Buffer>>>,
peers: HashMap<PeerId, ReplicaId>, peers: HashMap<PeerId, ReplicaId>,
languages: Arc<LanguageRegistry>, languages: Arc<LanguageRegistry>,
fs: Arc<dyn Fs>,
} }
impl LocalWorktree { impl LocalWorktree {
fn new( fn new(
path: impl Into<Arc<Path>>, path: impl Into<Arc<Path>>,
languages: Arc<LanguageRegistry>, languages: Arc<LanguageRegistry>,
fs: Arc<dyn Fs>,
cx: &mut ModelContext<Worktree>, cx: &mut ModelContext<Worktree>,
) -> Self { ) -> (Self, Sender<ScanState>) {
let abs_path = path.into(); let abs_path = path.into();
let (scan_states_tx, scan_states_rx) = smol::channel::unbounded(); let (scan_states_tx, scan_states_rx) = smol::channel::unbounded();
let (mut last_scan_state_tx, last_scan_state_rx) = watch::channel_with(ScanState::Scanning); let (mut last_scan_state_tx, last_scan_state_rx) = watch::channel_with(ScanState::Scanning);
@ -558,30 +651,22 @@ impl LocalWorktree {
removed_entry_ids: Default::default(), removed_entry_ids: Default::default(),
next_entry_id: Default::default(), next_entry_id: Default::default(),
}; };
let (event_stream, event_stream_handle) =
fsevent::EventStream::new(&[snapshot.abs_path.as_ref()], Duration::from_millis(100));
let background_snapshot = Arc::new(Mutex::new(snapshot.clone()));
let tree = Self { let tree = Self {
snapshot, snapshot: snapshot.clone(),
background_snapshot: background_snapshot.clone(), background_snapshot: Arc::new(Mutex::new(snapshot)),
snapshots_to_send_tx: None, snapshots_to_send_tx: None,
last_scan_state_rx, last_scan_state_rx,
_event_stream_handle: event_stream_handle, _event_stream_handle: None,
poll_scheduled: false, poll_scheduled: false,
open_buffers: Default::default(), open_buffers: Default::default(),
shared_buffers: Default::default(), shared_buffers: Default::default(),
peers: Default::default(), peers: Default::default(),
rpc: None, rpc: None,
languages, languages,
fs,
}; };
std::thread::spawn(move || {
let scanner = BackgroundScanner::new(background_snapshot, scan_states_tx, id);
scanner.run(event_stream)
});
cx.spawn_weak(|this, mut cx| async move { cx.spawn_weak(|this, mut cx| async move {
while let Ok(scan_state) = scan_states_rx.recv().await { while let Ok(scan_state) = scan_states_rx.recv().await {
if let Some(handle) = cx.read(|cx| this.upgrade(&cx)) { if let Some(handle) = cx.read(|cx| this.upgrade(&cx)) {
@ -606,7 +691,7 @@ impl LocalWorktree {
}) })
.detach(); .detach();
tree (tree, scan_states_tx)
} }
pub fn open_buffer( pub fn open_buffer(
@ -769,12 +854,11 @@ impl LocalWorktree {
let path = Arc::from(path); let path = Arc::from(path);
let abs_path = self.absolutize(&path); let abs_path = self.absolutize(&path);
let background_snapshot = self.background_snapshot.clone(); let background_snapshot = self.background_snapshot.clone();
let fs = self.fs.clone();
cx.spawn(|this, mut cx| async move { cx.spawn(|this, mut cx| async move {
let mut file = smol::fs::File::open(&abs_path).await?; let text = fs.load(&abs_path).await?;
let mut text = String::new();
file.read_to_string(&mut text).await?;
// Eagerly populate the snapshot with an updated entry for the loaded file // Eagerly populate the snapshot with an updated entry for the loaded file
let entry = refresh_entry(&background_snapshot, path, &abs_path)?; let entry = refresh_entry(fs.as_ref(), &background_snapshot, path, &abs_path).await?;
this.update(&mut cx, |this, cx| this.poll_snapshot(cx)); this.update(&mut cx, |this, cx| this.poll_snapshot(cx));
Ok((File::new(entry.id, handle, entry.path, entry.mtime), text)) Ok((File::new(entry.id, handle, entry.path, entry.mtime), text))
}) })
@ -809,16 +893,10 @@ impl LocalWorktree {
let path = path.into(); let path = path.into();
let abs_path = self.absolutize(&path); let abs_path = self.absolutize(&path);
let background_snapshot = self.background_snapshot.clone(); let background_snapshot = self.background_snapshot.clone();
let fs = self.fs.clone();
let save = cx.background().spawn(async move { let save = cx.background().spawn(async move {
let buffer_size = text.summary().bytes.min(10 * 1024); fs.save(&abs_path, &text).await?;
let file = smol::fs::File::create(&abs_path).await?; refresh_entry(fs.as_ref(), &background_snapshot, path.clone(), &abs_path).await
let mut writer = smol::io::BufWriter::with_capacity(buffer_size, file);
for chunk in text.chunks() {
writer.write_all(chunk.as_bytes()).await?;
}
writer.flush().await?;
refresh_entry(&background_snapshot, path.clone(), &abs_path)
}); });
cx.spawn(|this, mut cx| async move { cx.spawn(|this, mut cx| async move {
@ -1969,12 +2047,12 @@ impl BackgroundScanner {
} }
}; };
match fs_entry_for_path( match smol::block_on(OsFs.entry(
snapshot.root_char_bag, snapshot.root_char_bag,
&next_entry_id, &next_entry_id,
path.clone(), path.clone(),
&event.path, &event.path,
) { )) {
Ok(Some(mut fs_entry)) => { Ok(Some(mut fs_entry)) => {
let is_dir = fs_entry.is_dir(); let is_dir = fs_entry.is_dir();
let ignore_stack = snapshot.ignore_stack_for_path(&path, is_dir); let ignore_stack = snapshot.ignore_stack_for_path(&path, is_dir);
@ -2110,7 +2188,12 @@ impl BackgroundScanner {
} }
} }
fn refresh_entry(snapshot: &Mutex<Snapshot>, path: Arc<Path>, abs_path: &Path) -> Result<Entry> { async fn refresh_entry(
fs: &dyn Fs,
snapshot: &Mutex<Snapshot>,
path: Arc<Path>,
abs_path: &Path,
) -> Result<Entry> {
let root_char_bag; let root_char_bag;
let next_entry_id; let next_entry_id;
{ {
@ -2118,51 +2201,13 @@ fn refresh_entry(snapshot: &Mutex<Snapshot>, path: Arc<Path>, abs_path: &Path) -
root_char_bag = snapshot.root_char_bag; root_char_bag = snapshot.root_char_bag;
next_entry_id = snapshot.next_entry_id.clone(); next_entry_id = snapshot.next_entry_id.clone();
} }
let entry = fs_entry_for_path(root_char_bag, &next_entry_id, path, abs_path)? let entry = fs
.entry(root_char_bag, &next_entry_id, path, abs_path)
.await?
.ok_or_else(|| anyhow!("could not read saved file metadata"))?; .ok_or_else(|| anyhow!("could not read saved file metadata"))?;
Ok(snapshot.lock().insert_entry(entry)) Ok(snapshot.lock().insert_entry(entry))
} }
fn fs_entry_for_path(
root_char_bag: CharBag,
next_entry_id: &AtomicUsize,
path: Arc<Path>,
abs_path: &Path,
) -> Result<Option<Entry>> {
let metadata = match fs::metadata(&abs_path) {
Err(err) => {
return match (err.kind(), err.raw_os_error()) {
(io::ErrorKind::NotFound, _) => Ok(None),
(io::ErrorKind::Other, Some(libc::ENOTDIR)) => Ok(None),
_ => Err(anyhow::Error::new(err)),
}
}
Ok(metadata) => metadata,
};
let inode = metadata.ino();
let mtime = metadata.modified()?;
let is_symlink = fs::symlink_metadata(&abs_path)
.context("failed to read symlink metadata")?
.file_type()
.is_symlink();
let entry = Entry {
id: next_entry_id.fetch_add(1, SeqCst),
kind: if metadata.file_type().is_dir() {
EntryKind::PendingDir
} else {
EntryKind::File(char_bag_for_path(root_char_bag, &path))
},
path: Arc::from(path),
inode,
mtime,
is_symlink,
is_ignored: false,
};
Ok(Some(entry))
}
fn char_bag_for_path(root_char_bag: CharBag, path: &Path) -> CharBag { fn char_bag_for_path(root_char_bag: CharBag, path: &Path) -> CharBag {
let mut result = root_char_bag; let mut result = root_char_bag;
result.extend( result.extend(