rust - 如何阅读基于Tokio的Hyper请求的整个正文?

标签 rust hyper rust-tokio

我想使用Hyper的当前主分支编写服务器,该分支保存由POST请求传递的消息,并将此消息发送到每个传入的GET请求。

我有这个,主要是从 super 示例目录复制的:

extern crate futures;
extern crate hyper;
extern crate pretty_env_logger;

use futures::future::FutureResult;

use hyper::{Get, Post, StatusCode};
use hyper::header::{ContentLength};
use hyper::server::{Http, Service, Request, Response};
use futures::Stream;

struct Echo {
    data: Vec<u8>,
}

impl Echo {
    fn new() -> Self {
        Echo {
            data: "text".into(),
        }
    }
}

impl Service for Echo {
    type Request = Request;
    type Response = Response;
    type Error = hyper::Error;
    type Future = FutureResult<Response, hyper::Error>;

    fn call(&self, req: Self::Request) -> Self::Future {
        let resp = match (req.method(), req.path()) {
            (&Get, "/") | (&Get, "/echo") => {
                Response::new()
                    .with_header(ContentLength(self.data.len() as u64))
                    .with_body(self.data.clone())
            },
            (&Post, "/") => {
                //self.data.clear(); // argh. &self is not mutable :(
                // even if it was mutable... how to put the entire body into it?
                //req.body().fold(...) ?
                let mut res = Response::new();
                if let Some(len) = req.headers().get::<ContentLength>() {
                    res.headers_mut().set(ContentLength(0));
                }
                res.with_body(req.body())
            },
            _ => {
                Response::new()
                    .with_status(StatusCode::NotFound)
            }
        };
        futures::future::ok(resp)
    }

}


fn main() {
    pretty_env_logger::init().unwrap();
    let addr = "127.0.0.1:12346".parse().unwrap();

    let server = Http::new().bind(&addr, || Ok(Echo::new())).unwrap();
    println!("Listening on http://{} with 1 thread.", server.local_addr().unwrap());
    server.run().unwrap();
}

如何将req.body()(似乎是StreamChunks)转换为Vec<u8>?我认为我必须以某种方式返回消耗FutureStream并将其转换为单个Vec<u8>,也许使用fold()。但是我不知道该怎么做。

最佳答案

我将简化问题,只返回字节总数,而不是回显整个流。
future 0.3
super 0.13 + TryStreamExt::try_fold如果只希望将所有数据作为一个大块,请参阅euclio's answer有关 hyper::body::to_bytes 的信息。
访问流允许进行更细粒度的控制:

use futures::TryStreamExt; // 0.3.7
use hyper::{server::Server, service, Body, Method, Request, Response}; // 0.13.9
use std::convert::Infallible;
use tokio; // 0.2.22

#[tokio::main]
async fn main() {
    let addr = "127.0.0.1:12346".parse().expect("Unable to parse address");

    let server = Server::bind(&addr).serve(service::make_service_fn(|_conn| async {
        Ok::<_, Infallible>(service::service_fn(echo))
    }));

    println!("Listening on http://{}.", server.local_addr());

    if let Err(e) = server.await {
        eprintln!("Error: {}", e);
    }
}

async fn echo(req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
    let (parts, body) = req.into_parts();
    match (parts.method, parts.uri.path()) {
        (Method::POST, "/") => {
            let entire_body = body
                .try_fold(Vec::new(), |mut data, chunk| async move {
                    data.extend_from_slice(&chunk);
                    Ok(data)
                })
                .await;

            entire_body.map(|body| {
                let body = Body::from(format!("Read {} bytes", body.len()));
                Response::new(body)
            })
        }
        _ => {
            let body = Body::from("Can only POST to /");
            Ok(Response::new(body))
        }
    }
}
不幸的是,当前Bytes的实现不再与 TryStreamExt::try_concat 兼容,因此我们必须切换回fold。
future 0.1
super 0.12 + Stream::concat2从Futures 0.1.14开始,您可以使用 Stream::concat2 将所有数据整合为一个:
fn concat2(self) -> Concat2<Self>
where
    Self: Sized,
    Self::Item: Extend<<Self::Item as IntoIterator>::Item> + IntoIterator + Default, 
use futures::{
    future::{self, Either},
    Future, Stream,
}; // 0.1.25

use hyper::{server::Server, service, Body, Method, Request, Response}; // 0.12.20

use tokio; // 0.1.14

