Stream buffer ops in the background when creating buffer for peers

This commit is contained in:
Antonio Scandurra 2022-08-23 16:05:56 +02:00
parent ec48ffc9da
commit 954695f5fe
6 changed files with 209 additions and 98 deletions

View file

@ -123,6 +123,7 @@ pub struct Project {
loading_local_worktrees:
HashMap<Arc<Path>, Shared<Task<Result<ModelHandle<Worktree>, Arc<anyhow::Error>>>>>,
opened_buffers: HashMap<u64, OpenBuffer>,
incomplete_buffers: HashMap<u64, ModelHandle<Buffer>>,
buffer_snapshots: HashMap<u64, Vec<(i32, TextBufferSnapshot)>>,
nonce: u128,
initialized_persistent_state: bool,
@ -144,7 +145,7 @@ pub enum JoinProjectError {
enum OpenBuffer {
Strong(ModelHandle<Buffer>),
Weak(WeakModelHandle<Buffer>),
Loading(Vec<Operation>),
Operations(Vec<Operation>),
}
enum WorktreeHandle {
@ -461,6 +462,7 @@ impl Project {
collaborators: Default::default(),
opened_buffers: Default::default(),
shared_buffers: Default::default(),
incomplete_buffers: Default::default(),
loading_buffers: Default::default(),
loading_local_worktrees: Default::default(),
buffer_snapshots: Default::default(),
@ -550,6 +552,7 @@ impl Project {
loading_buffers: Default::default(),
opened_buffer: watch::channel(),
shared_buffers: Default::default(),
incomplete_buffers: Default::default(),
loading_local_worktrees: Default::default(),
active_entry: None,
collaborators: Default::default(),
@ -1331,7 +1334,7 @@ impl Project {
*open_buffer = OpenBuffer::Strong(buffer);
}
}
OpenBuffer::Loading(_) => unreachable!(),
OpenBuffer::Operations(_) => unreachable!(),
}
}
@ -1456,6 +1459,10 @@ impl Project {
}
cx.emit(Event::DisconnectedFromHost);
cx.notify();
// Wake up all futures currently waiting on a buffer to get opened,
// to give them a chance to fail now that we've disconnected.
*self.opened_buffer.0.borrow_mut() = ();
}
}
@ -1757,7 +1764,7 @@ impl Project {
match self.opened_buffers.insert(remote_id, open_buffer) {
None => {}
Some(OpenBuffer::Loading(operations)) => {
Some(OpenBuffer::Operations(operations)) => {
buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx))?
}
Some(OpenBuffer::Weak(existing_handle)) => {
@ -5107,7 +5114,7 @@ impl Project {
OpenBuffer::Strong(buffer) => {
buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx))?;
}
OpenBuffer::Loading(operations) => operations.extend_from_slice(&ops),
OpenBuffer::Operations(operations) => operations.extend_from_slice(&ops),
OpenBuffer::Weak(_) => {}
},
hash_map::Entry::Vacant(e) => {
@ -5116,7 +5123,7 @@ impl Project {
"received buffer update from {:?}",
envelope.original_sender_id
);
e.insert(OpenBuffer::Loading(ops));
e.insert(OpenBuffer::Operations(ops));
}
}
Ok(())
@ -5130,24 +5137,52 @@ impl Project {
mut cx: AsyncAppContext,
) -> Result<()> {
this.update(&mut cx, |this, cx| {
let mut buffer = envelope
match envelope
.payload
.buffer
.ok_or_else(|| anyhow!("invalid buffer"))?;
let mut buffer_file = None;
if let Some(file) = buffer.file.take() {
let worktree_id = WorktreeId::from_proto(file.worktree_id);
let worktree = this
.worktree_for_id(worktree_id, cx)
.ok_or_else(|| anyhow!("no worktree found for id {}", file.worktree_id))?;
buffer_file = Some(Arc::new(File::from_proto(file, worktree.clone(), cx)?)
as Arc<dyn language::File>);
}
.variant
.ok_or_else(|| anyhow!("missing variant"))?
{
proto::create_buffer_for_peer::Variant::State(mut state) => {
let mut buffer_file = None;
if let Some(file) = state.file.take() {
let worktree_id = WorktreeId::from_proto(file.worktree_id);
let worktree = this.worktree_for_id(worktree_id, cx).ok_or_else(|| {
anyhow!("no worktree found for id {}", file.worktree_id)
})?;
buffer_file = Some(Arc::new(File::from_proto(file, worktree.clone(), cx)?)
as Arc<dyn language::File>);
}
let buffer = cx.add_model(|cx| {
Buffer::from_proto(this.replica_id(), buffer, buffer_file, cx).unwrap()
});
this.register_buffer(&buffer, cx)?;
let buffer_id = state.id;
let buffer = cx.add_model(|_| {
Buffer::from_proto(this.replica_id(), state, buffer_file).unwrap()
});
this.incomplete_buffers.insert(buffer_id, buffer);
}
proto::create_buffer_for_peer::Variant::Chunk(chunk) => {
let buffer = this
.incomplete_buffers
.get(&chunk.buffer_id)
.ok_or_else(|| {
anyhow!(
"received chunk for buffer {} without initial state",
chunk.buffer_id
)
})?
.clone();
let operations = chunk
.operations
.into_iter()
.map(language::proto::deserialize_operation)
.collect::<Result<Vec<_>>>()?;
buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx))?;
if chunk.is_last {
this.incomplete_buffers.remove(&chunk.buffer_id);
this.register_buffer(&buffer, cx)?;
}
}
}
Ok(())
})
@ -5658,13 +5693,50 @@ impl Project {
if let Some(project_id) = self.remote_id() {
let shared_buffers = self.shared_buffers.entry(peer_id).or_default();
if shared_buffers.insert(buffer_id) {
self.client
.send(proto::CreateBufferForPeer {
project_id,
peer_id: peer_id.0,
buffer: Some(buffer.read(cx).to_proto()),
})
.log_err();
let (state, mut operations) = buffer.read(cx).to_proto();
let client = self.client.clone();
cx.background()
.spawn(
async move {
client.send(proto::CreateBufferForPeer {
project_id,
peer_id: peer_id.0,
variant: Some(proto::create_buffer_for_peer::Variant::State(state)),
})?;
loop {
#[cfg(any(test, feature = "test-support"))]
const CHUNK_SIZE: usize = 5;
#[cfg(not(any(test, feature = "test-support")))]
const CHUNK_SIZE: usize = 100;
let chunk = operations
.drain(..cmp::min(CHUNK_SIZE, operations.len()))
.collect();
let is_last = operations.is_empty();
client.send(proto::CreateBufferForPeer {
project_id,
peer_id: peer_id.0,
variant: Some(proto::create_buffer_for_peer::Variant::Chunk(
proto::BufferChunk {
buffer_id,
operations: chunk,
is_last,
},
)),
})?;
if is_last {
break;
}
}
Ok(())
}
.log_err(),
)
.detach();
}
}
@ -5686,7 +5758,10 @@ impl Project {
});
if let Some(buffer) = buffer {
break buffer;
} else if this.read_with(&cx, |this, _| this.is_read_only()) {
return Err(anyhow!("disconnected before buffer {} could be opened", id));
}
opened_buffer_rx
.next()
.await
@ -6026,7 +6101,7 @@ impl OpenBuffer {
match self {
OpenBuffer::Strong(handle) => Some(handle.clone()),
OpenBuffer::Weak(handle) => handle.upgrade(cx),
OpenBuffer::Loading(_) => None,
OpenBuffer::Operations(_) => None,
}
}
}