Как исправить многопоточный код?
use std::collections::HashMap;
pub fn frequency(input: &[&str], worker_count: usize) -> HashMap<char, usize> {
let (tx, rx) = std::sync::mpsc::channel();
let mut hndls = Vec::new();
{
// вычисляем количество потоков и размер порции
let mut portion = 10; //минимальная порция
let len = input.len();
let mut cnt = (len as f64 / portion as f64 + 0.5) as usize;
cnt = if cnt > worker_count {cnt} else {worker_count};
portion = (len as f64 / cnt as f64 + 0.5) as usize;
for (i, chank) in input.chunks(portion).enumerate() {
let tx_local = tx.clone();
let chank_local = chank.clone();
let handl = std::thread::spawn(move || {
let mut mp_local = HashMap::new();
//mp_local.insert('a', 1);
for s in chank_local {
for ch in s.to_lowercase().chars() {
if let Some(val) = mp_local.get_mut(&ch) {
*val += 1;
} else {
mp_local.insert(ch, 1);
}
}
}
if let Err(e) = tx_local.send(mp_local) {
eprintln!("Ошибка записи в канал потоком {i}: {e}");
}
});
hndls.push(handl);
}
}
drop(tx);
let mut mp = HashMap::new();
for mp_local in rx {
for (k,v) in mp_local {
if let Some(val) = mp.get_mut(&k) {
*val += v;
} else {
mp.insert(k, v);
}
}
}
for handl in hndls {
handl.join().unwrap();
}
mp
}
Ошибка:
error[E0521]: borrowed data escapes outside of function
--> src/lib.rs:19:25
|
3 | pub fn frequency(input: &[&str], worker_count: usize) -> HashMap<char, usize> {
| ----- - let's call the lifetime of this reference `'1`
| |
| `input` is a reference that is only valid in the function body
...
19 | let handl = std::thread::spawn(move || {
| _________________________^
20 | | let mut mp_local = HashMap::new();
21 | |
22 | | //mp_local.insert('a', 1);
... |
35 | | }
36 | | });
| | ^
| | |
| |______________`input` escapes the function body here
| argument requires that `'1` must outlive `'static`
error[E0521]: borrowed data escapes outside of function
--> src/lib.rs:19:25
|
3 | pub fn frequency(input: &[&str], worker_count: usize) -> HashMap<char, usize> {
| ----- - let's call the lifetime of this reference `'2`
| |
| `input` is a reference that is only valid in the function body
...
19 | let handl = std::thread::spawn(move || {
| _________________________^
20 | | let mut mp_local = HashMap::new();
21 | |
22 | | //mp_local.insert('a', 1);
... |
35 | | }
36 | | });
| | ^
| | |
| |______________`input` escapes the function body here
| argument requires that `'2` must outlive `'static`
For more information about this error, try `rustc --explain E0521`.
error: could not compile `parallel-letter-frequency` (lib test) due to 2 previous errors
warning: build failed, waiting for other jobs to finish...
error: could not compile `parallel-letter-frequency` (lib) due to 2 previous errors
Ответы (2 шт):
Автор решения: Pak Uula
→ Ссылка
Я не запускал, но чисто из общих соображений: chank.clone не клонирует объекты, из которых он состоит. Поэтому время жизни ссылок в chank_local равно времени жизни ссылок из параметра input. Ключевое слово move не может сдвинуть chank_local, так как это значение содержит ссылки с временем жизни отличным от его собственного и отличным от 'static.
НЕ ТЕСТИРОВАЛ!
Вам нужно не клонировать чанк, а копировать все значения из чанка:
let chank_local: Vec<String> = chank.iter().map(|s:&&str| (*s).to_owned()).collect();
Директива move передаст созданные строки в замыкание, которое параметр spawn, и память будет освобождена после завершения этого замыкания.
Автор решения: qwerty ytrewq
→ Ссылка
//Я русский.
use std::collections::HashMap;
pub fn frequency(input: &[&str], worker_count: usize) -> HashMap<char, usize> {
let mut mp = HashMap::new();
if input.is_empty() { return mp; }
// создаём канал
let (tx, rx) = std::sync::mpsc::channel();
// вектор для хранения дескрипторов потоков
let mut hndls = Vec::new();
{
// вычисляем количество потоков и размер порции
let mut portion = 5; //минимальная порция
let len = input.len();
let mut cnt = (len as f64 / portion as f64 + 0.5).round() as usize;
cnt = if cnt < worker_count {cnt} else {worker_count};
portion = (len as f64 / cnt as f64 + 0.5).round() as usize;
// преобразование ссылки в адрес, для передачи в другой поток
let ptr = std::ptr::addr_of!(input);
let p_addr = ptr as usize;
for i in 0..cnt {
let tx_local = tx.clone();
let handl = std::thread::spawn(move || {
let mut mp_local = HashMap::new();
// преобразование адреса в ссылку и получение слайса фрагмента
let ptr = p_addr as *const &[&str];
let m = unsafe { *ptr };
let chank = &m[i*portion..if (i+1)*portion<len {(i+1)*portion} else {len}];
for s in chank {
for mut ch in s.chars() {
if !ch.is_alphabetic() {
continue;
}
if let Some(c) = ch.to_lowercase().nth(0) { ch = c; } else { continue; }
if let Some(val) = mp_local.get_mut(&ch) {
*val += 1;
} else {
mp_local.insert(ch, 1);
}
}
}
if let Err(e) = tx_local.send(mp_local) {
eprintln!("Ошибка записи в канал потоком {i}: {e}");
}
});
hndls.push(handl);
}
}
drop(tx);
for mp_local in rx {
for (k,v) in mp_local {
if let Some(val) = mp.get_mut(&k) {
*val += v;
} else {
mp.insert(k, v);
}
}
}
for handl in hndls {
handl.join().unwrap();
}
mp
}
или так:
//Я русский.
use std::collections::HashMap;
use futures::executor::block_on;
async fn task(p_addr: usize, b: usize, e: usize) ->Result<HashMap<char,usize>, String> {
let mut mp = HashMap::new();
// преобразование адреса в ссылку и получение слайса фрагмента
let ptr = p_addr as *const &[&str];
let m = unsafe { *ptr };
let chank = &m[b..e];
for s in chank {
for mut ch in s.chars() {
if !ch.is_alphabetic() { continue; }
if let Some(c) = ch.to_lowercase().nth(0) { ch = c; } else { continue; }
if let Some(val) = mp.get_mut(&ch) {
*val += 1;
} else {
mp.insert(ch, 1);
}
}
}
Ok(mp)
}
async fn help(input: &[&str], worker_count: usize, mp: &mut HashMap<char, usize>) {
if input.is_empty() { return; }
// вектор для хранения futures
let mut futs = Vec::new();
{
// вычисляем количество заданий и размер порции
let mut portion = 5; //минимальная порция
let len = input.len();
let mut cnt = (len as f64 / portion as f64 + 0.5).round() as usize;
cnt = if cnt < worker_count {cnt} else {worker_count};
portion = (len as f64 / cnt as f64 + 0.5).round() as usize;
// преобразование ссылки в адрес, для передачи в другой поток
let ptr = std::ptr::addr_of!(input);
let p_addr = ptr as usize;
for i in 0..cnt {
let future = task(p_addr, i*portion, if (i+1)*portion<len {(i+1)*portion} else {len});
futs.push(future);
}
}
let mps = futures::future::try_join_all(futs).await.unwrap();
for mp_l in mps {
for (k,v) in mp_l {
if let Some(val) = mp.get_mut(&k) {
*val += v;
} else {
mp.insert(k, v);
}
}
}
}
pub fn frequency(input: &[&str], worker_count: usize) -> HashMap<char, usize> {
let mut mp = HashMap::new();
block_on(help(input, worker_count, &mut mp));
mp
}