rust - 我怎样才能将两个 futures 链接到同一个资源上,而不必提前定义每个方法组合?

标签 rust

我正在编写代码以使用 SIM800L 调制解调器引导并连接到 2G/3G 网络。该调制解调器与单个串行 channel 连接,我在该项目之外将其多路复用为 4 个 channel (数据、文本接口(interface)、控制接口(interface)、状态消息)。

为了启动它,我需要运行一系列顺序命令。此序列根据调制解调器的输出而变化(SIM 是否锁定?SIM 需要解锁什么样的信息?我们正在使用什么样的 APN?我们想要什么样的网络选择?)。我最初认为这将是 futures 的完美应用程序,因为每个单独的操作在空闲时间上都可能非常昂贵(AT+COPS,命令之一,最多需要 10 秒才能返回)。

我正在做这样的事情,虽然它编译并似乎按顺序执行命令,但第三个操作是空的。我的问题是双重的:为什么运行的命令没有在最后一个 future 的结果中弹出,有没有更强大的方法来做这样的事情?

#![feature(conservative_impl_trait)]

extern crate futures;
extern crate tokio_core;

use std::sync::{Arc, Mutex};
use futures::{future, Future};
use tokio_core::reactor::Core;
use futures::sync::oneshot;
use std::thread;
use std::io;
use std::time::Duration;

pub struct Channel {
    operations: Arc<Mutex<Vec<String>>>,
}

impl Channel {
    pub fn ops(&mut self) -> Box<Future<Item = Vec<String>, Error = io::Error>> {
        println!("{:?}", self.operations);
        let ops = Arc::clone(&self.operations);
        let ops = ops.lock().unwrap();
        future::ok::<Vec<String>, io::Error>(ops.to_vec()).boxed()
    }

    pub fn run(&mut self, command: &str) -> Box<Future<Item = Vec<String>, Error = io::Error>> {
        let (tx, rx) = oneshot::channel::<Vec<String>>();

        let ops = Arc::clone(&self.operations);
        let str_cmd = String::from(command);
        thread::spawn(move || {
            thread::sleep(Duration::new(0, 10000));

            let mut ops = ops.lock().unwrap();
            ops.push(str_cmd.clone());
            println!("Pushing op: {}", str_cmd.clone());
            tx.send(vec!["OK".to_string()])
        });

        rx.map_err(|_| io::Error::new(io::ErrorKind::NotFound, "Test"))
            .boxed()
    }
}

pub struct Channels {
    inner_object: Arc<Mutex<Channel>>,
}

impl Channels {
    pub fn one(&self, cmd: &str) -> Box<Future<Item = Vec<String>, Error = io::Error>> {
        let v = Arc::clone(&self.inner_object);
        let mut v = v.lock().unwrap();
        v.run(&cmd)
    }

    pub fn ops(&self) -> Box<Future<Item = Vec<String>, Error = io::Error>> {
        let v = Arc::clone(&self.inner_object);
        let mut v = v.lock().unwrap();
        v.ops()
    }

    pub fn run_command(&self) -> Box<Future<Item = (), Error = io::Error>> {
        let a = self.one("AT+CMEE=2");
        let b = self.one("AT+CREG=0");
        let c = self.ops();
        Box::new(a.and_then(|result_1| {
            assert_eq!(result_1, vec![String::from("OK")]);
            b.and_then(|result_2| {
                assert_eq!(result_2, vec![String::from("OK")]);
                c.map(move |ops| {
                    assert_eq!(
                        ops.as_slice(),
                        ["AT+CMEE=2".to_string(), "AT+CREG=0".to_string()]
                    );
                })
            })
        }))
    }
}

fn main() {
    let mut core = Core::new().expect("Core should be created");
    let channels = Channels {
        inner_object: Arc::new(Mutex::new(Channel {
            operations: Arc::new(Mutex::new(vec![])),
        })),
    };
    let result = core.run(channels.run_command()).expect("Should've worked");

    println!("{:?}", result);
}

playground

最佳答案

why do the commands run not pop up in the result of the last future

因为您还没有对以这种方式发生的操作进行排序:

let a = self.one("AT+CMEE=2");
let b = self.one("AT+CREG=0");
let c = self.ops();

立即构建:

  • a , b — promise 在他们回应之前睡一会儿
  • c — 在向量中获取操作的 promise

c 的时间点已创建, sleep 尚未终止,因此尚未执行任何操作,因此向量将为空。

Future::and_then旨在用于定义顺序操作。这在你的情况下很复杂,因为你想使用 selfand_then 的正文中关闭。您可以克隆 Arc<Channel>并改用它。

您会注意到我做了一些简化:

  • 返回 String而不是 Vec<String>
  • 删除未使用的 mut限定符和 Mutex
  • 返回操作 Vec直接。
extern crate futures;
extern crate tokio_core;

use std::sync::{Arc, Mutex};
use futures::Future;
use tokio_core::reactor::Core;
use futures::sync::oneshot;
use std::thread;
use std::io;
use std::time::Duration;

