Remove logic for multiple channel parents
Co-authored-by: Conrad <conrad@zed.dev> Co-authored-by: Kyle <kyle@zed.dev> Co-authored-by: Joseph <joseph@zed.dev>
This commit is contained in:
parent
cc9e92857b
commit
5c03b6a610
23 changed files with 772 additions and 2069 deletions
|
@ -16,7 +16,8 @@ impl Database {
|
|||
connection: ConnectionId,
|
||||
) -> Result<proto::JoinChannelBufferResponse> {
|
||||
self.transaction(|tx| async move {
|
||||
self.check_user_is_channel_participant(channel_id, user_id, &tx)
|
||||
let channel = self.get_channel_internal(channel_id, &*tx).await?;
|
||||
self.check_user_is_channel_participant(&channel, user_id, &tx)
|
||||
.await?;
|
||||
|
||||
let buffer = channel::Model {
|
||||
|
@ -129,9 +130,11 @@ impl Database {
|
|||
self.transaction(|tx| async move {
|
||||
let mut results = Vec::new();
|
||||
for client_buffer in buffers {
|
||||
let channel_id = ChannelId::from_proto(client_buffer.channel_id);
|
||||
let channel = self
|
||||
.get_channel_internal(ChannelId::from_proto(client_buffer.channel_id), &*tx)
|
||||
.await?;
|
||||
if self
|
||||
.check_user_is_channel_participant(channel_id, user_id, &*tx)
|
||||
.check_user_is_channel_participant(&channel, user_id, &*tx)
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
|
@ -139,9 +142,9 @@ impl Database {
|
|||
continue;
|
||||
}
|
||||
|
||||
let buffer = self.get_channel_buffer(channel_id, &*tx).await?;
|
||||
let buffer = self.get_channel_buffer(channel.id, &*tx).await?;
|
||||
let mut collaborators = channel_buffer_collaborator::Entity::find()
|
||||
.filter(channel_buffer_collaborator::Column::ChannelId.eq(channel_id))
|
||||
.filter(channel_buffer_collaborator::Column::ChannelId.eq(channel.id))
|
||||
.all(&*tx)
|
||||
.await?;
|
||||
|
||||
|
@ -439,7 +442,8 @@ impl Database {
|
|||
Vec<proto::VectorClockEntry>,
|
||||
)> {
|
||||
self.transaction(move |tx| async move {
|
||||
self.check_user_is_channel_member(channel_id, user, &*tx)
|
||||
let channel = self.get_channel_internal(channel_id, &*tx).await?;
|
||||
self.check_user_is_channel_member(&channel, user, &*tx)
|
||||
.await?;
|
||||
|
||||
let buffer = buffer::Entity::find()
|
||||
|
@ -482,7 +486,7 @@ impl Database {
|
|||
)
|
||||
.await?;
|
||||
|
||||
channel_members = self.get_channel_participants(channel_id, &*tx).await?;
|
||||
channel_members = self.get_channel_participants(&channel, &*tx).await?;
|
||||
let collaborators = self
|
||||
.get_channel_buffer_collaborators_internal(channel_id, &*tx)
|
||||
.await?;
|
||||
|
|
File diff suppressed because it is too large
Load diff
|
@ -1,5 +1,4 @@
|
|||
use super::*;
|
||||
use futures::Stream;
|
||||
use rpc::Notification;
|
||||
use sea_orm::TryInsertResult;
|
||||
use time::OffsetDateTime;
|
||||
|
@ -12,7 +11,8 @@ impl Database {
|
|||
user_id: UserId,
|
||||
) -> Result<()> {
|
||||
self.transaction(|tx| async move {
|
||||
self.check_user_is_channel_participant(channel_id, user_id, &*tx)
|
||||
let channel = self.get_channel_internal(channel_id, &*tx).await?;
|
||||
self.check_user_is_channel_participant(&channel, user_id, &*tx)
|
||||
.await?;
|
||||
channel_chat_participant::ActiveModel {
|
||||
id: ActiveValue::NotSet,
|
||||
|
@ -80,7 +80,8 @@ impl Database {
|
|||
before_message_id: Option<MessageId>,
|
||||
) -> Result<Vec<proto::ChannelMessage>> {
|
||||
self.transaction(|tx| async move {
|
||||
self.check_user_is_channel_participant(channel_id, user_id, &*tx)
|
||||
let channel = self.get_channel_internal(channel_id, &*tx).await?;
|
||||
self.check_user_is_channel_participant(&channel, user_id, &*tx)
|
||||
.await?;
|
||||
|
||||
let mut condition =
|
||||
|
@ -94,7 +95,7 @@ impl Database {
|
|||
.filter(condition)
|
||||
.order_by_desc(channel_message::Column::Id)
|
||||
.limit(count as u64)
|
||||
.stream(&*tx)
|
||||
.all(&*tx)
|
||||
.await?;
|
||||
|
||||
self.load_channel_messages(rows, &*tx).await
|
||||
|
@ -111,27 +112,23 @@ impl Database {
|
|||
let rows = channel_message::Entity::find()
|
||||
.filter(channel_message::Column::Id.is_in(message_ids.iter().copied()))
|
||||
.order_by_desc(channel_message::Column::Id)
|
||||
.stream(&*tx)
|
||||
.all(&*tx)
|
||||
.await?;
|
||||
|
||||
let mut channel_ids = HashSet::<ChannelId>::default();
|
||||
let messages = self
|
||||
.load_channel_messages(
|
||||
rows.map(|row| {
|
||||
row.map(|row| {
|
||||
channel_ids.insert(row.channel_id);
|
||||
row
|
||||
})
|
||||
}),
|
||||
&*tx,
|
||||
)
|
||||
.await?;
|
||||
let mut channels = HashMap::<ChannelId, channel::Model>::default();
|
||||
for row in &rows {
|
||||
channels.insert(
|
||||
row.channel_id,
|
||||
self.get_channel_internal(row.channel_id, &*tx).await?,
|
||||
);
|
||||
}
|
||||
|
||||
for channel_id in channel_ids {
|
||||
self.check_user_is_channel_member(channel_id, user_id, &*tx)
|
||||
for (_, channel) in channels {
|
||||
self.check_user_is_channel_participant(&channel, user_id, &*tx)
|
||||
.await?;
|
||||
}
|
||||
|
||||
let messages = self.load_channel_messages(rows, &*tx).await?;
|
||||
Ok(messages)
|
||||
})
|
||||
.await
|
||||
|
@ -139,26 +136,26 @@ impl Database {
|
|||
|
||||
async fn load_channel_messages(
|
||||
&self,
|
||||
mut rows: impl Send + Unpin + Stream<Item = Result<channel_message::Model, sea_orm::DbErr>>,
|
||||
rows: Vec<channel_message::Model>,
|
||||
tx: &DatabaseTransaction,
|
||||
) -> Result<Vec<proto::ChannelMessage>> {
|
||||
let mut messages = Vec::new();
|
||||
while let Some(row) = rows.next().await {
|
||||
let row = row?;
|
||||
let nonce = row.nonce.as_u64_pair();
|
||||
messages.push(proto::ChannelMessage {
|
||||
id: row.id.to_proto(),
|
||||
sender_id: row.sender_id.to_proto(),
|
||||
body: row.body,
|
||||
timestamp: row.sent_at.assume_utc().unix_timestamp() as u64,
|
||||
mentions: vec![],
|
||||
nonce: Some(proto::Nonce {
|
||||
upper_half: nonce.0,
|
||||
lower_half: nonce.1,
|
||||
}),
|
||||
});
|
||||
}
|
||||
drop(rows);
|
||||
let mut messages = rows
|
||||
.into_iter()
|
||||
.map(|row| {
|
||||
let nonce = row.nonce.as_u64_pair();
|
||||
proto::ChannelMessage {
|
||||
id: row.id.to_proto(),
|
||||
sender_id: row.sender_id.to_proto(),
|
||||
body: row.body,
|
||||
timestamp: row.sent_at.assume_utc().unix_timestamp() as u64,
|
||||
mentions: vec![],
|
||||
nonce: Some(proto::Nonce {
|
||||
upper_half: nonce.0,
|
||||
lower_half: nonce.1,
|
||||
}),
|
||||
}
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
messages.reverse();
|
||||
|
||||
let mut mentions = channel_message_mention::Entity::find()
|
||||
|
@ -203,7 +200,8 @@ impl Database {
|
|||
nonce: u128,
|
||||
) -> Result<CreatedChannelMessage> {
|
||||
self.transaction(|tx| async move {
|
||||
self.check_user_is_channel_participant(channel_id, user_id, &*tx)
|
||||
let channel = self.get_channel_internal(channel_id, &*tx).await?;
|
||||
self.check_user_is_channel_participant(&channel, user_id, &*tx)
|
||||
.await?;
|
||||
|
||||
let mut rows = channel_chat_participant::Entity::find()
|
||||
|
@ -310,7 +308,7 @@ impl Database {
|
|||
}
|
||||
}
|
||||
|
||||
let mut channel_members = self.get_channel_participants(channel_id, &*tx).await?;
|
||||
let mut channel_members = self.get_channel_participants(&channel, &*tx).await?;
|
||||
channel_members.retain(|member| !participant_user_ids.contains(member));
|
||||
|
||||
Ok(CreatedChannelMessage {
|
||||
|
@ -483,8 +481,9 @@ impl Database {
|
|||
.await?;
|
||||
|
||||
if result.rows_affected == 0 {
|
||||
let channel = self.get_channel_internal(channel_id, &*tx).await?;
|
||||
if self
|
||||
.check_user_is_channel_admin(channel_id, user_id, &*tx)
|
||||
.check_user_is_channel_admin(&channel, user_id, &*tx)
|
||||
.await
|
||||
.is_ok()
|
||||
{
|
||||
|
|
|
@ -50,10 +50,10 @@ impl Database {
|
|||
.map(|participant| participant.user_id),
|
||||
);
|
||||
|
||||
let (channel_id, room) = self.get_channel_room(room_id, &tx).await?;
|
||||
let (channel, room) = self.get_channel_room(room_id, &tx).await?;
|
||||
let channel_members;
|
||||
if let Some(channel_id) = channel_id {
|
||||
channel_members = self.get_channel_participants(channel_id, &tx).await?;
|
||||
if let Some(channel) = &channel {
|
||||
channel_members = self.get_channel_participants(channel, &tx).await?;
|
||||
} else {
|
||||
channel_members = Vec::new();
|
||||
|
||||
|
@ -69,7 +69,7 @@ impl Database {
|
|||
|
||||
Ok(RefreshedRoom {
|
||||
room,
|
||||
channel_id,
|
||||
channel_id: channel.map(|channel| channel.id),
|
||||
channel_members,
|
||||
stale_participant_user_ids,
|
||||
canceled_calls_to_user_ids,
|
||||
|
@ -381,7 +381,6 @@ impl Database {
|
|||
|
||||
pub(crate) async fn join_channel_room_internal(
|
||||
&self,
|
||||
channel_id: ChannelId,
|
||||
room_id: RoomId,
|
||||
user_id: UserId,
|
||||
connection: ConnectionId,
|
||||
|
@ -420,11 +419,12 @@ impl Database {
|
|||
.exec(&*tx)
|
||||
.await?;
|
||||
|
||||
let room = self.get_room(room_id, &tx).await?;
|
||||
let channel_members = self.get_channel_participants(channel_id, &tx).await?;
|
||||
let (channel, room) = self.get_channel_room(room_id, &tx).await?;
|
||||
let channel = channel.ok_or_else(|| anyhow!("no channel for room"))?;
|
||||
let channel_members = self.get_channel_participants(&channel, &*tx).await?;
|
||||
Ok(JoinRoom {
|
||||
room,
|
||||
channel_id: Some(channel_id),
|
||||
channel_id: Some(channel.id),
|
||||
channel_members,
|
||||
})
|
||||
}
|
||||
|
@ -718,16 +718,16 @@ impl Database {
|
|||
});
|
||||
}
|
||||
|
||||
let (channel_id, room) = self.get_channel_room(room_id, &tx).await?;
|
||||
let channel_members = if let Some(channel_id) = channel_id {
|
||||
self.get_channel_participants(channel_id, &tx).await?
|
||||
let (channel, room) = self.get_channel_room(room_id, &tx).await?;
|
||||
let channel_members = if let Some(channel) = &channel {
|
||||
self.get_channel_participants(&channel, &tx).await?
|
||||
} else {
|
||||
Vec::new()
|
||||
};
|
||||
|
||||
Ok(RejoinedRoom {
|
||||
room,
|
||||
channel_id,
|
||||
channel_id: channel.map(|channel| channel.id),
|
||||
channel_members,
|
||||
rejoined_projects,
|
||||
reshared_projects,
|
||||
|
@ -869,7 +869,7 @@ impl Database {
|
|||
.exec(&*tx)
|
||||
.await?;
|
||||
|
||||
let (channel_id, room) = self.get_channel_room(room_id, &tx).await?;
|
||||
let (channel, room) = self.get_channel_room(room_id, &tx).await?;
|
||||
let deleted = if room.participants.is_empty() {
|
||||
let result = room::Entity::delete_by_id(room_id).exec(&*tx).await?;
|
||||
result.rows_affected > 0
|
||||
|
@ -877,14 +877,14 @@ impl Database {
|
|||
false
|
||||
};
|
||||
|
||||
let channel_members = if let Some(channel_id) = channel_id {
|
||||
self.get_channel_participants(channel_id, &tx).await?
|
||||
let channel_members = if let Some(channel) = &channel {
|
||||
self.get_channel_participants(channel, &tx).await?
|
||||
} else {
|
||||
Vec::new()
|
||||
};
|
||||
let left_room = LeftRoom {
|
||||
room,
|
||||
channel_id,
|
||||
channel_id: channel.map(|channel| channel.id),
|
||||
channel_members,
|
||||
left_projects,
|
||||
canceled_calls_to_user_ids,
|
||||
|
@ -1072,7 +1072,7 @@ impl Database {
|
|||
&self,
|
||||
room_id: RoomId,
|
||||
tx: &DatabaseTransaction,
|
||||
) -> Result<(Option<ChannelId>, proto::Room)> {
|
||||
) -> Result<(Option<channel::Model>, proto::Room)> {
|
||||
let db_room = room::Entity::find_by_id(room_id)
|
||||
.one(tx)
|
||||
.await?
|
||||
|
@ -1181,9 +1181,16 @@ impl Database {
|
|||
project_id: db_follower.project_id.to_proto(),
|
||||
});
|
||||
}
|
||||
drop(db_followers);
|
||||
|
||||
let channel = if let Some(channel_id) = db_room.channel_id {
|
||||
Some(self.get_channel_internal(channel_id, &*tx).await?)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
Ok((
|
||||
db_room.channel_id,
|
||||
channel,
|
||||
proto::Room {
|
||||
id: db_room.id.to_proto(),
|
||||
live_kit_room: db_room.live_kit_room,
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue