asynchronous - 在发生繁重的I/O操作时,如何转换线程代码以使用 future 连续检查计算值?

标签 asynchronous rust

我正在寻找一种将下面的代码转换为异步版本的方法。密码是双重的。“第一个”线程模拟阻塞读取,例如长I/O操作。如果“read”成功,则表示另一个线程准备就绪。“第二个”线程尝试通过通道读取可能的信号,如果有值,则发送1.0,如果没有值,则发送0.0
就我所理解的异步编程而言,Tokio和futures机箱是一个不错的选择,因为它们卸载了重量级的I/O操作。
要更直接地提出这个问题:当繁重的I/O操作试图获取一些字节时,我如何使用异步方法转换下面的代码以执行其他操作?
编辑:
上面提出的问题不够清楚,我正试图更详细地阐述这个问题。有一种similar question要求在未来采用正确的方式封装昂贵的同步I/O。这个问题的不同之处在于我正在寻找一个解决方案,其中包括在一个连续的值流上操作。其主要思想是这样的:如果有一个数据可以从某个I/O操作中获得,那么将其发送到一个通道中,如果没有另一个值,那么将其发送到流中。

let (tx, rx) = std::sync::mpsc::channel::<bool>();
let (data_tx, data_rx) = std::sync::mpsc::channel::<f64>();

// this thread simulates some heavy work, eg. some blocking I/O operation.
std::thread::spawn(move || {
    let mut num_packets = 0;
    let max_packets = 5;

    loop {
        std::thread::sleep(std::time::Duration::from_millis(50));
        tx.send(true).expect("Error send into channel");

        num_packets = num_packets + 1;
        if num_packets >= max_packets {
            break;
        }
    }

    println!("send {} packets", num_packets);
});

std::thread::spawn(move || loop {
    let rx_t = rx.try_recv();
    if rx_t.is_err() {
        if let Err(std::sync::mpsc::TryRecvError::Disconnected) = rx_t {
            break;
        }
        data_tx.send(0.0).expect("Could not send into data channel");
    } else {
        data_tx.send(1.0).expect("Could not send into data channel");
    }
});

let mut num_received_packets = 0;
let max_packets = 5;

loop {
    let data = data_rx.recv().expect("Could not read from data channel");

    if data >= 1.0 {
        num_received_packets = num_received_packets + 1;
    }

    if num_received_packets >= max_packets {
        break;
    }
}

println!("received n packets {}", num_received_packets);

最佳答案

期货箱提供Stream异步产生一个或多个值。
Stream提供两个流的combinator functionselect。两条河流都被调查过。如果一个流准备好产生一个值,这个值将被产生。这就是我要的。

#[macro_use]
extern crate tokio;

use futures::*; // 0.1.28
use tokio::*; // 0.1.22

struct A(Interval);

impl A {
    pub fn new() -> A {
        A(Interval::new_interval(std::time::Duration::from_millis(1)))
    }
}

impl Stream for A {
    type Item = f64;
    type Error = ();

    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        try_ready!(self.0.poll().map_err(|_| ()));
        Ok(Async::from(Option::from(1.0)))
    }
}

// repeat for `B`
// ...

#[test]
fn test_select_streams() {
    tokio::run(A::new().select(B::new()).for_each(move |num| {
        println!("num {}", num);
        Ok(())
    }));
}

关于asynchronous - 在发生繁重的I/O操作时,如何转换线程代码以使用 future 连续检查计算值?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57163172/

相关文章:

javascript - 异步 NPM 模块不为数组中的每个项目执行 setTimeout

asp.net-mvc - 在 ASP.NET MVC 中使用 EWS 异步发送批量电子邮件

javascript - 为什么 console.log 在 promise 循环后工作

rust - 如何为 Cargo.toml 中的 [build-dependencies] 生成 rustdoc 文档?

multidimensional-array - Rust 中的多维向量,段错误?

node.js - 同步多个请求和多个数据库调用

python - 异步提要解析器请求

rust - 不比较 Rust 中的字符串

generics - 具有通用参数类型的函数

rust - 遍历中RefCell的循环引用借用