pub struct Channel {
    operations: Arc<Mutex<Vec<String>>>,
}

impl Channel {
    fn ops(&self) -> Vec<String> {
        self.operations.lock().unwrap().clone()
    }

    fn command(&self, command: &str) -> Box<Future<Item = String, Error = io::Error>> {
        let (tx, rx) = oneshot::channel();

        let ops = Arc::clone(&self.operations);
        let str_cmd = String::from(command);

        thread::spawn(move || {
            thread::sleep(Duration::new(0, 10000));

            println!("Pushing op: {}", str_cmd);
            ops.lock().unwrap().push(str_cmd);

            tx.send("OK".to_string())
        });

        Box::new(rx.map_err(|_| io::Error::new(io::ErrorKind::NotFound, "Test")))
    }
}

struct Channels {
    data: Arc<Channel>,
}

impl Channels {
    fn run_command(&self) -> Box<Future<Item = (), Error = io::Error>> {
        let d2 = Arc::clone(&self.data);
        let d3 = Arc::clone(&self.data);

        Box::new(
            self.data
                .command("AT+CMEE=2")
                .and_then(move |cmee_answer| {
                    assert_eq!(cmee_answer, "OK"); // This should be checked in `command` and be a specific Error
                    d2.command("AT+CREG=0")
                })
                .map(move |creg_answer| {
                    assert_eq!(creg_answer, "OK"); // This should be checked in `command` and be a specific Error
                    let ops = d3.ops();
                    assert_eq!(ops, ["AT+CMEE=2", "AT+CREG=0"])
                }),
        )
    }
}

fn main() {
    let mut core = Core::new().expect("Core should be created");
    let channels = Channels {
        data: Arc::new(Channel {
            operations: Arc::new(Mutex::new(vec![])),
        }),
    };
    let result = core.run(channels.run_command()).expect("Should've worked");

    println!("{:?}", result);
}

但是,这不是我通常在 futures 中看到的代码类型。而不是服用 &self , 多 future 走self .让我们看看它会是什么样子:

extern crate futures;
extern crate tokio_core;

use std::sync::{Arc, Mutex};
use futures::Future;
use tokio_core::reactor::Core;
use futures::sync::oneshot;
use std::thread;
use std::io;
use std::time::Duration;

#[derive(Clone)]
pub struct Channel {
    operations: Arc<Mutex<Vec<String>>>,
}

impl Channel {
    fn ops(&self) -> Arc<Mutex<Vec<String>>> {
        Arc::clone(&self.operations)
    }

    fn command(self, command: &str) -> Box<Future<Item = (Self, String), Error = io::Error>> {
        let (tx, rx) = oneshot::channel();
        let str_cmd = String::from(command);

        thread::spawn(move || {
            thread::sleep(Duration::new(0, 10000));

            println!("Pushing op: {}", str_cmd);
            self.operations.lock().unwrap().push(str_cmd);

            tx.send((self, "OK".to_string()))
        });

        Box::new(rx.map_err(|_| io::Error::new(io::ErrorKind::NotFound, "Test")))
    }
}

struct Channels {
    data: Channel,
}

impl Channels {
    fn run_command(self) -> Box<Future<Item = (), Error = io::Error>> {
        Box::new(
            self.data
                .clone()
                .command("AT+CMEE=2")
                .and_then(|(channel, cmee_answer)| {
                    assert_eq!(cmee_answer, "OK");
                    channel.command("AT+CREG=0")
                })
                .map(|(channel, creg_answer)| {
                    assert_eq!(creg_answer, "OK");
                    let ops = channel.ops();
                    let ops = ops.lock().unwrap();
                    assert_eq!(*ops, ["AT+CMEE=2", "AT+CREG=0"]);
                }),
        )
    }
}

fn main() {
    let mut core = Core::new().expect("Core should be created");
    let channels = Channels {
        data: Channel {
            operations: Arc::new(Mutex::new(vec![])),
        },
    };
    let result = core.run(channels.run_command()).expect("Should've worked");

    println!("{:?}", result);
}

关于rust - 我怎样才能将两个 futures 链接到同一个资源上,而不必提前定义每个方法组合?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47103771/

相关文章:

generics - 如何初始化常量泛型数组?

shell - 解析表达式语法是否适合解析 shell 命令语言?

compiler-errors - 奇怪地重复出现的通用特征模式 : overflow evaluating the requirement

compilation - 有 Rust 解释器吗?

generics - 将 trait 中方法的返回类型与实现该 trait 的类型绑定(bind)

rust - 我如何告诉 Rust 我的 Option 的值实际上比传递给 and_then 的闭包还长?

rust - 为什么 Rust RwLock 在 fork 时表现异常?

rust - 清除前设置剪刀矩形

android - 如何将 Flutter 应用程序的构建过程与 Rust 代码集成?即在构建 Flutter 代码时,如何自动构建其 Rust 代码?

linux - 绑定(bind)() : "Cannot assign request address"