rust - 如何使用启用 Futures 的超 block 的 Serde 零拷贝反序列化来存储结果?

标签 rust zero-copy serde

我正在使用 futures、tokio、hyper 和 serde_json 来请求和反序列化一些我需要保留直到下一个请求的数据。我最初的想法是制作一个包含 hyper::Chunk 的结构以及从 Chunk 借用的反序列化数据,但无法获得正确的生命周期。我尝试使用 rental crate ,但我也无法让它工作。也许我正在使用 'buffer声明缓冲区之前的生命周期 Vec ,但也许我搞砸了其他事情:

#[rental]
pub struct ChunkJson<T: serde::de::Deserialize<'buffer>> {
    buffer: Vec<u8>,
    json: T
}

有什么方法可以使生命周期正确,还是我应该只使用 DeserializeOwned并放弃零拷贝?

有关更多上下文,以下代码有效(定期反序列化来自两个 URL 的 JSON,保留结果以便我们可以对它们执行某些操作)。我想更改我的 XY使用的类型 Cow<'a, str>对于他们的领域,从 DeserializeOwned 改变至 Deserialize<'a> .为此,我需要为每个存储已反序列化的切片,但我不知道该怎么做。我正在寻找使用 Serde 的零拷贝反序列化并保留结果的示例,或者寻找一些重构我的代码的想法。

#[macro_use]
extern crate serde_derive;

extern crate serde;
extern crate serde_json;
extern crate futures;
extern crate tokio_core;
extern crate tokio_periodic;
extern crate hyper;

use std::collections::HashMap;
use std::error::Error;

use futures::future;
use futures::Future;
use futures::stream::Stream;
use hyper::Client;


fn stream_json<'a, T: serde::de::DeserializeOwned + Send + 'a>
    (handle: &tokio_core::reactor::Handle,
     url: String,
     period: u64)
     -> Box<Stream<Item = T, Error = Box<Error>> + 'a> {
    let client = Client::new(handle);
    let timer = tokio_periodic::PeriodicTimer::new(handle).unwrap();
    timer
        .reset(::std::time::Duration::new(period, 0))
        .unwrap();
    Box::new(futures::Stream::zip(timer.from_err::<Box<Error>>(), futures::stream::unfold( (), move |_| {
            let uri = url.parse::<hyper::Uri>().unwrap();
            let get = client.get(uri).from_err::<Box<Error>>().and_then(|res| {
                res.body().concat().from_err::<Box<Error>>().and_then(|chunks| {
                    let p: Result<T, Box<Error>> = serde_json::from_slice::<T>(chunks.as_ref()).map_err(|e| Box::new(e) as Box<Error>);
                    match p {
                        Ok(json) => future::ok((json, ())),
                        Err(err) => future::err(err)
                    }
                })
            });
            Some(get)
        })).map(|x| { x.1 }))
}

#[derive(Serialize, Deserialize, Debug)]
pub struct X {
    foo: String,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct Y {
    bar: String,
}

fn main() {

    let mut core = tokio_core::reactor::Core::new().unwrap();
    let handle = core.handle();

    let x_stream = stream_json::<HashMap<String, X>>(&handle, "http://localhost/X".to_string(), 2);
    let y_stream = stream_json::<HashMap<String, Y>>(&handle, "http://localhost/Y".to_string(), 5);
    let mut xy_stream = x_stream.merge(y_stream);

    let mut last_x = HashMap::new();
    let mut last_y = HashMap::new();

    loop {
        match core.run(futures::Stream::into_future(xy_stream)) {
            Ok((Some(item), stream)) => {
                match item {
                    futures::stream::MergedItem::First(x) => last_x = x,
                    futures::stream::MergedItem::Second(y) => last_y = y,
                    futures::stream::MergedItem::Both(x, y) => {
                        last_x = x;
                        last_y = y;
                    }
                }
                println!("\nx = {:?}", &last_x);
                println!("y = {:?}", &last_y);
                // Do more stuff with &last_x and &last_y

                xy_stream = stream;
            }
            Ok((None, stream)) => xy_stream = stream,
            Err(_) => {
                panic!("error");
            }
        }
    }
}

最佳答案

当试图解决一个复杂的编程问题时,尽可能多地删除是非常有用的。获取您的代码并删除您可以删除的内容,直到问题消失。稍微调整一下您的代码并继续删除,直到您不能再删除为止。然后,扭转问题,从最小的部分开始构建,然后重新解决错误。执行这两项操作将告诉您问题出在哪里。

首先,让我们确保反序列化正确:

extern crate serde;
extern crate serde_json;
#[macro_use]
extern crate serde_derive;

use std::borrow::Cow;

#[derive(Debug, Deserialize)]
pub struct Example<'a> {
    #[serde(borrow)]
    name: Cow<'a, str>,
    key: bool,
}

