1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66
use crate::util::killable_channel::{channel, KillMode, Killer, Receiver, SendMode, Sender}; use crate::util::FAILED_TO_LOCK; use crossbeam::channel; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex}; #[allow(clippy::mutex_atomic)] pub fn lossy_channel<T>(queue_size: usize) -> (LossySender<T>, LossyReceiver<T>) { let (killer, data_tx, receiver) = channel(SendMode::Unbounded, KillMode::Async); let is_open = Arc::new(AtomicBool::new(true)); let queue_size = Arc::new(Mutex::new(queue_size)); let sender = LossySender { data_tx, data_rx: receiver.data_rx.clone(), killer, is_open, queue_size, }; (sender, receiver) } #[derive(Clone)] pub struct LossySender<T> { data_tx: Sender<T>, data_rx: channel::Receiver<T>, killer: Killer, is_open: Arc<AtomicBool>, pub queue_size: Arc<Mutex<usize>>, } impl<T> LossySender<T> { pub fn try_send(&self, msg: T) -> Result<(), channel::TrySendError<T>> { if !self.is_open.load(Ordering::SeqCst) { return Err(channel::TrySendError::Disconnected(msg)); } self.data_tx.try_send(msg)?; self.remove_extra_data(); Ok(()) } pub fn close(&mut self) -> Result<(), channel::SendError<()>> { self.is_open.store(false, Ordering::SeqCst); self.killer.send() } fn remove_extra_data(&self) { let queue_size: usize = *self.queue_size.lock().expect(FAILED_TO_LOCK); while self.data_rx.len() > queue_size { if self.data_rx.try_recv().is_err() { log::error!("Failed to remove excess data from message queue"); break; } } } pub fn set_queue_size(&self, queue_size: usize) { *self.queue_size.lock().expect(FAILED_TO_LOCK) = queue_size; } pub fn set_queue_size_max(&self, queue_size: usize) { let mut current_size = self.queue_size.lock().expect(FAILED_TO_LOCK); *current_size = current_size.max(queue_size); } } pub type LossyReceiver<T> = Receiver<T>;