我正在使用 Vert.x 库打开 WebSocket 并使用其中的消息。消息处理可能会阻塞,因此我将这些消息传递给工作线程。我假设它们有一些具有预定义大小的有界队列,但它只是创建一个 LinkedList,如 vertx 核心库中的 TaskQueue.java 所示:
private final LinkedList<Task> tasks = new LinkedList<>();
因此,当我的进程成为缓慢的消费者而不是向源施加反压时,它就会耗尽内存 - 分析堆转储证实了这一点。我正在考虑根本不使用工作线程,尽管我对此感到有点惊讶 - 如果单个套接字发送了太多数据,我们可能想单独对其施加反压,阻塞事件循环也会惩罚其他套接字 - 或者也许我错过了一些东西/不了解如何正确使用该库?
添加:代码片段
好的,我有课 MarshallingStage.java
它在 webSocket 上注册为 textMessageHandler:
MarshallingStage marshallingStage = new MarshallingStage(ctx);
webSocket.textMessageHandler(marshallingStage);
因此 marshallingStage 实现 Handler<String>
,它解码 json 并将其传递给工作线程。
@Override
public void handle(String text) {
Message message;
try {
message = mUnmarshaller.unmarshall(text);
}
catch (MarshallingException e) {
mCtx.disconnect("failed to unmarshall "+ text, e);
return;
}
if (message != null) {
handleInbound(new MessageEvent(message), mCtx);
}
else {
log.error("{} unmarshalled null message", mCtx.getConnectionId());
}
}
@Override
public void handleInbound(PipelineEvent event, ProviderContext ctx) {
Vertx.currentContext().executeBlocking(
future -> process(event, ctx),
asyncResult -> {}
);
}
假设工作线程有一个无限的任务列表,如果事件循环线程从 tcp 缓冲区消耗消息的速度比工作线程消耗该列表中的任务的速度快,则列表会不断增长,直到进程耗尽内存。我真的不明白这怎么行得通。
最佳答案
你写错了:
@Override
public void handleInbound(PipelineEvent event, ProviderContext ctx) {
Vertx.currentContext().executeBlocking(
future -> process(event, ctx),
asyncResult -> {}
);
}
处理完成后,您需要实际调用 future.handle(Future.succeededFuture())
。
@Override
public void handleInbound(PipelineEvent event, ProviderContext ctx) {
Vertx.currentContext().executeBlocking(
future -> { process(event, ctx); future.handle(Future.succeededFuture()); },
asyncResult -> {}
);
}
否则您将陷入等待释放工作线程的困境。
关于java - Vert.x OutOfMemoryError 工作线程队列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52987710/