rust - 为 Rust 的 hyper http crate 使用自定义传输器

标签 rust hyper

ps:下面的答案有帮助,但这不是我需要的答案,我有一个新问题,我编辑了问题
我正在尝试为 hyper http crate 制作一个定制的运输车,所以我可以用我自己的方式传输http数据包。
Hyper的http客户端可以通过自定义https://docs.rs/hyper/0.14.2/hyper/client/connect/trait.Connect.html这里:pub fn build<C, B>(&self, connector: C) -> Client<C, B> where C: Connect + Clone, B: HttpBody + Send, B::Data: Send, 如果我们看

impl<S, T> Connect for S where    

S: Service<Uri, Response = T> + Send + 'static,    

S::Error: Into<Box<dyn StdError + Send + Sync>>,    

S::Future: Unpin + Send,    

T: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static, 
型号 T ,这是 Response 的类型, 必须执行 AsyncRead + AsyncWrite ,所以我选择了 type Response = Cursor<Vec<u8>> .
这是我的定制运输车,带有 Response类型 std::io::Cursor包裹在 CustomResponse所以我可以实现 AsyncWriteAsyncRead给它:
use hyper::service::Service;
use core::task::{Context, Poll};
use core::future::Future;
use std::pin::Pin;
use std::io::Cursor;
use hyper::client::connect::{Connection, Connected};
use tokio::io::{AsyncRead, AsyncWrite};

#[derive(Clone)]
pub struct CustomTransporter;

unsafe impl Send for CustomTransporter {}

impl CustomTransporter {
    pub fn new() -> CustomTransporter {
        CustomTransporter{}
    }
}

impl Connection for CustomTransporter {
    fn connected(&self) -> Connected {
        Connected::new()
    }
}

pub struct CustomResponse {
    //w: Cursor<Vec<u8>>,
    v: Vec<u8>,
    i: i32
}

unsafe impl Send for CustomResponse {
    
}

impl Connection for CustomResponse {
    fn connected(&self) -> Connected {
        println!("connected");
        Connected::new()
    }
}

impl AsyncRead for CustomResponse {
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut tokio::io::ReadBuf<'_>
    ) -> Poll<std::io::Result<()>> {
        self.i+=1;
        if self.i >=3 {
            println!("poll_read for buf size {}", buf.capacity());
            buf.put_slice(self.v.as_slice());
            println!("did poll_read");
            Poll::Ready(Ok(()))
        } else {
            println!("poll read pending, i={}", self.i);
            Poll::Pending
        }
    }
}

impl AsyncWrite for CustomResponse {
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8]
    ) -> Poll<Result<usize, std::io::Error>>{
        //let v = vec!();
        println!("poll_write____");

        let s = match std::str::from_utf8(buf) {
            Ok(v) => v,
            Err(e) => panic!("Invalid UTF-8 sequence: {}", e),
        };

        println!("result: {}, size: {}, i: {}", s, s.len(), self.i);
        if self.i>=0{
            //r
            Poll::Ready(Ok(s.len()))
        }else{
            println!("poll_write pending");
            Poll::Pending
        }
    }
    fn poll_flush(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>
    ) -> Poll<Result<(), std::io::Error>> {
        println!("poll_flush");
        if self.i>=0{
            println!("DID poll_flush");
            Poll::Ready(Ok(()))
        }else{
            println!("poll_flush pending");
            Poll::Pending
        }
    }

    fn poll_shutdown(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>
    ) -> Poll<Result<(), std::io::Error>>
    {
        println!("poll_shutdown");
        Poll::Ready(Ok(()))
    }
}


impl Service<hyper::Uri> for CustomTransporter {
    type Response = CustomResponse;
    type Error = hyper::http::Error;
    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        println!("poll_ready");
        Poll::Ready(Ok(()))
        //Poll::Pending
    }

    fn call(&mut self, req: hyper::Uri) -> Self::Future {
        println!("call");
        // create the body
        let body: Vec<u8> = "HTTP/1.1 200 OK\nDate: Mon, 27 Jul 2009 12:28:53 GMT\nServer: Apache/2.2.14 (Win32)\nLast-Modified: Wed, 22 Jul 2009 19:15:56 GMT\nContent-Length: 88\nContent-Type: text/html\nConnection: Closed<html><body><h1>Hello, World!</h1></body></html>".as_bytes()
            .to_owned();
        // Create the HTTP response
        let resp = CustomResponse{
            //w: Cursor::new(body),
            v: body,
            i: 0
        };
         
        // create a response in a future.
        let fut = async move{
            Ok(resp)
        };
        println!("gonna return from call");
        // Return the response as an immediate future
        Box::pin(fut)
    }
}
然后我像这样使用它:
let connector = CustomTransporter::new();
let client: Client<CustomTransporter, hyper::Body> = Client::builder().build(connector);
let mut res = client.get(url).await.unwrap();
但是,它卡住了,hyper 永远不会读取我的响应,而是将 GET 写入其中。
这是一个完整的测试项目: https://github.com/lzunsec/rust_hyper_custom_transporter/blob/39cd036fc929057d975a71969ccbe97312543061/src/custom_req.rs
像这样运行:
cargo run http://google.com

最佳答案

I cannot simply implement Send to Future, and I cannot change Future by a wrapper. What should I do here?


看起来问题出在您的 Service::Future缺少 Send约束。 future 正在返回 call已经是 Send所以它将与简单的更改一起工作:
impl Service<hyper::Uri> for CustomTransporter {
    type Response = CustomResponse;
    type Error = hyper::http::Error;
    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
                                                                                  // ^^^^
    ...

您的代码还有一些其他错误:未推断 vec!() , self: Pin<...>失踪 mut , CustomResponse应该是 pub ...
您可以指定 Bclient通过使用推理:
let client: Client<CustomTransporter, hyper::Body> = Client::builder().build(connector);
或者通过在 build 上使用 turbofish 运算符:
let client = Client::builder().build::<CustomTransporter, hyper::Body>(connector);

我对创建自定义 super 传输的了解还不够多,无法知道其功能是否正常,但这些修复使其可以编译。希望它能帮助你取得进步。

关于rust - 为 Rust 的 hyper http crate 使用自定义传输器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/66002357/

相关文章:

rust - 是 Rust 中的外部 crate 序列影响吗

rust - Rust 中精确的内存布局控制?

http - 错误: Could not find main or io in tokio,无效的返回类型 `impl Future`

terminal - 如何更改我的 super 终端(从 hyper.is 下载)的主题?

rust - 使用 Hyper 的 Rust 客户端证书

rust - 如何指定仅二进制依赖项?

rust - Rust 目标组件的源代码在哪里?

rust - 通过原始指针克隆类型删除的 Arc 是否安全?

rust - actix-web 处理程序中的 HTTP 请求 -> 多个执行程序一次 : EnterError

rust - 使用 Hyper 发出请求时如何设置 User-Agent header ?