impl<'a> Example<'a> {
    fn info(&self) {
        println!("{:?}", self);
        match self.name {
            Cow::Borrowed(_) => println!("Is borrowed"),
            Cow::Owned(_) => println!("Is owned"),
        }
    }
}

fn main() {
    let data: Vec<_> = br#"{"key": true, "name": "alice"}"#.to_vec();

    let decoded: Example = serde_json::from_slice(&data).expect("Couldn't deserialize");
    decoded.info();
}

这里,我忘了加上#[serde(borrow)]属性,所以我很高兴我做了这个测试!

接下来,我们可以介绍一下出租箱:

#[macro_use]
extern crate rental;

rental! {
    mod holding {
        use super::*;

        #[rental]
        pub struct VecHolder {
            data: Vec<u8>,
            parsed: Example<'data>,
        }
    }
}

fn main() {
    let data: Vec<_> = br#"{"key": true, "name": "alice"}"#.to_vec();

    let holder = holding::VecHolder::try_new(data, |data| {
        serde_json::from_slice(data)
    });
    let holder = match holder {
        Ok(holder) => holder,
        Err(_) => panic!("Unable to construct rental"),
    };

    holder.rent(|example| example.info());

    // Make sure we can move the data and it's still valid
    let holder2 = { holder };
    holder2.rent(|example| example.info());
}

接下来我们尝试创建租金Chunk :

#[rental]
pub struct ChunkHolder {
    data: Chunk,
    parsed: Example<'data>,
}

不幸的是,这失败了:

  --> src/main.rs:29:1
   |
29 | rental! {
   | ^
   |
   = help: message: Field `data` must have an angle-bracketed type parameter or be `String`.

糟糕!检查the docs for rental , 我们可以添加 #[target_ty_hack="[u8]"]data field 。这导致:

error[E0277]: the trait bound `hyper::Chunk: rental::__rental_prelude::StableDeref` is not satisfied
  --> src/main.rs:29:1
   |
29 | rental! {
   | ^ the trait `rental::__rental_prelude::StableDeref` is not implemented for `hyper::Chunk`
   |
   = note: required by `rental::__rental_prelude::static_assert_stable_deref`

这很烦人;因为我们无法为 Chunk 实现该特征,我们只需要装箱Chunk ,证明它有一个稳定的地址:

#[rental]
pub struct ChunkHolder {
    data: Box<Chunk>,
    parsed: Example<'data>,
}

我还查看了是否有办法获得 Vec<u8>退出 Chunk , 但它似乎不存在。那将是另一种分配和间接更少的解决方案。

在这一点上,剩下的“全部”就是将其集成回 future 代码中。除了你之外,任何人都需要做很多工作来重新创建它,但我预计这样做不会有任何明显的问题。

关于rust - 如何使用启用 Futures 的超 block 的 Serde 零拷贝反序列化来存储结果?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43702185/

相关文章:

rust - 返回多个异常的 Java 函数的 Rust 等效返回值是多少?

rust - 如何通过 Rust 宏将表达式中的一个标识符替换为另一个标识符?

rust - 如何从 Box<dyn T> 中获取 &dyn T

rust - `expected reference, found integral variable` 从 BTreeMap 访问一个值

java - 如何引用数组的一部分?

linux - 如何应对3.2亿个272字节的UDP数据包?

java - 你如何在java中编写零拷贝?主要区别是什么

json - 在同一属性中解析具有多种表示形式的 JSON

rust - 为什么没有为明确实现的类型实现特征?

json - 如何在没有中间结构的情况下有效地将 JSON 的一部分提取为 Vec?