Fix stale server queries, use foreign keys from connectionsn to servers
This commit is contained in:
parent
363e3cae4b
commit
6c58a4f885
10 changed files with 128 additions and 122 deletions
|
@ -110,7 +110,7 @@ impl Database {
|
|||
Ok(new_migrations)
|
||||
}
|
||||
|
||||
pub async fn create_server(&self, environment: &str) -> Result<ServerEpoch> {
|
||||
pub async fn create_server(&self, environment: &str) -> Result<ServerId> {
|
||||
self.transaction(|tx| async move {
|
||||
let server = server::ActiveModel {
|
||||
environment: ActiveValue::set(environment.into()),
|
||||
|
@ -118,30 +118,29 @@ impl Database {
|
|||
}
|
||||
.insert(&*tx)
|
||||
.await?;
|
||||
Ok(server.epoch)
|
||||
Ok(server.id)
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn delete_stale_projects(
|
||||
&self,
|
||||
new_epoch: ServerEpoch,
|
||||
new_epoch: ServerId,
|
||||
environment: &str,
|
||||
) -> Result<()> {
|
||||
self.transaction(|tx| async move {
|
||||
let stale_server_epochs = self
|
||||
.stale_server_epochs(environment, new_epoch, &tx)
|
||||
.await?;
|
||||
let stale_server_epochs = self.stale_server_ids(environment, new_epoch, &tx).await?;
|
||||
project_collaborator::Entity::delete_many()
|
||||
.filter(
|
||||
project_collaborator::Column::ConnectionEpoch
|
||||
project_collaborator::Column::ConnectionServerId
|
||||
.is_in(stale_server_epochs.iter().copied()),
|
||||
)
|
||||
.exec(&*tx)
|
||||
.await?;
|
||||
project::Entity::delete_many()
|
||||
.filter(
|
||||
project::Column::HostConnectionEpoch.is_in(stale_server_epochs.iter().copied()),
|
||||
project::Column::HostConnectionServerId
|
||||
.is_in(stale_server_epochs.iter().copied()),
|
||||
)
|
||||
.exec(&*tx)
|
||||
.await?;
|
||||
|
@ -152,7 +151,7 @@ impl Database {
|
|||
|
||||
pub async fn stale_room_ids(
|
||||
&self,
|
||||
new_epoch: ServerEpoch,
|
||||
new_epoch: ServerId,
|
||||
environment: &str,
|
||||
) -> Result<Vec<RoomId>> {
|
||||
self.transaction(|tx| async move {
|
||||
|
@ -161,15 +160,14 @@ impl Database {
|
|||
RoomId,
|
||||
}
|
||||
|
||||
let stale_server_epochs = self
|
||||
.stale_server_epochs(environment, new_epoch, &tx)
|
||||
.await?;
|
||||
let stale_server_epochs = self.stale_server_ids(environment, new_epoch, &tx).await?;
|
||||
Ok(room_participant::Entity::find()
|
||||
.select_only()
|
||||
.column(room_participant::Column::RoomId)
|
||||
.distinct()
|
||||
.filter(
|
||||
room_participant::Column::AnsweringConnectionEpoch.is_in(stale_server_epochs),
|
||||
room_participant::Column::AnsweringConnectionServerId
|
||||
.is_in(stale_server_epochs),
|
||||
)
|
||||
.into_values::<_, QueryAs>()
|
||||
.all(&*tx)
|
||||
|
@ -181,13 +179,13 @@ impl Database {
|
|||
pub async fn refresh_room(
|
||||
&self,
|
||||
room_id: RoomId,
|
||||
new_epoch: ServerEpoch,
|
||||
new_epoch: ServerId,
|
||||
) -> Result<RoomGuard<RefreshedRoom>> {
|
||||
self.room_transaction(|tx| async move {
|
||||
let stale_participant_filter = Condition::all()
|
||||
.add(room_participant::Column::RoomId.eq(room_id))
|
||||
.add(room_participant::Column::AnsweringConnectionId.is_not_null())
|
||||
.add(room_participant::Column::AnsweringConnectionEpoch.ne(new_epoch));
|
||||
.add(room_participant::Column::AnsweringConnectionServerId.ne(new_epoch));
|
||||
|
||||
let stale_participant_user_ids = room_participant::Entity::find()
|
||||
.filter(stale_participant_filter.clone())
|
||||
|
@ -231,19 +229,13 @@ impl Database {
|
|||
.await
|
||||
}
|
||||
|
||||
pub async fn delete_stale_servers(
|
||||
&self,
|
||||
new_epoch: ServerEpoch,
|
||||
environment: &str,
|
||||
) -> Result<()> {
|
||||
pub async fn delete_stale_servers(&self, new_epoch: ServerId, environment: &str) -> Result<()> {
|
||||
self.transaction(|tx| async move {
|
||||
server::Entity::delete_many()
|
||||
.filter(
|
||||
Condition::all().add(
|
||||
server::Column::Environment
|
||||
.eq(environment)
|
||||
.add(server::Column::Epoch.ne(new_epoch)),
|
||||
),
|
||||
Condition::all()
|
||||
.add(server::Column::Environment.eq(environment))
|
||||
.add(server::Column::Id.ne(new_epoch)),
|
||||
)
|
||||
.exec(&*tx)
|
||||
.await?;
|
||||
|
@ -252,26 +244,21 @@ impl Database {
|
|||
.await
|
||||
}
|
||||
|
||||
async fn stale_server_epochs(
|
||||
async fn stale_server_ids(
|
||||
&self,
|
||||
environment: &str,
|
||||
new_epoch: ServerEpoch,
|
||||
new_epoch: ServerId,
|
||||
tx: &DatabaseTransaction,
|
||||
) -> Result<Vec<ServerEpoch>> {
|
||||
) -> Result<Vec<ServerId>> {
|
||||
let stale_servers = server::Entity::find()
|
||||
.filter(
|
||||
Condition::all().add(
|
||||
server::Column::Environment
|
||||
.eq(environment)
|
||||
.add(server::Column::Epoch.ne(new_epoch)),
|
||||
),
|
||||
Condition::all()
|
||||
.add(server::Column::Environment.eq(environment))
|
||||
.add(server::Column::Id.ne(new_epoch)),
|
||||
)
|
||||
.all(&*tx)
|
||||
.await?;
|
||||
Ok(stale_servers
|
||||
.into_iter()
|
||||
.map(|server| server.epoch)
|
||||
.collect())
|
||||
Ok(stale_servers.into_iter().map(|server| server.id).collect())
|
||||
}
|
||||
|
||||
// users
|
||||
|
@ -1167,11 +1154,15 @@ impl Database {
|
|||
room_id: ActiveValue::set(room_id),
|
||||
user_id: ActiveValue::set(user_id),
|
||||
answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
|
||||
answering_connection_epoch: ActiveValue::set(Some(connection.epoch as i32)),
|
||||
answering_connection_server_id: ActiveValue::set(Some(ServerId(
|
||||
connection.epoch as i32,
|
||||
))),
|
||||
answering_connection_lost: ActiveValue::set(false),
|
||||
calling_user_id: ActiveValue::set(user_id),
|
||||
calling_connection_id: ActiveValue::set(connection.id as i32),
|
||||
calling_connection_epoch: ActiveValue::set(connection.epoch as i32),
|
||||
calling_connection_server_id: ActiveValue::set(Some(ServerId(
|
||||
connection.epoch as i32,
|
||||
))),
|
||||
..Default::default()
|
||||
}
|
||||
.insert(&*tx)
|
||||
|
@ -1198,7 +1189,9 @@ impl Database {
|
|||
answering_connection_lost: ActiveValue::set(false),
|
||||
calling_user_id: ActiveValue::set(calling_user_id),
|
||||
calling_connection_id: ActiveValue::set(calling_connection.id as i32),
|
||||
calling_connection_epoch: ActiveValue::set(calling_connection.epoch as i32),
|
||||
calling_connection_server_id: ActiveValue::set(Some(ServerId(
|
||||
calling_connection.epoch as i32,
|
||||
))),
|
||||
initial_project_id: ActiveValue::set(initial_project_id),
|
||||
..Default::default()
|
||||
}
|
||||
|
@ -1280,7 +1273,7 @@ impl Database {
|
|||
.eq(calling_connection.id as i32),
|
||||
)
|
||||
.add(
|
||||
room_participant::Column::CallingConnectionEpoch
|
||||
room_participant::Column::CallingConnectionServerId
|
||||
.eq(calling_connection.epoch as i32),
|
||||
)
|
||||
.add(room_participant::Column::AnsweringConnectionId.is_null()),
|
||||
|
@ -1320,14 +1313,16 @@ impl Database {
|
|||
.add(room_participant::Column::AnsweringConnectionId.is_null())
|
||||
.add(room_participant::Column::AnsweringConnectionLost.eq(true))
|
||||
.add(
|
||||
room_participant::Column::AnsweringConnectionEpoch
|
||||
room_participant::Column::AnsweringConnectionServerId
|
||||
.ne(connection.epoch as i32),
|
||||
),
|
||||
),
|
||||
)
|
||||
.set(room_participant::ActiveModel {
|
||||
answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
|
||||
answering_connection_epoch: ActiveValue::set(Some(connection.epoch as i32)),
|
||||
answering_connection_server_id: ActiveValue::set(Some(ServerId(
|
||||
connection.epoch as i32,
|
||||
))),
|
||||
answering_connection_lost: ActiveValue::set(false),
|
||||
..Default::default()
|
||||
})
|
||||
|
@ -1353,7 +1348,7 @@ impl Database {
|
|||
.eq(connection.id as i32),
|
||||
)
|
||||
.add(
|
||||
room_participant::Column::AnsweringConnectionEpoch
|
||||
room_participant::Column::AnsweringConnectionServerId
|
||||
.eq(connection.epoch as i32),
|
||||
),
|
||||
)
|
||||
|
@ -1376,7 +1371,7 @@ impl Database {
|
|||
.eq(connection.id as i32),
|
||||
)
|
||||
.add(
|
||||
room_participant::Column::CallingConnectionEpoch
|
||||
room_participant::Column::CallingConnectionServerId
|
||||
.eq(connection.epoch as i32),
|
||||
)
|
||||
.add(room_participant::Column::AnsweringConnectionId.is_null()),
|
||||
|
@ -1412,7 +1407,7 @@ impl Database {
|
|||
project_collaborator::Column::ConnectionId.eq(connection.id as i32),
|
||||
)
|
||||
.add(
|
||||
project_collaborator::Column::ConnectionEpoch
|
||||
project_collaborator::Column::ConnectionServerId
|
||||
.eq(connection.epoch as i32),
|
||||
),
|
||||
)
|
||||
|
@ -1437,7 +1432,7 @@ impl Database {
|
|||
});
|
||||
|
||||
let collaborator_connection_id = ConnectionId {
|
||||
epoch: collaborator.connection_epoch as u32,
|
||||
epoch: collaborator.connection_server_id.0 as u32,
|
||||
id: collaborator.connection_id as u32,
|
||||
};
|
||||
if collaborator_connection_id != connection {
|
||||
|
@ -1459,7 +1454,7 @@ impl Database {
|
|||
project_collaborator::Column::ConnectionId.eq(connection.id as i32),
|
||||
)
|
||||
.add(
|
||||
project_collaborator::Column::ConnectionEpoch
|
||||
project_collaborator::Column::ConnectionServerId
|
||||
.eq(connection.epoch as i32),
|
||||
),
|
||||
)
|
||||
|
@ -1472,7 +1467,9 @@ impl Database {
|
|||
Condition::all()
|
||||
.add(project::Column::RoomId.eq(room_id))
|
||||
.add(project::Column::HostConnectionId.eq(connection.id as i32))
|
||||
.add(project::Column::HostConnectionEpoch.eq(connection.epoch as i32)),
|
||||
.add(
|
||||
project::Column::HostConnectionServerId.eq(connection.epoch as i32),
|
||||
),
|
||||
)
|
||||
.exec(&*tx)
|
||||
.await?;
|
||||
|
@ -1538,7 +1535,7 @@ impl Database {
|
|||
.eq(connection.id as i32),
|
||||
)
|
||||
.add(
|
||||
room_participant::Column::AnsweringConnectionEpoch
|
||||
room_participant::Column::AnsweringConnectionServerId
|
||||
.eq(connection.epoch as i32),
|
||||
),
|
||||
)
|
||||
|
@ -1573,7 +1570,7 @@ impl Database {
|
|||
.eq(connection.id as i32),
|
||||
)
|
||||
.add(
|
||||
room_participant::Column::AnsweringConnectionEpoch
|
||||
room_participant::Column::AnsweringConnectionServerId
|
||||
.eq(connection.epoch as i32),
|
||||
),
|
||||
)
|
||||
|
@ -1595,7 +1592,7 @@ impl Database {
|
|||
Condition::all()
|
||||
.add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
|
||||
.add(
|
||||
project_collaborator::Column::ConnectionEpoch
|
||||
project_collaborator::Column::ConnectionServerId
|
||||
.eq(connection.epoch as i32),
|
||||
),
|
||||
)
|
||||
|
@ -1606,7 +1603,7 @@ impl Database {
|
|||
Condition::all()
|
||||
.add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
|
||||
.add(
|
||||
project_collaborator::Column::ConnectionEpoch
|
||||
project_collaborator::Column::ConnectionServerId
|
||||
.eq(connection.epoch as i32),
|
||||
),
|
||||
)
|
||||
|
@ -1624,7 +1621,7 @@ impl Database {
|
|||
.into_iter()
|
||||
.map(|collaborator| ConnectionId {
|
||||
id: collaborator.connection_id as u32,
|
||||
epoch: collaborator.connection_epoch as u32,
|
||||
epoch: collaborator.connection_server_id.0 as u32,
|
||||
})
|
||||
.collect();
|
||||
|
||||
|
@ -1633,7 +1630,7 @@ impl Database {
|
|||
host_user_id: project.host_user_id,
|
||||
host_connection_id: ConnectionId {
|
||||
id: project.host_connection_id as u32,
|
||||
epoch: project.host_connection_epoch as u32,
|
||||
epoch: project.host_connection_server_id.0 as u32,
|
||||
},
|
||||
connection_ids,
|
||||
});
|
||||
|
@ -1644,7 +1641,7 @@ impl Database {
|
|||
.filter(
|
||||
Condition::all()
|
||||
.add(project::Column::HostConnectionId.eq(connection.id as i32))
|
||||
.add(project::Column::HostConnectionEpoch.eq(connection.epoch as i32)),
|
||||
.add(project::Column::HostConnectionServerId.eq(connection.epoch as i32)),
|
||||
)
|
||||
.exec(&*tx)
|
||||
.await?;
|
||||
|
@ -1696,9 +1693,9 @@ impl Database {
|
|||
let mut pending_participants = Vec::new();
|
||||
while let Some(db_participant) = db_participants.next().await {
|
||||
let db_participant = db_participant?;
|
||||
if let Some((answering_connection_id, answering_connection_epoch)) = db_participant
|
||||
if let Some((answering_connection_id, answering_connection_server_id)) = db_participant
|
||||
.answering_connection_id
|
||||
.zip(db_participant.answering_connection_epoch)
|
||||
.zip(db_participant.answering_connection_server_id)
|
||||
{
|
||||
let location = match (
|
||||
db_participant.location_kind,
|
||||
|
@ -1720,7 +1717,7 @@ impl Database {
|
|||
};
|
||||
|
||||
let answering_connection = ConnectionId {
|
||||
epoch: answering_connection_epoch as u32,
|
||||
epoch: answering_connection_server_id.0 as u32,
|
||||
id: answering_connection_id as u32,
|
||||
};
|
||||
participants.insert(
|
||||
|
@ -1751,7 +1748,7 @@ impl Database {
|
|||
while let Some(row) = db_projects.next().await {
|
||||
let (db_project, db_worktree) = row?;
|
||||
let host_connection = ConnectionId {
|
||||
epoch: db_project.host_connection_epoch as u32,
|
||||
epoch: db_project.host_connection_server_id.0 as u32,
|
||||
id: db_project.host_connection_id as u32,
|
||||
};
|
||||
if let Some(participant) = participants.get_mut(&host_connection) {
|
||||
|
@ -1820,7 +1817,7 @@ impl Database {
|
|||
.eq(connection.id as i32),
|
||||
)
|
||||
.add(
|
||||
room_participant::Column::AnsweringConnectionEpoch
|
||||
room_participant::Column::AnsweringConnectionServerId
|
||||
.eq(connection.epoch as i32),
|
||||
),
|
||||
)
|
||||
|
@ -1835,7 +1832,7 @@ impl Database {
|
|||
room_id: ActiveValue::set(participant.room_id),
|
||||
host_user_id: ActiveValue::set(participant.user_id),
|
||||
host_connection_id: ActiveValue::set(connection.id as i32),
|
||||
host_connection_epoch: ActiveValue::set(connection.epoch as i32),
|
||||
host_connection_server_id: ActiveValue::set(ServerId(connection.epoch as i32)),
|
||||
..Default::default()
|
||||
}
|
||||
.insert(&*tx)
|
||||
|
@ -1860,7 +1857,7 @@ impl Database {
|
|||
project_collaborator::ActiveModel {
|
||||
project_id: ActiveValue::set(project.id),
|
||||
connection_id: ActiveValue::set(connection.id as i32),
|
||||
connection_epoch: ActiveValue::set(connection.epoch as i32),
|
||||
connection_server_id: ActiveValue::set(ServerId(connection.epoch as i32)),
|
||||
user_id: ActiveValue::set(participant.user_id),
|
||||
replica_id: ActiveValue::set(ReplicaId(0)),
|
||||
is_host: ActiveValue::set(true),
|
||||
|
@ -1888,7 +1885,7 @@ impl Database {
|
|||
.await?
|
||||
.ok_or_else(|| anyhow!("project not found"))?;
|
||||
let host_connection = ConnectionId {
|
||||
epoch: project.host_connection_epoch as u32,
|
||||
epoch: project.host_connection_server_id.0 as u32,
|
||||
id: project.host_connection_id as u32,
|
||||
};
|
||||
if host_connection == connection {
|
||||
|
@ -1916,7 +1913,7 @@ impl Database {
|
|||
.filter(
|
||||
Condition::all()
|
||||
.add(project::Column::HostConnectionId.eq(connection.id as i32))
|
||||
.add(project::Column::HostConnectionEpoch.eq(connection.epoch as i32)),
|
||||
.add(project::Column::HostConnectionServerId.eq(connection.epoch as i32)),
|
||||
)
|
||||
.one(&*tx)
|
||||
.await?
|
||||
|
@ -1974,7 +1971,7 @@ impl Database {
|
|||
.filter(
|
||||
Condition::all()
|
||||
.add(project::Column::HostConnectionId.eq(connection.id as i32))
|
||||
.add(project::Column::HostConnectionEpoch.eq(connection.epoch as i32)),
|
||||
.add(project::Column::HostConnectionServerId.eq(connection.epoch as i32)),
|
||||
)
|
||||
.one(&*tx)
|
||||
.await?
|
||||
|
@ -2071,7 +2068,7 @@ impl Database {
|
|||
.await?
|
||||
.ok_or_else(|| anyhow!("no such project"))?;
|
||||
let host_connection = ConnectionId {
|
||||
epoch: project.host_connection_epoch as u32,
|
||||
epoch: project.host_connection_server_id.0 as u32,
|
||||
id: project.host_connection_id as u32,
|
||||
};
|
||||
if host_connection != connection {
|
||||
|
@ -2128,7 +2125,7 @@ impl Database {
|
|||
.await?
|
||||
.ok_or_else(|| anyhow!("no such project"))?;
|
||||
let host_connection = ConnectionId {
|
||||
epoch: project.host_connection_epoch as u32,
|
||||
epoch: project.host_connection_server_id.0 as u32,
|
||||
id: project.host_connection_id as u32,
|
||||
};
|
||||
if host_connection != connection {
|
||||
|
@ -2173,7 +2170,7 @@ impl Database {
|
|||
.eq(connection.id as i32),
|
||||
)
|
||||
.add(
|
||||
room_participant::Column::AnsweringConnectionEpoch
|
||||
room_participant::Column::AnsweringConnectionServerId
|
||||
.eq(connection.epoch as i32),
|
||||
),
|
||||
)
|
||||
|
@ -2204,7 +2201,7 @@ impl Database {
|
|||
let new_collaborator = project_collaborator::ActiveModel {
|
||||
project_id: ActiveValue::set(project_id),
|
||||
connection_id: ActiveValue::set(connection.id as i32),
|
||||
connection_epoch: ActiveValue::set(connection.epoch as i32),
|
||||
connection_server_id: ActiveValue::set(ServerId(connection.epoch as i32)),
|
||||
user_id: ActiveValue::set(participant.user_id),
|
||||
replica_id: ActiveValue::set(replica_id),
|
||||
is_host: ActiveValue::set(false),
|
||||
|
@ -2315,7 +2312,7 @@ impl Database {
|
|||
.add(project_collaborator::Column::ProjectId.eq(project_id))
|
||||
.add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
|
||||
.add(
|
||||
project_collaborator::Column::ConnectionEpoch
|
||||
project_collaborator::Column::ConnectionServerId
|
||||
.eq(connection.epoch as i32),
|
||||
),
|
||||
)
|
||||
|
@ -2336,7 +2333,7 @@ impl Database {
|
|||
let connection_ids = collaborators
|
||||
.into_iter()
|
||||
.map(|collaborator| ConnectionId {
|
||||
epoch: collaborator.connection_epoch as u32,
|
||||
epoch: collaborator.connection_server_id.0 as u32,
|
||||
id: collaborator.connection_id as u32,
|
||||
})
|
||||
.collect();
|
||||
|
@ -2345,7 +2342,7 @@ impl Database {
|
|||
id: project_id,
|
||||
host_user_id: project.host_user_id,
|
||||
host_connection_id: ConnectionId {
|
||||
epoch: project.host_connection_epoch as u32,
|
||||
epoch: project.host_connection_server_id.0 as u32,
|
||||
id: project.host_connection_id as u32,
|
||||
},
|
||||
connection_ids,
|
||||
|
@ -2372,7 +2369,7 @@ impl Database {
|
|||
|
||||
if collaborators.iter().any(|collaborator| {
|
||||
let collaborator_connection = ConnectionId {
|
||||
epoch: collaborator.connection_epoch as u32,
|
||||
epoch: collaborator.connection_server_id.0 as u32,
|
||||
id: collaborator.connection_id as u32,
|
||||
};
|
||||
collaborator_connection == connection
|
||||
|
@ -2404,7 +2401,7 @@ impl Database {
|
|||
while let Some(participant) = participants.next().await {
|
||||
let participant = participant?;
|
||||
connection_ids.insert(ConnectionId {
|
||||
epoch: participant.connection_epoch as u32,
|
||||
epoch: participant.connection_server_id.0 as u32,
|
||||
id: participant.connection_id as u32,
|
||||
});
|
||||
}
|
||||
|
@ -2436,7 +2433,7 @@ impl Database {
|
|||
while let Some(participant) = participants.next().await {
|
||||
let participant = participant?;
|
||||
guest_connection_ids.push(ConnectionId {
|
||||
epoch: participant.connection_epoch as u32,
|
||||
epoch: participant.connection_server_id.0 as u32,
|
||||
id: participant.connection_id as u32,
|
||||
});
|
||||
}
|
||||
|
@ -2817,7 +2814,7 @@ id_type!(RoomParticipantId);
|
|||
id_type!(ProjectId);
|
||||
id_type!(ProjectCollaboratorId);
|
||||
id_type!(ReplicaId);
|
||||
id_type!(ServerEpoch);
|
||||
id_type!(ServerId);
|
||||
id_type!(SignupId);
|
||||
id_type!(UserId);
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue