Address some issues with the sqlez_macros

This commit is contained in:
Kay Simmons 2022-11-30 16:19:46 -08:00 committed by Mikayla Maki
parent 1b225fa37c
commit f68e8d4664
10 changed files with 183 additions and 174 deletions

View file

@ -1,6 +1,6 @@
use futures::{channel::oneshot, Future, FutureExt};
use lazy_static::lazy_static;
use parking_lot::RwLock;
use parking_lot::{Mutex, RwLock};
use std::{collections::HashMap, marker::PhantomData, ops::Deref, sync::Arc, thread};
use thread_local::ThreadLocal;
@ -73,37 +73,8 @@ impl<M: Migrator> ThreadSafeConnectionBuilder<M> {
}
pub async fn build(self) -> ThreadSafeConnection<M> {
if !QUEUES.read().contains_key(&self.connection.uri) {
let mut queues = QUEUES.write();
if !queues.contains_key(&self.connection.uri) {
let mut write_connection = self.connection.create_connection();
// Enable writes for this connection
write_connection.write = true;
if let Some(mut write_queue_constructor) = self.write_queue_constructor {
let write_channel = write_queue_constructor(write_connection);
queues.insert(self.connection.uri.clone(), write_channel);
} else {
use std::sync::mpsc::channel;
let (sender, reciever) = channel::<QueuedWrite>();
thread::spawn(move || {
while let Ok(write) = reciever.recv() {
write(&write_connection)
}
});
let sender = UnboundedSyncSender::new(sender);
queues.insert(
self.connection.uri.clone(),
Box::new(move |queued_write| {
sender
.send(queued_write)
.expect("Could not send write action to backgorund thread");
}),
);
}
}
}
self.connection
.initialize_queues(self.write_queue_constructor);
let db_initialize_query = self.db_initialize_query;
@ -134,6 +105,40 @@ impl<M: Migrator> ThreadSafeConnectionBuilder<M> {
}
impl<M: Migrator> ThreadSafeConnection<M> {
fn initialize_queues(&self, write_queue_constructor: Option<WriteQueueConstructor>) {
if !QUEUES.read().contains_key(&self.uri) {
let mut queues = QUEUES.write();
if !queues.contains_key(&self.uri) {
let mut write_connection = self.create_connection();
// Enable writes for this connection
write_connection.write = true;
if let Some(mut write_queue_constructor) = write_queue_constructor {
let write_channel = write_queue_constructor(write_connection);
queues.insert(self.uri.clone(), write_channel);
} else {
use std::sync::mpsc::channel;
let (sender, reciever) = channel::<QueuedWrite>();
thread::spawn(move || {
while let Ok(write) = reciever.recv() {
write(&write_connection)
}
});
let sender = UnboundedSyncSender::new(sender);
queues.insert(
self.uri.clone(),
Box::new(move |queued_write| {
sender
.send(queued_write)
.expect("Could not send write action to backgorund thread");
}),
);
}
}
}
}
pub fn builder(uri: &str, persistent: bool) -> ThreadSafeConnectionBuilder<M> {
ThreadSafeConnectionBuilder::<M> {
db_initialize_query: None,
@ -208,14 +213,18 @@ impl ThreadSafeConnection<()> {
uri: &str,
persistent: bool,
connection_initialize_query: Option<&'static str>,
write_queue_constructor: Option<WriteQueueConstructor>,
) -> Self {
Self {
let connection = Self {
uri: Arc::from(uri),
persistent,
connection_initialize_query,
connections: Default::default(),
_migrator: PhantomData,
}
};
connection.initialize_queues(write_queue_constructor);
connection
}
}
@ -243,6 +252,16 @@ impl<M: Migrator> Deref for ThreadSafeConnection<M> {
}
}
pub fn locking_queue() -> WriteQueueConstructor {
Box::new(|connection| {
let connection = Mutex::new(connection);
Box::new(move |queued_write| {
let connection = connection.lock();
queued_write(&connection)
})
})
}
#[cfg(test)]
mod test {
use indoc::indoc;