rust - 如何实现 future 功能的流

标签 rust rust-tokio rust-async-std

为了了解流是如何工作的,我试图实现一个使用random.org的无限数生成器。我做的第一件事是实现一个版本,在该版本中,我将调用一个名为get_number的异步函数,它将填充缓冲区并返回下一个可能的数字:


struct RandomGenerator {
    buffer: Vec<u8>,
    position: usize,
}

impl RandomGenerator {
    pub fn new() -> RandomGenerator {
        Self {
            buffer: Vec::new(),
            position: 0,
        }
    }

    pub async fn get_number(&mut self) -> u8 {
        self.fill_buffer().await;

        let value = self.buffer[self.position];
        self.position += 1;

        value
    }

    async fn fill_buffer(&mut self) {
        if self.buffer.is_empty() || self.is_buffer_depleted() {
            let new_numbers = self.fetch_numbers().await;
            drop(replace(&mut self.buffer, new_numbers));
            self.position = 0;
        }
    }

    fn is_buffer_depleted(&self) -> bool {
        self.buffer.len() >= self.position
    }

    async fn fetch_numbers(&mut self) -> Vec<u8> {
        let response = reqwest::get("https://www.random.org/integers/?num=10&min=1&max=100&col=1&base=10&format=plain&rnd=new").await.unwrap();
        let numbers = response.text().await.unwrap();
        numbers
            .lines()
            .map(|line| line.trim().parse::<u8>().unwrap())
            .collect()
    }
}
通过此实现,我可以在循环上调用函数get_number并获取我想要的尽可能多的数字,但其想法是拥有迭代器,以便可以调用一堆合成函数,例如taketake_while等。
但是,当我尝试实现Stream时,问题开始出现:
我的第一个尝试是拥有一个可以保存对生成器的引用的结构
struct RandomGeneratorStream<'a> {
    generator: &'a mut RandomGenerator,
}

然后我实现了以下流
impl<'a> Stream for RandomGeneratorStream<'a> {
    type Item = u8;

    fn poll_next(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        let f = self.get_mut().generator.get_number();
        pin_mut!(f);
        f.poll_unpin(cx).map(Some)
    }
}

但调用此操作只会挂起进程
generator.into_stream().take(18).collect::<Vec<u8>>().await
在接下来的尝试中,我尝试使用pin_mut在流结构上保留将来的状态!但最终在生命周期中遇到许多错误,无法解决它们。
在这种情况下可以做什么?
这是没有流的工作代码:
use std::mem::replace;
struct RandomGenerator {
    buffer: Vec<u8>,
    position: usize,
}

impl RandomGenerator {
    pub fn new() -> RandomGenerator {
        Self {
            buffer: Vec::new(),
            position: 0,
        }
    }

    pub async fn get_number(&mut self) -> u8 {
        self.fill_buffer().await;

        let value = self.buffer[self.position];
        self.position += 1;

        value
    }

    async fn fill_buffer(&mut self) {
        if self.buffer.is_empty() || self.is_buffer_depleted() {
            let new_numbers = self.fetch_numbers().await;
            drop(replace(&mut self.buffer, new_numbers));
            self.position = 0;
        }
    }

    fn is_buffer_depleted(&self) -> bool {
        self.buffer.len() >= self.position
    }

    async fn fetch_numbers(&mut self) -> Vec<u8> {
        let response = reqwest::get("https://www.random.org/integers/?num=10&min=1&max=100&col=1&base=10&format=plain&rnd=new").await.unwrap();
        let numbers = response.text().await.unwrap();
        numbers
            .lines()
            .map(|line| line.trim().parse::<u8>().unwrap())
            .collect()
    }
}

#[tokio::main]
async fn main() {
    let mut generator = RandomGenerator::new();
    dbg!(generator.get_number().await);
}
在这里,您可以找到第一个工作示例的链接(而不是调用random.org,我使用了Cursor,因为dns分辨率在操场上不起作用)https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=730eaf1f7db842877d3f3e7ca1c6d2a5
我最后一次尝试使用流,您可以在这里找到https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=de0b212ee70865f6ac6c19430cd952cd

最佳答案

On the next tries, I tried to hold a state of the future on the stream struct using pin_mut! but ended up having many errors with lifetimes without being able to solve them.


您处在正确的轨道上,您需要坚持 future ,以便poll_next正常工作。
不幸的是,您会遇到可变引用的障碍。您需要保留&mut RandomGenerator以便重复使用,但 future 本身也必须保留&mut RandomGenerator才能完成其工作。这将违反可变引用的排他性。您以任何方式削减它都可能会遇到此问题。
Future转到Stream的更好方法是遵循建议here并使用 futures::stream::unfold :
fn as_stream<'a>(&'a mut self) -> impl Stream<Item = u8> + 'a {
    futures::stream::unfold(self, |rng| async {
        let number = rng.get_number().await;
        Some((number, rng))
    })
}
playground上看到它。
这可能不一定能帮助您了解有关流的更多信息,但是提供的功能通常比手动滚动更好。避免上面的多重变量引用问题的关键原因是因为生成 future 的函数获取了可变引用的所有权,然后在完成后将其返回。这样一来一次只存在一个。即使您自己实现了Stream,也必须使用类似的机制。

关于rust - 如何实现 future 功能的流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/66607516/

相关文章:

rust - 为什么放弃这个 SpawnHandle 不会取消它的 future ?

asynchronous - 如何等待rust中的异步函数调用列表?

Rust:如何使用 async-std + TLS + HTTP 代理(http 隧道)?

rust - 是否可以查看当前安装的 Rust 版本的源代码?

rust - 可以从汇编的角度想到 "let"和其他 Rust 习语吗?

sql-server - 如何将所有行作为 tiberius future 的向量?

rust - tokio 在 Rust 中加入多个任务

rust - 如何使用异步来并行化繁重的计算?

string - 用同一向量 Rust 中的另一个字符串扩展一个字符串

rust - 为什么Mutex中的计数器与计数器的增量不一致?