rust - 使用 tokio 0.1.x 生成具有非静态生命周期的任务

标签 rust lifetime rust-tokio

我有一个 tokio 核心,它的主要任务是运行一个 websocket(客户端)。当我从服务器收到一些消息时,我想执行一个新任务来更新一些数据。下面是一个最小的失败示例:

use tokio_core::reactor::{Core, Handle};
use futures::future::Future;
use futures::future;

struct Client {
    handle: Handle,
    data: usize,
}

impl Client {
    fn update_data(&mut self) {
        // spawn a new task that updates the data
        self.handle.spawn(future::ok(()).and_then(|x| {
            self.data += 1; // error here
            future::ok(())
        }));
    }
}

fn main() {
    let mut runtime = Core::new().unwrap();

    let mut client = Client {
        handle: runtime.handle(),
        data: 0,
    };

    let task = future::ok::<(), ()>(()).and_then(|_| {
        // under some conditions (omitted), we update the data
        client.update_data();
        future::ok::<(), ()>(())
    });
    runtime.run(task).unwrap();
}

产生此错误的原因:

error[E0477]: the type `futures::future::and_then::AndThen<futures::future::result_::FutureResult<(), ()>, futures::future::result_::FutureResult<(), ()>, [closure@src/main.rs:13:51: 16:10 self:&mut &mut Client]>` does not fulfill the required lifetime
  --> src/main.rs:13:21                                                                                                                                                                
   |                                                                                                                                                                                   
13 |         self.handle.spawn(future::ok(()).and_then(|x| {                                                                                                                           
   |                     ^^^^^                                                                                                                                                         
   |                                                                                                                                                                                   
   = note: type must satisfy the static lifetime      

问题是通过句柄产生的新任务需要是静态的。描述了同样的问题here .可悲的是,我不清楚如何解决这个问题。即使尝试使用 ArcMutex(单线程应用程序确实不需要),我也没有成功。

由于 tokio 领域的发展相当迅速,我想知道当前最好的解决方案是什么。你有什么建议吗?

编辑

解决方案 Peter Hall适用于上面的示例。可悲的是,当我构建失败的示例时,我更改了 tokio reactor,认为它们是相似的。使用 tokio::runtime::current_thread

use futures::future;
use futures::future::Future;
use futures::stream::Stream;
use std::cell::Cell;
use std::rc::Rc;
use tokio::runtime::current_thread::{Builder, Handle};

struct Client {
    handle: Handle,
    data: Rc<Cell<usize>>,
}

impl Client {
    fn update_data(&mut self) {
        // spawn a new task that updates the data
        let mut data = Rc::clone(&self.data);
        self.handle.spawn(future::ok(()).and_then(move |_x| {
            data.set(data.get() + 1);
            future::ok(())
        }));
    }
}

fn main() {
    // let mut runtime = Core::new().unwrap();

    let mut runtime = Builder::new().build().unwrap();

    let mut client = Client {
        handle: runtime.handle(),
        data: Rc::new(Cell::new(1)),
    };

    let task = future::ok::<(), ()>(()).and_then(|_| {
        // under some conditions (omitted), we update the data
        client.update_data();
        future::ok::<(), ()>(())
    });
    runtime.block_on(task).unwrap();
}

我得到:

error[E0277]: `std::rc::Rc<std::cell::Cell<usize>>` cannot be sent between threads safely
--> src/main.rs:17:21                                                         
|                                                                            
17 |         self.handle.spawn(future::ok(()).and_then(move |_x| {              
|                     ^^^^^ `std::rc::Rc<std::cell::Cell<usize>>` cannot be sent between threads safely
|                                                                            
= help: within `futures::future::and_then::AndThen<futures::future::result_::FutureResult<(), ()>, futures::future::result_::FutureResult<(), ()>, [closure@src/main.rs:17:51: 20:10 data:std::rc::Rc<std::cell::Cell<usize>>]>`, the trait `std::marker::Send` is not implemented for `std::rc::Rc<std::cell::Cell<usize>>`
= note: required because it appears within the type `[closure@src/main.rs:17:51: 20:10 data:std::rc::Rc<std::cell::Cell<usize>>]`
= note: required because it appears within the type `futures::future::chain::Chain<futures::future::result_::FutureResult<(), ()>, futures::future::result_::FutureResult<(), ()>, [closure@src/main.rs:17:51: 20:10 data:std::rc::Rc<std::cell::Cell<usize>>]>`
= note: required because it appears within the type `futures::future::and_then::AndThen<futures::future::result_::FutureResult<(), ()>, futures::future::result_::FutureResult<(), ()>, [closure@src/main.rs:17:51: 20:10 data:std::rc::Rc<std::cell::Cell<usize>>]>`

所以在这种情况下,我确实需要一个 Arc 和一个 Mutex,即使整个代码都是单线程的?

最佳答案

在单线程程序中,不需要使用Arc ; Rc足够了:

use std::{rc::Rc, cell::Cell};

struct Client {
    handle: Handle,
    data: Rc<Cell<usize>>,
}

impl Client {
    fn update_data(&mut self) {
        let data = Rc::clone(&self.data);
        self.handle.spawn(future::ok(()).and_then(move |_x| {
            data.set(data.get() + 1);
            future::ok(())
        }));
    }
}

重点是您不必再担心生命周期,因为 Rc 的每个克隆就像它拥有数据一样,而不是通过对 self 的引用来访问它.内层Cell (或 RefCell 对于非 Copy 类型)是必需的,因为 Rc不能可变地取消引用,因为它已被克隆。


spawn tokio::runtime::current_thread::Handle的方法|要求 future 是Send ,这是导致您的问题更新时出现问题的原因。 this Tokio Github issue 中有一个(某种程度上的)解释为什么会出现这种情况。 .

您可以使用 tokio::runtime::current_thread::spawn而不是 Handle 的方法,它将始终在当前线程中运行 future ,并且要求 future 是Send .您可以替换 self.handle.spawn在上面的代码中,它会工作得很好。

如果需要使用Handle上的方法那么你还需要求助于ArcMutex (或 RwLock )以满足 Send要求:

use std::sync::{Mutex, Arc};

struct Client {
    handle: Handle,
    data: Arc<Mutex<usize>>,
}

impl Client {
    fn update_data(&mut self) {
        let data = Arc::clone(&self.data);
        self.handle.spawn(future::ok(()).and_then(move |_x| {
            *data.lock().unwrap() += 1;
            future::ok(())
        }));
    }
}

如果你的数据真的是usize , 你也可以使用 AtomicUsize而不是 Mutex<usize> ,但我个人认为使用起来同样笨拙。

关于rust - 使用 tokio 0.1.x 生成具有非静态生命周期的任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53873375/

相关文章:

rust - 如何为链表实现 len() 函数?

rust - 如何在结构体中的数组 block 上实现迭代器?

rust - 为什么 poll() 内的延迟 future 在我的自定义流类型中不起作用?

asynchronous - Tokio react 器是否轮询每个组合器之间所有可能的 poll() 函数?

rust - 如何在不出现 E0507 的情况下将包含 Vec 的结构移至静态数组或从静态数组移出?

linux - 为什么我的 Rust 可执行文件映射到如此高的地址(靠近堆栈)而不是 0x400000?

memory - 如何在 Rust 的枚举中找到最大的变体?

rust - rust 的生命周期相互矛盾

iterator - 如何编写返回对自身的引用的迭代器?

asynchronous - 将同步Rust IO驱动程序转换为 `async`