我正在使用 Jersey 来实现 SSE 场景。
服务器保持连接有效。并定期向客户端推送数据。
在我的场景中,有一个连接限制,只有一定数量的客户端可以同时订阅服务器。
因此,当新客户端尝试订阅时,我会检查(EventOutput.isClosed)以查看是否有任何旧连接不再处于 Activity 状态,以便它们可以为新连接腾出空间。
但是EventOutput.isClosed的结果总是false,除非客户端显式调用EventSource的close。这意味着如果客户端意外掉线(断电或网络中断),它仍然会占用连接,新客户端无法订阅。
有解决办法吗?
最佳答案
@崔鹏飞,
因此,在我试图自己寻找答案的旅行中,我偶然发现了一个存储库,该存储库解释了如何从断开连接的客户端优雅地清理连接。
将所有 SSE EventOutput 逻辑封装到服务/管理器中。在这里,他们启动了一个线程来检查 EventOutput 是否已被客户端关闭。如果是,他们正式关闭连接 (EventOutput#close())。如果不是,他们会尝试写入流。如果它抛出异常,则客户端已断开连接但未关闭,它会处理关闭它。如果写入成功,则 EventOutput 返回到池中,因为它仍然是 Activity 连接。
repo(和实际类)可用here .我还在下面包含了没有导入的类,以防 repo 被删除。
请注意,他们将 this 绑定(bind)到单例。该商店应该是全局唯一的。
public class SseWriteManager {
private final ConcurrentHashMap<String, EventOutput> connectionMap = new ConcurrentHashMap<>();
private final ScheduledExecutorService messageExecutorService;
private final Logger logger = LoggerFactory.getLogger(SseWriteManager.class);
public SseWriteManager() {
messageExecutorService = Executors.newScheduledThreadPool(1);
messageExecutorService.scheduleWithFixedDelay(new messageProcessor(), 0, 5, TimeUnit.SECONDS);
}
public void addSseConnection(String id, EventOutput eventOutput) {
logger.info("adding connection for id={}.", id);
connectionMap.put(id, eventOutput);
}
private class messageProcessor implements Runnable {
@Override
public void run() {
try {
Iterator<Map.Entry<String, EventOutput>> iterator = connectionMap.entrySet().iterator();
while (iterator.hasNext()) {
boolean remove = false;
Map.Entry<String, EventOutput> entry = iterator.next();
EventOutput eventOutput = entry.getValue();
if (eventOutput != null) {
if (eventOutput.isClosed()) {
remove = true;
} else {
try {
logger.info("writing to id={}.", entry.getKey());
eventOutput.write(new OutboundEvent.Builder().name("custom-message").data(String.class, "EOM").build());
} catch (Exception ex) {
logger.info(String.format("write failed to id=%s.", entry.getKey()), ex);
remove = true;
}
}
}
if (remove) {
// we are removing the eventOutput. close it is if it not already closed.
if (!eventOutput.isClosed()) {
try {
eventOutput.close();
} catch (Exception ex) {
// do nothing.
}
}
iterator.remove();
}
}
} catch (Exception ex) {
logger.error("messageProcessor.run threw exception.", ex);
}
}
}
public void shutdown() {
if (messageExecutorService != null && !messageExecutorService.isShutdown()) {
logger.info("SseWriteManager.shutdown: calling messageExecutorService.shutdown.");
messageExecutorService.shutdown();
} else {
logger.info("SseWriteManager.shutdown: messageExecutorService == null || messageExecutorService.isShutdown().");
}
}}
关于java - 服务器发送了带有 Jersey : EventOutput is not closed after client drops 的事件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/22275030/