channel projects (#8456)

Add plumbing for hosted projects. This will currently show them if they
exist
but provides no UX to create/rename/delete them.

Also changed the `ChannelId` type to not auto-cast to u64; this avoids
type
confusion if you have multiple id types.


Release Notes:

- N/A
This commit is contained in:
Conrad Irwin 2024-02-26 22:15:11 -07:00 committed by GitHub
parent 8cf36ae603
commit c31626717f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
37 changed files with 446 additions and 144 deletions

View file

@ -11,7 +11,7 @@ pub use channel_chat::{
mentions_to_proto, ChannelChat, ChannelChatEvent, ChannelMessage, ChannelMessageId,
MessageParams,
};
pub use channel_store::{Channel, ChannelEvent, ChannelId, ChannelMembership, ChannelStore};
pub use channel_store::{Channel, ChannelEvent, ChannelMembership, ChannelStore, HostedProjectId};
#[cfg(test)]
mod channel_store_tests;

View file

@ -1,6 +1,6 @@
use crate::{Channel, ChannelId, ChannelStore};
use crate::{Channel, ChannelStore};
use anyhow::Result;
use client::{Client, Collaborator, UserStore, ZED_ALWAYS_ACTIVE};
use client::{ChannelId, Client, Collaborator, UserStore, ZED_ALWAYS_ACTIVE};
use collections::HashMap;
use gpui::{AppContext, AsyncAppContext, Context, EventEmitter, Model, ModelContext, Task};
use language::proto::serialize_version;
@ -51,7 +51,7 @@ impl ChannelBuffer {
) -> Result<Model<Self>> {
let response = client
.request(proto::JoinChannelBuffer {
channel_id: channel.id,
channel_id: channel.id.0,
})
.await?;
let buffer_id = BufferId::new(response.buffer_id)?;
@ -68,7 +68,7 @@ impl ChannelBuffer {
})?;
buffer.update(&mut cx, |buffer, cx| buffer.apply_ops(operations, cx))??;
let subscription = client.subscribe_to_entity(channel.id)?;
let subscription = client.subscribe_to_entity(channel.id.0)?;
anyhow::Ok(cx.new_model(|cx| {
cx.subscribe(&buffer, Self::on_buffer_update).detach();
@ -97,7 +97,7 @@ impl ChannelBuffer {
}
self.client
.send(proto::LeaveChannelBuffer {
channel_id: self.channel_id,
channel_id: self.channel_id.0,
})
.log_err();
}
@ -191,7 +191,7 @@ impl ChannelBuffer {
let operation = language::proto::serialize_operation(operation);
self.client
.send(proto::UpdateChannelBuffer {
channel_id: self.channel_id,
channel_id: self.channel_id.0,
operations: vec![operation],
})
.log_err();

View file

@ -1,9 +1,9 @@
use crate::{Channel, ChannelId, ChannelStore};
use crate::{Channel, ChannelStore};
use anyhow::{anyhow, Result};
use client::{
proto,
user::{User, UserStore},
Client, Subscription, TypedEnvelope, UserId,
ChannelId, Client, Subscription, TypedEnvelope, UserId,
};
use collections::HashSet;
use futures::lock::Mutex;
@ -104,10 +104,12 @@ impl ChannelChat {
mut cx: AsyncAppContext,
) -> Result<Model<Self>> {
let channel_id = channel.id;
let subscription = client.subscribe_to_entity(channel_id).unwrap();
let subscription = client.subscribe_to_entity(channel_id.0).unwrap();
let response = client
.request(proto::JoinChannelChat { channel_id })
.request(proto::JoinChannelChat {
channel_id: channel_id.0,
})
.await?;
let handle = cx.new_model(|cx| {
@ -143,7 +145,7 @@ impl ChannelChat {
fn release(&mut self, _: &mut AppContext) {
self.rpc
.send(proto::LeaveChannelChat {
channel_id: self.channel_id,
channel_id: self.channel_id.0,
})
.log_err();
}
@ -200,7 +202,7 @@ impl ChannelChat {
Ok(cx.spawn(move |this, mut cx| async move {
let outgoing_message_guard = outgoing_messages_lock.lock().await;
let request = rpc.request(proto::SendChannelMessage {
channel_id,
channel_id: channel_id.0,
body: message.text,
nonce: Some(nonce.into()),
mentions: mentions_to_proto(&message.mentions),
@ -220,7 +222,7 @@ impl ChannelChat {
pub fn remove_message(&mut self, id: u64, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
let response = self.rpc.request(proto::RemoveChannelMessage {
channel_id: self.channel_id,
channel_id: self.channel_id.0,
message_id: id,
});
cx.spawn(move |this, mut cx| async move {
@ -245,7 +247,7 @@ impl ChannelChat {
async move {
let response = rpc
.request(proto::GetChannelMessages {
channel_id,
channel_id: channel_id.0,
before_message_id,
})
.await?;
@ -323,7 +325,7 @@ impl ChannelChat {
{
self.rpc
.send(proto::AckChannelMessage {
channel_id: self.channel_id,
channel_id: self.channel_id.0,
message_id: latest_message_id,
})
.ok();
@ -401,7 +403,11 @@ impl ChannelChat {
let channel_id = self.channel_id;
cx.spawn(move |this, mut cx| {
async move {
let response = rpc.request(proto::JoinChannelChat { channel_id }).await?;
let response = rpc
.request(proto::JoinChannelChat {
channel_id: channel_id.0,
})
.await?;
Self::handle_loaded_messages(
this.clone(),
user_store.clone(),
@ -418,7 +424,7 @@ impl ChannelChat {
for pending_message in pending_messages {
let request = rpc.request(proto::SendChannelMessage {
channel_id,
channel_id: channel_id.0,
body: pending_message.body,
mentions: mentions_to_proto(&pending_message.mentions),
nonce: Some(pending_message.nonce.into()),
@ -461,7 +467,7 @@ impl ChannelChat {
if self.acknowledged_message_ids.insert(id) {
self.rpc
.send(proto::AckChannelMessage {
channel_id: self.channel_id,
channel_id: self.channel_id.0,
message_id: id,
})
.ok();

View file

@ -3,7 +3,7 @@ mod channel_index;
use crate::{channel_buffer::ChannelBuffer, channel_chat::ChannelChat, ChannelMessage};
use anyhow::{anyhow, Result};
use channel_index::ChannelIndex;
use client::{Client, Subscription, User, UserId, UserStore};
use client::{ChannelId, Client, Subscription, User, UserId, UserStore};
use collections::{hash_map, HashMap, HashSet};
use futures::{channel::mpsc, future::Shared, Future, FutureExt, StreamExt};
use gpui::{
@ -19,15 +19,16 @@ use rpc::{
use std::{mem, sync::Arc, time::Duration};
use util::{async_maybe, maybe, ResultExt};
pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
pub fn init(client: &Arc<Client>, user_store: Model<UserStore>, cx: &mut AppContext) {
let channel_store =
cx.new_model(|cx| ChannelStore::new(client.clone(), user_store.clone(), cx));
cx.set_global(GlobalChannelStore(channel_store));
}
pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
pub type ChannelId = u64;
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
pub struct HostedProjectId(pub u64);
#[derive(Debug, Clone, Default)]
struct NotesVersion {
@ -35,11 +36,31 @@ struct NotesVersion {
version: clock::Global,
}
#[derive(Debug, Clone)]
pub struct HostedProject {
id: HostedProjectId,
channel_id: ChannelId,
name: SharedString,
_visibility: proto::ChannelVisibility,
}
impl From<proto::HostedProject> for HostedProject {
fn from(project: proto::HostedProject) -> Self {
Self {
id: HostedProjectId(project.id),
channel_id: ChannelId(project.channel_id),
_visibility: project.visibility(),
name: project.name.into(),
}
}
}
pub struct ChannelStore {
pub channel_index: ChannelIndex,
channel_invitations: Vec<Arc<Channel>>,
channel_participants: HashMap<ChannelId, Vec<Arc<User>>>,
channel_states: HashMap<ChannelId, ChannelState>,
hosted_projects: HashMap<HostedProjectId, HostedProject>,
outgoing_invites: HashSet<(ChannelId, UserId)>,
update_channels_tx: mpsc::UnboundedSender<proto::UpdateChannels>,
@ -58,7 +79,7 @@ pub struct Channel {
pub id: ChannelId,
pub name: SharedString,
pub visibility: proto::ChannelVisibility,
pub parent_path: Vec<u64>,
pub parent_path: Vec<ChannelId>,
}
#[derive(Default)]
@ -68,6 +89,7 @@ pub struct ChannelState {
observed_chat_message: Option<u64>,
observed_notes_versions: Option<NotesVersion>,
role: Option<ChannelRole>,
projects: HashSet<HostedProjectId>,
}
impl Channel {
@ -92,10 +114,7 @@ impl Channel {
}
pub fn root_id(&self) -> ChannelId {
self.parent_path
.first()
.map(|id| *id as ChannelId)
.unwrap_or(self.id)
self.parent_path.first().copied().unwrap_or(self.id)
}
pub fn slug(str: &str) -> String {
@ -199,6 +218,7 @@ impl ChannelStore {
channel_invitations: Vec::default(),
channel_index: ChannelIndex::default(),
channel_participants: Default::default(),
hosted_projects: Default::default(),
outgoing_invites: Default::default(),
opened_buffers: Default::default(),
opened_chats: Default::default(),
@ -285,6 +305,19 @@ impl ChannelStore {
self.channel_index.by_id().get(&channel_id)
}
pub fn projects_for_id(&self, channel_id: ChannelId) -> Vec<(SharedString, HostedProjectId)> {
let mut projects: Vec<(SharedString, HostedProjectId)> = self
.channel_states
.get(&channel_id)
.map(|state| state.projects.clone())
.unwrap_or_default()
.into_iter()
.flat_map(|id| Some((self.hosted_projects.get(&id)?.name.clone(), id)))
.collect();
projects.sort();
projects
}
pub fn has_open_channel_buffer(&self, channel_id: ChannelId, _cx: &AppContext) -> bool {
if let Some(buffer) = self.opened_buffers.get(&channel_id) {
if let OpenedModelHandle::Open(buffer) = buffer {
@ -562,13 +595,16 @@ impl ChannelStore {
let name = name.trim_start_matches("#").to_owned();
cx.spawn(move |this, mut cx| async move {
let response = client
.request(proto::CreateChannel { name, parent_id })
.request(proto::CreateChannel {
name,
parent_id: parent_id.map(|cid| cid.0),
})
.await?;
let channel = response
.channel
.ok_or_else(|| anyhow!("missing channel in response"))?;
let channel_id = channel.id;
let channel_id = ChannelId(channel.id);
this.update(&mut cx, |this, cx| {
let task = this.update_channels(
@ -600,7 +636,10 @@ impl ChannelStore {
let client = self.client.clone();
cx.spawn(move |_, _| async move {
let _ = client
.request(proto::MoveChannel { channel_id, to })
.request(proto::MoveChannel {
channel_id: channel_id.0,
to: to.0,
})
.await?;
Ok(())
@ -617,7 +656,7 @@ impl ChannelStore {
cx.spawn(move |_, _| async move {
let _ = client
.request(proto::SetChannelVisibility {
channel_id,
channel_id: channel_id.0,
visibility: visibility.into(),
})
.await?;
@ -642,7 +681,7 @@ impl ChannelStore {
cx.spawn(move |this, mut cx| async move {
let result = client
.request(proto::InviteChannelMember {
channel_id,
channel_id: channel_id.0,
user_id,
role: role.into(),
})
@ -674,7 +713,7 @@ impl ChannelStore {
cx.spawn(move |this, mut cx| async move {
let result = client
.request(proto::RemoveChannelMember {
channel_id,
channel_id: channel_id.0,
user_id,
})
.await;
@ -704,7 +743,7 @@ impl ChannelStore {
cx.spawn(move |this, mut cx| async move {
let result = client
.request(proto::SetChannelMemberRole {
channel_id,
channel_id: channel_id.0,
user_id,
role: role.into(),
})
@ -730,7 +769,10 @@ impl ChannelStore {
let name = new_name.to_string();
cx.spawn(move |this, mut cx| async move {
let channel = client
.request(proto::RenameChannel { channel_id, name })
.request(proto::RenameChannel {
channel_id: channel_id.0,
name,
})
.await?
.channel
.ok_or_else(|| anyhow!("missing channel in response"))?;
@ -763,7 +805,10 @@ impl ChannelStore {
let client = self.client.clone();
cx.background_executor().spawn(async move {
client
.request(proto::RespondToChannelInvite { channel_id, accept })
.request(proto::RespondToChannelInvite {
channel_id: channel_id.0,
accept,
})
.await?;
Ok(())
})
@ -778,7 +823,9 @@ impl ChannelStore {
let user_store = self.user_store.downgrade();
cx.spawn(move |_, mut cx| async move {
let response = client
.request(proto::GetChannelMembers { channel_id })
.request(proto::GetChannelMembers {
channel_id: channel_id.0,
})
.await?;
let user_ids = response.members.iter().map(|m| m.user_id).collect();
@ -806,7 +853,11 @@ impl ChannelStore {
pub fn remove_channel(&self, channel_id: ChannelId) -> impl Future<Output = Result<()>> {
let client = self.client.clone();
async move {
client.request(proto::DeleteChannel { channel_id }).await?;
client
.request(proto::DeleteChannel {
channel_id: channel_id.0,
})
.await?;
Ok(())
}
}
@ -843,19 +894,23 @@ impl ChannelStore {
for buffer_version in message.payload.observed_channel_buffer_version {
let version = language::proto::deserialize_version(&buffer_version.version);
this.acknowledge_notes_version(
buffer_version.channel_id,
ChannelId(buffer_version.channel_id),
buffer_version.epoch,
&version,
cx,
);
}
for message_id in message.payload.observed_channel_message_id {
this.acknowledge_message_id(message_id.channel_id, message_id.message_id, cx);
this.acknowledge_message_id(
ChannelId(message_id.channel_id),
message_id.message_id,
cx,
);
}
for membership in message.payload.channel_memberships {
if let Some(role) = ChannelRole::from_i32(membership.role) {
this.channel_states
.entry(membership.channel_id)
.entry(ChannelId(membership.channel_id))
.or_insert_with(|| ChannelState::default())
.set_role(role)
}
@ -888,7 +943,7 @@ impl ChannelStore {
let channel_buffer = buffer.read(cx);
let buffer = channel_buffer.buffer().read(cx);
buffer_versions.push(proto::ChannelBufferVersion {
channel_id: channel_buffer.channel_id,
channel_id: channel_buffer.channel_id.0,
epoch: channel_buffer.epoch(),
version: language::proto::serialize_version(&buffer.version()),
});
@ -919,7 +974,7 @@ impl ChannelStore {
if let Some(remote_buffer) = response
.buffers
.iter_mut()
.find(|buffer| buffer.channel_id == channel_id)
.find(|buffer| buffer.channel_id == channel_id.0)
{
let channel_id = channel_buffer.channel_id;
let remote_version =
@ -955,7 +1010,7 @@ impl ChannelStore {
{
client
.send(proto::UpdateChannelBuffer {
channel_id,
channel_id: channel_id.0,
operations: chunk,
})
.ok();
@ -1010,12 +1065,12 @@ impl ChannelStore {
) -> Option<Task<Result<()>>> {
if !payload.remove_channel_invitations.is_empty() {
self.channel_invitations
.retain(|channel| !payload.remove_channel_invitations.contains(&channel.id));
.retain(|channel| !payload.remove_channel_invitations.contains(&channel.id.0));
}
for channel in payload.channel_invitations {
match self
.channel_invitations
.binary_search_by_key(&channel.id, |c| c.id)
.binary_search_by_key(&channel.id, |c| c.id.0)
{
Ok(ix) => {
Arc::make_mut(&mut self.channel_invitations[ix]).name = channel.name.into()
@ -1023,10 +1078,14 @@ impl ChannelStore {
Err(ix) => self.channel_invitations.insert(
ix,
Arc::new(Channel {
id: channel.id,
id: ChannelId(channel.id),
visibility: channel.visibility(),
name: channel.name.into(),
parent_path: channel.parent_path,
parent_path: channel
.parent_path
.into_iter()
.map(|cid| ChannelId(cid))
.collect(),
}),
),
}
@ -1035,20 +1094,27 @@ impl ChannelStore {
let channels_changed = !payload.channels.is_empty()
|| !payload.delete_channels.is_empty()
|| !payload.latest_channel_message_ids.is_empty()
|| !payload.latest_channel_buffer_versions.is_empty();
|| !payload.latest_channel_buffer_versions.is_empty()
|| !payload.hosted_projects.is_empty()
|| !payload.deleted_hosted_projects.is_empty();
if channels_changed {
if !payload.delete_channels.is_empty() {
self.channel_index.delete_channels(&payload.delete_channels);
let delete_channels: Vec<ChannelId> = payload
.delete_channels
.into_iter()
.map(|cid| ChannelId(cid))
.collect();
self.channel_index.delete_channels(&delete_channels);
self.channel_participants
.retain(|channel_id, _| !&payload.delete_channels.contains(channel_id));
.retain(|channel_id, _| !delete_channels.contains(&channel_id));
for channel_id in &payload.delete_channels {
for channel_id in &delete_channels {
let channel_id = *channel_id;
if payload
.channels
.iter()
.any(|channel| channel.id == channel_id)
.any(|channel| channel.id == channel_id.0)
{
continue;
}
@ -1064,7 +1130,7 @@ impl ChannelStore {
let mut index = self.channel_index.bulk_insert();
for channel in payload.channels {
let id = channel.id;
let id = ChannelId(channel.id);
let channel_changed = index.insert(channel);
if channel_changed {
@ -1079,17 +1145,45 @@ impl ChannelStore {
for latest_buffer_version in payload.latest_channel_buffer_versions {
let version = language::proto::deserialize_version(&latest_buffer_version.version);
self.channel_states
.entry(latest_buffer_version.channel_id)
.entry(ChannelId(latest_buffer_version.channel_id))
.or_default()
.update_latest_notes_version(latest_buffer_version.epoch, &version)
}
for latest_channel_message in payload.latest_channel_message_ids {
self.channel_states
.entry(latest_channel_message.channel_id)
.entry(ChannelId(latest_channel_message.channel_id))
.or_default()
.update_latest_message_id(latest_channel_message.message_id);
}
for hosted_project in payload.hosted_projects {
let hosted_project: HostedProject = hosted_project.into();
if let Some(old_project) = self
.hosted_projects
.insert(hosted_project.id, hosted_project.clone())
{
self.channel_states
.entry(old_project.channel_id)
.or_default()
.remove_hosted_project(old_project.id);
}
self.channel_states
.entry(hosted_project.channel_id)
.or_default()
.add_hosted_project(hosted_project.id);
}
for hosted_project_id in payload.deleted_hosted_projects {
let hosted_project_id = HostedProjectId(hosted_project_id);
if let Some(old_project) = self.hosted_projects.remove(&hosted_project_id) {
self.channel_states
.entry(old_project.channel_id)
.or_default()
.remove_hosted_project(old_project.id);
}
}
}
cx.notify();
@ -1129,7 +1223,7 @@ impl ChannelStore {
participants.sort_by_key(|u| u.id);
this.channel_participants
.insert(entry.channel_id, participants);
.insert(ChannelId(entry.channel_id), participants);
}
cx.notify();
@ -1207,4 +1301,12 @@ impl ChannelState {
version: version.clone(),
});
}
fn add_hosted_project(&mut self, project_id: HostedProjectId) {
self.projects.insert(project_id);
}
fn remove_hosted_project(&mut self, project_id: HostedProjectId) {
self.projects.remove(&project_id);
}
}

View file

@ -1,4 +1,5 @@
use crate::{Channel, ChannelId};
use crate::Channel;
use client::ChannelId;
use collections::BTreeMap;
use rpc::proto;
use std::sync::Arc;
@ -50,27 +51,32 @@ pub struct ChannelPathsInsertGuard<'a> {
impl<'a> ChannelPathsInsertGuard<'a> {
pub fn insert(&mut self, channel_proto: proto::Channel) -> bool {
let mut ret = false;
if let Some(existing_channel) = self.channels_by_id.get_mut(&channel_proto.id) {
let parent_path = channel_proto
.parent_path
.iter()
.map(|cid| ChannelId(*cid))
.collect();
if let Some(existing_channel) = self.channels_by_id.get_mut(&ChannelId(channel_proto.id)) {
let existing_channel = Arc::make_mut(existing_channel);
ret = existing_channel.visibility != channel_proto.visibility()
|| existing_channel.name != channel_proto.name
|| existing_channel.parent_path != channel_proto.parent_path;
|| existing_channel.parent_path != parent_path;
existing_channel.visibility = channel_proto.visibility();
existing_channel.name = channel_proto.name.into();
existing_channel.parent_path = channel_proto.parent_path.into();
existing_channel.parent_path = parent_path;
} else {
self.channels_by_id.insert(
channel_proto.id,
ChannelId(channel_proto.id),
Arc::new(Channel {
id: channel_proto.id,
id: ChannelId(channel_proto.id),
visibility: channel_proto.visibility(),
name: channel_proto.name.into(),
parent_path: channel_proto.parent_path,
parent_path,
}),
);
self.insert_root(channel_proto.id);
self.insert_root(ChannelId(channel_proto.id));
}
ret
}
@ -94,7 +100,7 @@ impl<'a> Drop for ChannelPathsInsertGuard<'a> {
fn channel_path_sorting_key<'a>(
id: ChannelId,
channels_by_id: &'a BTreeMap<ChannelId, Arc<Channel>>,
) -> impl Iterator<Item = (&str, u64)> {
) -> impl Iterator<Item = (&str, ChannelId)> {
let (parent_path, name) = channels_by_id
.get(&id)
.map_or((&[] as &[_], None), |channel| {