Безопасно-мутабельно-статический RwLock с Sender внутри [Rust]
У меня есть 2 потока, отправляющие сообщения, и основной поток их подхватывает.
Для этого я использую mpsc. Мне нужно инициализировать его статически. Для этого его нужно сделать его мутабельным. Так как обычные static mut являются unsafe, я решил его использовать через lazy_static! в обертке Mutex. Там он по сути является мьютексной ссылкой, которую можно разименовать и дать значение (при этом это все в safe Rust'е)
use std::sync::{mpsc::*, *};
use std::thread;
lazy_static::lazy_static! {
static ref MAIN_SENDER: Mutex<Option<Sender<u8>>> = Mutex::new(None);
}
fn ae() {
let tx = MAIN_SENDER.lock().unwrap().clone().unwrap().clone();
thread::spawn(move || { loop {
tx.send(23).unwrap();
thread::sleep(std::time::Duration::from_millis(200));
}});
}
fn main() {
let (tx, rx) = channel();
*MAIN_SENDER.lock().unwrap() = Some(tx);
ae();
for rex in rx {
println!("{}", rex);
}
}
Но это работает только для одного потока. Теперь мне нужно это сделать для 2 потоков, которым нужно передать Sender, чтобы они по нему отдавали сообщения в главный поток. Я для этого использую RwLock, у которого нет ограничений по читателям памяти, но может быть только один писатель. Причем с логической точки зрения, это можно сделать, ибо потоки-дети лишь читают память, в котором храниться Sender, и уже по которому они отдельно могут отправлять сообщения, независимо от писателя RwLock
use std::sync::{mpsc::*, *};
use std::thread;
lazy_static::lazy_static! {
static ref MAIN_SENDER: RwLock<Option<Sender<u8>>> = RwLock::new(None);
}
fn reqs(var: u8) {
let tx = MAIN_SENDER.read().unwrap().clone().unwrap().clone();
thread::spawn(move || {
loop {
tx.send(var).unwrap();
thread::sleep(std::time::Duration::from_millis(200));
}
});
}
fn main() {
let (tx, rx) = channel();
*MAIN_SENDER.write().unwrap() = Some(tx);
reqs(25);
reqs(14);
for req in rx {
println!("{}", req);
}
}
static ref MAIN_SENDER: RwLock<Option<Sender<u8>>> = RwLock::new(None);
<- Error: std::sync::mpsc::Sender<u8> cannot be shared between threads safely
Ну, я отчетливо понимаю что натягиваю сову на глобус. Но есть ли способ сохранения Sender в статическую память в обертке RwLock, для обеспечения доступа к нему из safe Rust?
Честно говоря, я даже не знаю как написать это через unsafe
Ответы (2 шт):
Проблема в том, что Sender явно размечен, как !Sync, что предполагает его роль в качестве некой обёртки синхронизации доступа, а внутри (при использовании из одного потока) доступ к памяти ведётся напрямую, т.е. представим нарушение подобного инварианта и получим датарейс. Я бы рекомендовал присмотреться к crossbeam-channel, но если хочется mpsc именно из std, тогда попробуйте такую обёртку.
use std::ops::Deref;
use std::ops::DerefMut;
use std::sync::Mutex;
use std::sync::MutexGuard;
use std::sync::RwLock;
use std::sync::mpsc::Sender;
use once_cell::sync::OnceCell;
use thread_local::ThreadLocal;
pub struct CowSender<T: Send> {
original: Mutex<Sender<T>>,
local: ThreadLocal<Sender<T>>,
}
pub struct CowSenderGuard<'a, T: Send> {
inner: &'a mut Sender<T>,
local: &'a mut ThreadLocal<Sender<T>>,
}
impl<T: Send> CowSender<T> {
pub fn lock_mut(&mut self) -> CowSenderGuard<'_, T> {
CowSenderGuard {
inner: self.original.get_mut().unwrap_or_else(|x| x.into_inner()),
local: &mut self.local,
}
}
fn lock_inner(&self) -> MutexGuard<'_, Sender<T>> {
self.original.lock().unwrap_or_else(|x| x.into_inner())
}
}
impl<T: Send> Deref for CowSender<T> {
type Target = Sender<T>;
fn deref(&self) -> &Sender<T> {
self.local.get_or(|| self.lock_inner().clone())
}
}
impl<T: Send> Deref for CowSenderGuard<'_, T> {
type Target = Sender<T>;
fn deref(&self) -> &Sender<T> {
&self.inner
}
}
impl<T: Send> DerefMut for CowSenderGuard<'_, T> {
fn deref_mut(&mut self) -> &mut Sender<T> {
&mut self.inner
}
}
impl<T: Send> Drop for CowSenderGuard<'_, T> {
fn drop(&mut self) {
self.local.iter_mut().for_each(|local| local.clone_from(&self.inner));
}
}
pub static SENDER: OnceCell<RwLock<CowSender<u8>>> = OnceCell::new();
Зачем вам RwLock? Ведь разделяемый Sender вы используете только для того, чтобы склонировать его, и после клонирования вам уже не нужна блокировка. Тут более чем достаточно мьютекса, и ваше исходное решение должно работать с любым количеством потоков-писателей, а если не работает - ищите ошибку.
Однако задумайтесь - зачем вам вообще условно-статическая инициализация и мьютексы?
Всё что вам нужно - это передать один параметр в функцию:
fn reqs(tx: Sender<u8>, var: u8) {
thread::spawn(move || { loop {
tx.send(23).unwrap();
thread::sleep(std::time::Duration::from_millis(200));
}});
}
fn main() {
let (tx, rx) = channel();
reqs(tx.clone(), 25);
reqs(tx, 14);
for req in rx {
println!("{}", req);
}
}