rust - 如何配置 actix websocket 服务器的有效负载限制

标签 rust actix-web

我正在学习 Actix 并尝试创建 WebSocket 服务

代码片段:

启动服务器

pub async fn start(addr: &str) -> std::result::Result<(), IoError> {
    let connections = Connections::default().start();
    HttpServer::new(move || {
        App::new().service(
            web::resource("/ws/")
                .data(connections.clone())
                .route(web::get().to(ws_index)),
        )
    })
    .bind(addr)?
    .run()
    .await
}

处理程序

async fn ws_index(
    req: HttpRequest,
    stream: web::Payload,
    addr: web::Data<Addr<Connections>>,
) -> Result<HttpResponse, Error> {
    let id = Uuid::new_v4().to_simple().to_string();
    let client = Connection::new(id, addr.get_ref().clone());
    let resp = ws::start(client, &req, stream);
    resp
}

流处理器

impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for Connection {
    fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
        match msg {
            Ok(ws::Message::Ping(msg)) => ctx.pong(&msg),
            Ok(ws::Message::Pong(_)) => self.hb = Instant::now(),
            Ok(ws::Message::Text(text)) => ctx.text(text),
            Ok(ws::Message::Close(reason)) => {
                ctx.stop();
                println!("Connection {} closed with reason {:?}", self.id, reason);
            }
            Err(e) => println!("Error: {}", e),
            _ => (),
        }
    }
}

现在 WebSocket 服务器正在运行,它可以接收文本并将其发回。但是,如果我发送大文本,服务器会记录“错误:有效负载达到大小限制。”。如何解决?

最佳答案

您必须使用特定编解码器创建 WebSocket,而不是使用 ws::start(),因为您可以在其中设置有效负载大小 https://docs.rs/actix-http/2.2.0/actix_http/ws/struct.Codec.html .

因此创建您的启动函数:

use actix_web::error::PayloadError;
use ws::{handshake, WebsocketContext};                        
use actix_http::ws::{Codec, Message, ProtocolError};
use bytes::Bytes;

fn start_with_codec<A, S>(actor: A, req: &HttpRequest, stream: S, codec: Codec) -> Result<HttpResponse, Error>
where
    A: Actor<Context = WebsocketContext<A>>
        + StreamHandler<Result<Message, ProtocolError>>,
    S: Stream<Item = Result<Bytes, PayloadError>> + 'static,
{   
    let mut res = handshake(req)?;
    Ok(res.streaming(WebsocketContext::with_codec(actor, stream, codec)))
} 

然后更改代码以调用新函数并传递编解码器:

async fn ws_index(
    req: HttpRequest,
    stream: web::Payload,
    addr: web::Data<Addr<Connections>>,
) -> Result<HttpResponse, Error> {
    let id = Uuid::new_v4().to_simple().to_string();
    let client = Connection::new(id, addr.get_ref().clone());
    let resp = start_with_codec(client, &req, stream, Codec::new().max_size(MAX_SIZE));
    resp
}

已更新

在大负载上,可以通过 Message::Continuation 变体将消息分解为几个较小的部分。

此变体包含具有以下变体的 Item 枚举:

pub enum Item {
    FirstText(Bytes),
    FirstBinary(Bytes),
    Continue(Bytes),
    Last(Bytes),
}

要重新编写原始消息,您必须收集从 FirstText/FirstBinary 到 Last 的所有片段,或者确保以正确的顺序将所有这些消息转发到将检索消息的目的地。

关于rust - 如何配置 actix websocket 服务器的有效负载限制,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/67125073/

相关文章:

rust - 什么是 Rust 类型关键字?

rust - Actix actor 的错误处理和条件链接

rust - 在actix-web中用作查询参数时,如何将无效的枚举变量转换为“无”

rust - 有没有办法从闭包内部的函数返回?

rust - 将 HashMap 拆分为相等的 block

rust - "type mismatch"在二维数组上循环

types - rust :预期类型,发现不透明类型

rust - 如何在Rust中将变量传递给actix-web guard()?

rust - 如何为 actix-web HttpResponse 创建流以逐 block 发送文件?

rust - 双冒号在 Rust 中是什么意思?