fn main() {
    let addr = "127.0.0.1:12346".parse().expect("Unable to parse address");

    let server = Server::bind(&addr).serve(|| service::service_fn(echo));

    println!("Listening on http://{}.", server.local_addr());

    let server = server.map_err(|e| eprintln!("Error: {}", e));
    tokio::run(server);
}

fn echo(req: Request<Body>) -> impl Future<Item = Response<Body>, Error = hyper::Error> {
    let (parts, body) = req.into_parts();

    match (parts.method, parts.uri.path()) {
        (Method::POST, "/") => {
            let entire_body = body.concat2();
            let resp = entire_body.map(|body| {
                let body = Body::from(format!("Read {} bytes", body.len()));
                Response::new(body)
            });
            Either::A(resp)
        }
        _ => {
            let body = Body::from("Can only POST to /");
            let resp = future::ok(Response::new(body));
            Either::B(resp)
        }
    }
}
您还可以通过BytesVec<u8>转换为entire_body.to_vec(),然后将其转换为String
也可以看看:
  • How do I convert a Vector of bytes (u8) to a string

  • super 0.11 + Stream::fold类似于Iterator::fold Stream::fold 带有一个累加器(称为o​​jit_code)和一个对累加器进行操作的函数以及流中的一项。该函数的结果必须是另一个具有与原始错误类型相同的错误类型的将来。总的结果本身就是 future 。
    fn fold<F, T, Fut>(self, init: T, f: F) -> Fold<Self, F, Fut, T>
    where
        F: FnMut(T, Self::Item) -> Fut,
        Fut: IntoFuture<Item = T>,
        Self::Error: From<Fut::Error>,
        Self: Sized,
    
    我们可以使用init作为累加器。 VecBody实现返回Stream。这实现了Chunk,因此我们可以使用它来将每个块的数据附加到Deref<[u8]>
    extern crate futures; // 0.1.23
    extern crate hyper;   // 0.11.27
    
    use futures::{Future, Stream};
    use hyper::{
        server::{Http, Request, Response, Service}, Post,
    };
    
    fn main() {
        let addr = "127.0.0.1:12346".parse().unwrap();
    
        let server = Http::new().bind(&addr, || Ok(Echo)).unwrap();
        println!(
            "Listening on http://{} with 1 thread.",
            server.local_addr().unwrap()
        );
        server.run().unwrap();
    }
    
    struct Echo;
    
    impl Service for Echo {
        type Request = Request;
        type Response = Response;
        type Error = hyper::Error;
        type Future = Box<futures::Future<Item = Response, Error = Self::Error>>;
    
        fn call(&self, req: Self::Request) -> Self::Future {
            match (req.method(), req.path()) {
                (&Post, "/") => {
                    let f = req.body()
                        .fold(Vec::new(), |mut acc, chunk| {
                            acc.extend_from_slice(&*chunk);
                            futures::future::ok::<_, Self::Error>(acc)
                        })
                        .map(|body| Response::new().with_body(format!("Read {} bytes", body.len())));
    
                    Box::new(f)
                }
                _ => panic!("Nope"),
            }
        }
    }
    
    您还可以将Vec Vec<u8>转换为body
    也可以看看:
  • How do I convert a Vector of bytes (u8) to a string

  • 输出
    从命令行调用时,我们可以看到结果:
    $ curl -X POST --data hello http://127.0.0.1:12346/
    Read 5 bytes
    
    警告
    所有这些解决方案都允许恶意的最终用户发布无限大小的文件,这将导致计算机内存不足。根据预期的用途,您可能希望对读取的字节数设置某种上限,从而有可能在某个断点处写入文件系统。
    也可以看看:
  • How do I apply a limit to the number of bytes read by futures::Stream::concat2?
  • 关于rust - 如何阅读基于Tokio的Hyper请求的整个正文?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63210038/

    相关文章:

    Rust:预期类型 [X],但发现类型 [X]

    node.js - [tokio-rs] [文档]具有共享状态示例的多个异步 “sub-apps”?

    loops - 你会如何在 Rust 中编写这个 C++ 循环的等价物

    arrays - 使用索引数组并行写入数组

    rust - while 循环中变量的突变

    rust - 如何阅读基于 Tokio 的 Hyper 请求的整个主体?

    rust - 为什么在 hyper 中匹配请求路径后没有任何反应?

    electron - 如何使用 Hyper 编写 Electron 应用程序?

    rust - 为什么编译器不推断 impl 特征返回值的关联类型的具体类型?

    rust - 如何向 tokio-io 添加特殊的 NotReady 逻辑?