java - Vert.x OutOfMemoryError 工作线程队列

标签 java vert.x

我正在使用 Vert.x 库打开 WebSocket 并使用其中的消息。消息处理可能会阻塞,因此我将这些消息传递给工作线程。我假设它们有一些具有预定义大小的有界队列,但它只是创建一个 LinkedList,如 vertx 核心库中的 TaskQueue.java 所示:

private final LinkedList<Task> tasks = new LinkedList<>();

因此,当我的进程成为缓慢的消费者而不是向源施加反压时,它就会耗尽内存 - 分析堆转储证实了这一点。我正在考虑根本不使用工作线程,尽管我对此感到有点惊讶 - 如果单个套接字发送了太多数据,我们可能想单独对其施加反压,阻塞事件循环也会惩罚其他套接字 - 或者也许我错过了一些东西/不了解如何正确使用该库?

添加:代码片段 好的,我有课 MarshallingStage.java它在 webSocket 上注册为 textMessageHandler:

MarshallingStage marshallingStage = new MarshallingStage(ctx);
webSocket.textMessageHandler(marshallingStage);

因此 marshallingStage 实现 Handler<String> ,它解码 json 并将其传递给工作线程。

@Override
public void handle(String text) {
    Message message;
    try {
        message = mUnmarshaller.unmarshall(text);
    }
    catch (MarshallingException e) {
        mCtx.disconnect("failed to unmarshall "+ text, e);
        return;
    }
    if (message != null) {
        handleInbound(new MessageEvent(message), mCtx);
    }
    else {
        log.error("{} unmarshalled null message", mCtx.getConnectionId());
    }
}

@Override
public void handleInbound(PipelineEvent event, ProviderContext ctx) {
    Vertx.currentContext().executeBlocking(
            future -> process(event, ctx),
            asyncResult -> {}
    );
}

假设工作线程有一个无限的任务列表,如果事件循环线程从 tcp 缓冲区消耗消息的速度比工作线程消耗该列表中的任务的速度快,则列表会不断增长,直到进程耗尽内存。我真的不明白这怎么行得通。

最佳答案

你写错了:

@Override
public void handleInbound(PipelineEvent event, ProviderContext ctx) {
    Vertx.currentContext().executeBlocking(
            future -> process(event, ctx),
            asyncResult -> {}
    );
}

处理完成后,您需要实际调用 future.handle(Future.succeededFuture())

@Override
public void handleInbound(PipelineEvent event, ProviderContext ctx) {
    Vertx.currentContext().executeBlocking(
            future -> { process(event, ctx); future.handle(Future.succeededFuture()); },
            asyncResult -> {}
    );
}

否则您将陷入等待释放工作线程的困境。

关于java - Vert.x OutOfMemoryError 工作线程队列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52987710/

相关文章:

java - 如何定期更改页面标题并打开/关闭更改?

java - Spring 数据 : is it possible to have subqueries in the Query annotation?

java - Vertx.io 核心 vs reactivex verticle 在 jUnit 中的使用

logging - 如何关联分布式 Vertx 系统中的日志事件

java - Vertx 使用 Log4j 配置日志记录

mysql - Quarkus 响应式(Reactive)客户端 MySql 客户端未插入记录

Java线程输出

java - 检索工件项目作为属性

java - Spring Data 和 JDBC/ORM 模块的区别

java - 从哪里获取用于在 Java 或 Kotlin 中验证 JWT token 的公钥