我正在寻找一种将下面的代码转换为异步版本的方法。密码是双重的。“第一个”线程模拟阻塞读取,例如长I/O操作。如果“read”成功,则表示另一个线程准备就绪。“第二个”线程尝试通过通道读取可能的信号,如果有值,则发送1.0
,如果没有值,则发送0.0
。
就我所理解的异步编程而言,Tokio和futures机箱是一个不错的选择,因为它们卸载了重量级的I/O操作。
要更直接地提出这个问题:当繁重的I/O操作试图获取一些字节时,我如何使用异步方法转换下面的代码以执行其他操作?
编辑:
上面提出的问题不够清楚,我正试图更详细地阐述这个问题。有一种similar question要求在未来采用正确的方式封装昂贵的同步I/O。这个问题的不同之处在于我正在寻找一个解决方案,其中包括在一个连续的值流上操作。其主要思想是这样的:如果有一个数据可以从某个I/O操作中获得,那么将其发送到一个通道中,如果没有另一个值,那么将其发送到流中。
let (tx, rx) = std::sync::mpsc::channel::<bool>();
let (data_tx, data_rx) = std::sync::mpsc::channel::<f64>();
// this thread simulates some heavy work, eg. some blocking I/O operation.
std::thread::spawn(move || {
let mut num_packets = 0;
let max_packets = 5;
loop {
std::thread::sleep(std::time::Duration::from_millis(50));
tx.send(true).expect("Error send into channel");
num_packets = num_packets + 1;
if num_packets >= max_packets {
break;
}
}
println!("send {} packets", num_packets);
});
std::thread::spawn(move || loop {
let rx_t = rx.try_recv();
if rx_t.is_err() {
if let Err(std::sync::mpsc::TryRecvError::Disconnected) = rx_t {
break;
}
data_tx.send(0.0).expect("Could not send into data channel");
} else {
data_tx.send(1.0).expect("Could not send into data channel");
}
});
let mut num_received_packets = 0;
let max_packets = 5;
loop {
let data = data_rx.recv().expect("Could not read from data channel");
if data >= 1.0 {
num_received_packets = num_received_packets + 1;
}
if num_received_packets >= max_packets {
break;
}
}
println!("received n packets {}", num_received_packets);
最佳答案
期货箱提供Stream
异步产生一个或多个值。Stream
提供两个流的combinator function到select
。两条河流都被调查过。如果一个流准备好产生一个值,这个值将被产生。这就是我要的。
#[macro_use]
extern crate tokio;
use futures::*; // 0.1.28
use tokio::*; // 0.1.22
struct A(Interval);
impl A {
pub fn new() -> A {
A(Interval::new_interval(std::time::Duration::from_millis(1)))
}
}
impl Stream for A {
type Item = f64;
type Error = ();
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
try_ready!(self.0.poll().map_err(|_| ()));
Ok(Async::from(Option::from(1.0)))
}
}
// repeat for `B`
// ...
#[test]
fn test_select_streams() {
tokio::run(A::new().select(B::new()).for_each(move |num| {
println!("num {}", num);
Ok(())
}));
}
关于asynchronous - 在发生繁重的I/O操作时,如何转换线程代码以使用 future 连续检查计算值?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57163172/