websocket - 通过对 websockets 的多个引用向客户端发送消息

标签 websocket rust rust-tokio rust-actix actix-web

我的问题是关于将 actix-web 与 Rust 结合使用。

不幸的是,如果没有一个有点庞大的代码示例,我无法解释这一点,所以让我从这里开始。

struct MyWs {
    game: Arc<RwLock<Game>>,
}

impl Actor for MyWs {
    type Context = ws::WebsocketContext<Self>;
}

impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for MyWs {
    fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
        match msg {
            Ok(ws::Message::Text(text)) => {
                debug!("Echoing text with {:?}", text);
                self.game.write().unwrap().some_method();
                ctx.text(text)
            },
            _ => (),
        }
    }
}

struct Game {
    websockets: Vec<Arc<RwLock<MyWs>>>,
}

impl Game {
    pub fn new() -> GameWrapper {
        GameWrapper {
            websockets: vec![],
        }
    }

    pub fn add_websocket(&mut self, my_ws: Arc<RwLock<MyWs>>) {
        self.websockets.push(my_ws);
    }

    pub fn some_method(&mut self) {
        // Do something to influence internal state.
        self.push_state();
    }

    pub fn push_state(&self) {
        for w in self.websockets {
            // I'm not sure about this part, idk how to access the
            // WebsocketContext with which I can send stuff back to the client.
            let game_state = get_game_state_or_something();
            w.write().unwrap().ctx.text(self.game_state);
        }
    }
}

struct GameWrapper {
    pub game: Arc<RwLock<Game>>,
}

impl GameWrapper {
    pub fn new(game: Arc<RwLock<Game>>) -> GameWrapper {
        GameWrapper { game }
    }
}

#[actix_rt::main]
async fn main() -> std::io::Result<()> {
    let game = Arc::new(RwLock::new(Game::new()));
    let game_wrapper = RwLock::new(GameWrapper::new(game.clone()));
    let game_wrapper_data = web::Data::new(game_wrapper);
    HttpServer::new(move || {
        App::new()
            .app_data(game_wrapper_data.clone())
            .route("/play_game", web::get().to(play_game))
    })
    .bind(ip_port)?
    .run()
    .await
}

pub async fn play_game(
    req: HttpRequest,
    stream: web::Payload,
    game_wrapper: web::Data<GameWrapper>,
) -> impl Responder {
    let my_ws = MyWs { game: game_wrapper.game.clone() };
    let my_ws = Arc::new(RwLock::new(my_ws));
    let mut game = game_wrapper.game.write().unwrap();
    game.add_websocket(my_ws);
    let resp = ws::start(my_ws, &req, stream);  // This is the problem.
    let resp = match resp {
        Ok(resp) => resp,
        Err(e) => return HttpResponse::from_error(e),
    };
    debug!("Successfully upgraded to websocket");
    resp
}

让我先解释一下我想做什么。当我的客户端连接时,我与他们建立了一个 websocket。我需要这些 websocket 的列表,因此当游戏中发生某些变化时,我可以将更新推送给所有客户端。

我将 play_game 函数绑定(bind)为 play_game 路由的处理程序。在此函数中,我将 HTTP get 请求升级为 websocket。在此之前,我复制了一个游戏的 Arc+RwLock 并将其传递到 MyWs,即 websocket 结构。您可以在 StreamHandler 的 MyWs impl 的 handle 函数中看到我修改了游戏(使用 some_method 函数)。到目前为止这很好。

当我尝试获取对 websocket 的多个引用时,事情发生了爆炸。您可以在 play_game 中看到我调用了 add_websocket,为 Game 提供了对它的引用,因此它可以在发生变化时将更新推送回所有客户端。例如,在调用 some_method 之后,我们将调用 push_updates。这样做的问题是,ws::start 不接受 Arc,它必须接受使用 WebSocketContext 实现 StreamHandler 的 Actor。

所以我的主要两个问题是:

  1. 我需要一种方法来保留对 websocket 的多个引用,以便我可以从多个位置(阅读:线程)与客户端通信。
  2. 我什至需要一些方法来做到这一点。我不确定在 actix 中如何在我的 MyWs Actor 的上下文之外实际将消息发送回客户端。框架将 WebSocketContext 传递给 handle,但我不知道如何自己动手。

我解决这个问题的想法:

  1. 在 MyWs 的handle(或started)函数中,将对 Context 的引用传递到 self.game 中。这不起作用,因为我要移出一个可变引用。
  2. 制作我自己的ws::start 可以引用。我还没有尝试过这个,因为看起来我最终会重写很多东西。
  3. 以某种方式在 Arc 上实现 Actor 和 StreamHandler,或者我自己的具有内部可变性的结构/允许我保留对它的多个引用的东西。

这并不能真正帮助我发回消息,因为我仍然不知道如何在 handle 函数的上下文之外通过 websocket 发回消息。

抱歉这个问题的长度。 tl;dr 是,我如何在 actix-web 中获取对 websocket 的多个引用并使用它们向客户端发送消息?

以下是我使用的每个组件的相关文档:

最佳答案

好吧,毫无疑问,解决我的困境的方法是改变我试图解决这个问题的方式。我真正需要的不是对 websocket 的多个引用,而是对每个持有 websocket 的参与者的引用。考虑到 Actix 是一个 actor 框架,我认为这就是您的本意。

这意味着代码应该是这样的:

impl Game {
    ...

    pub fn register_actor(&mut self, actor: Addr<MyWs>) {
        self.actors.push(actor);
    }
}

pub async fn play_game(
    req: HttpRequest,
    stream: web::Payload,
    game_wrapper: web::Data<GameWrapper>,
) -> impl Responder {
    let my_ws = MyWs { game: game_wrapper.game.clone() };
    let my_ws = Arc::new(RwLock::new(my_ws));
    let mut game = game_wrapper.game.write().unwrap();
    let res = ws::start_with_addr(my_ws, &req, stream);
    let (addr, resp) = match res {
        Ok(res) => res,
        Err(e) => return HttpResponse::from_error(e),
    };
    game_manager.register_actor(handle, addr);
    debug!("Successfully upgraded to websocket");
    resp
}

然后您可以通过 Addr<MyWs> 向 Actor 发送消息.

我会暂时离开这个问题,以防其他人对如何更好地完成整个事情有想法。

关于websocket - 通过对 websockets 的多个引用向客户端发送消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62364088/

相关文章:

rust - 暗示AsyncRead为补品::流

rust - 当只需要部分结构时,如何避免克隆整个大型结构以发送给线程?

javascript - C 中 Websocket 服务器的问题

javascript - 使用 socket.io 和 node.js 向特定客户端发送消息

websocket - 在 Wildfly 9 中通过 websocket 配置 STOMP

rust - Rust 的设计者选择符号 !/&&/|| 的原因是什么?而不是单词 not/and/or?

rust - 在给定数字范围的引用上使用迭代器的最有效方法是什么?

rust - 如何便宜地发送延迟消息?

spring - 如何在 Spring Boot 应用程序中向 STOMP CREATED 消息添加自定义 header ?

file - 如何使用 tokio::fs 复制文件