asynchronous - 从 tokio 套接字反序列化

标签 asynchronous rust serde rust-tokio

我正在使用 tokio 来实现一个服务器,该服务器与使用 serde (bincode) 序列化的消息进行通信。没有异步和 future 我会做

extern crate tokio_io;
extern crate bincode;
extern crate serde;
extern crate bytes;
extern crate futures;
#[macro_use] extern crate serde_derive;

use tokio_io::{AsyncRead, AsyncWrite};
use tokio_io::io::{read_exact, write_all};
use bincode::{serialize, deserialize, deserialize_from, Infinite, serialized_size};
use std::io::Read;
use std::io::Cursor;
use futures::future::Future;

type Item = String; // Dummy, this is a complex struct with derived Serizalize
type Error = bincode::Error;

// This works
fn decode<R>(reader: &mut R) -> Result<Item, Error> where R: Read {
    let message: Item = deserialize_from(reader, Infinite)?;
    Ok(message)
}

fn main() {

    let ser = serialize("Test", Infinite).unwrap();
    let buf = Cursor::new(ser);

    let mut reader = std::io::BufReader::new(buf);

    println!("{:?}", decode(&mut reader))
}

但我需要的是一个decode 函数,它可以像 asyncronous socket 一样工作

// I need this since I get the reader from a (tokio) socket as
// let socket = TcpListener::bind(&addr, &handle).unwrap();
// let (reader, writer) = socket.split();
fn decode_async<R>(reader: R) -> Result<Item, Error> where R: AsyncRead {
    // Does not work:   
    let message: Item = deserialize_from(reader, Infinite)?;
    Ok(message)
}

我唯一的想法是在编码时手动将长度写入缓冲区,然后使用 read_exact:

// Encode with size
fn encode_async(item: &Item) -> Result<Vec<u8>, Error>{
    let size = serialized_size(item);
    let mut buf = serialize(&size, Infinite).unwrap();
    let ser = serialize(item, Infinite).unwrap();
    buf.extend(ser);
    Ok(buf)
}

// Decode with size
fn decode_async<R>(reader: R) -> Box<Future<Item = Item, Error = std::io::Error>>
    where R: AsyncRead + 'static {

    let read = read_exact(reader, vec![0u8; 8]).and_then(|(reader, buf)| {
        let size = deserialize::<u64>(&mut &buf[..]).unwrap();
        Ok((reader, size as usize))
    }).and_then(|(reader, size)| {
        read_exact(reader, vec![0u8; size])
    }).and_then(|(reader, buf)| {
        let item = deserialize(&mut &buf[..]).unwrap();
        Ok(item)
    });

    Box::new(read)
}

fn main() {

    let ser = encode_async(&String::from("Test")).unwrap();
    let buf = Cursor::new(ser);

    let mut reader = std::io::BufReader::new(buf);
    let dec = decode_async(reader).wait();

    println!("{:?}", dec)
}

有没有更好的方式实现解码?

最佳答案

deserialize_from无法处理 IO 错误,尤其是那种 WouldBlock这是由异步(非阻塞)Readerer 在等待更多数据时返回的。这是受接口(interface)限制:deserialize_from 不返回 Future 或部分状态,它返回完整解码的 Result 并且不会知道如何将 Readerer 与事件循环结合起来以在不忙循环的情况下处理 WouldBlock

理论上可以实现一个async_deserialize_from,但是不能使用serde提供的接口(interface),除非你提前读取完整的数据进行解码,否则会失败目的。

您需要使用 tokio_io::io::read_to_end 读取完整数据或 tokio_io::io::read_exact (您当前使用的是什么),如果您知道“无休止”流(或后跟其他数据的流)中编码数据的大小。

关于asynchronous - 从 tokio 套接字反序列化,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48558605/

相关文章:

java - Netty 是否违反了 Future.cancel(...) 方法的约定?

javascript( promise )拼图

javascript - 在嵌套 Mongoose 函数完成之前完成循环

rust - 将宏参数传递给其他宏

rust - 有没有一种方法可以使用Serde定义标签字段?

rust - 为什么没有为明确实现的类型实现特征?

C# 套接字 : synchronous calls within asynchronous ones

recursion - 返回递归闭包的函数签名

rust - 为什么没有为明确实现的类型实现特征?

json - 使用 Serde 反序列化嵌套 JSON 结构时出现 "invalid type: map, expected a sequence"