java - 用 Jersey SSE : Detect closed connection 广播

标签 java jersey server-sent-events

我相信这个问题不是 Server sent event with Jersey: EventOutput is not closed after client drops 的重复问题, 但可能与 Jersey Server-Sent Events - write to broken connection does not throw exception 有关.

chapter 15.4.2 Jersey 文档的 SseBroadcaster描述:

However, the SseBroadcaster internally identifies and handles also client disconnects. When a client closes the connection the broadcaster detects this and removes the stale connection from the internal collection of the registered EventOutputs as well as it frees all the server-side resources associated with the stale connection.

我无法证实这一点。在下面的测试用例中,我看到子类 SseBroadcasteronClose() 方法从未被调用:当 EventInput 关闭时没有,并且没有当广播另一条消息时。

public class NotificationsResourceTest extends JerseyTest {
    final static Logger log = LoggerFactory.getLogger(NotificationsResourceTest.class);

    final static CountingSseBroadcaster broadcaster = new CountingSseBroadcaster();

    public static class CountingSseBroadcaster extends SseBroadcaster { 
        final AtomicInteger connectionCounter = new AtomicInteger(0);

        public EventOutput createAndAttachEventOutput() {
            EventOutput output = new EventOutput();
            if (add(output)) {
                int cons = connectionCounter.incrementAndGet();
                log.debug("Active connection count: "+ cons);
            }
            return output;
        }

        @Override
        public void onClose(final ChunkedOutput<OutboundEvent> output) {
            int cons = connectionCounter.decrementAndGet();
            log.debug("A connection has been closed. Active connection count: "+ cons);
        }

        @Override
        public void onException(final ChunkedOutput<OutboundEvent> chunkedOutput, final Exception exception) {
            log.trace("An exception has been detected", exception);
        }

        public int getConnectionCount() {
            return connectionCounter.get();
        }
    }

    @Path("notifications")
    public static class NotificationsResource {

        @GET
        @Produces(SseFeature.SERVER_SENT_EVENTS)
        public EventOutput subscribe() {
            log.debug("New stream subscription");

            EventOutput eventOutput = broadcaster.createAndAttachEventOutput();
            return eventOutput;
        }
    }   

    @Override
    protected Application configure() {
        ResourceConfig config = new ResourceConfig(NotificationsResource.class);
        config.register(SseFeature.class);

        return config;
    }


    @Test
    public void test() throws Exception {
        // check that there are no connections
        assertEquals(0, broadcaster.getConnectionCount());

        // connect subscriber
        log.info("Connecting subscriber");
        EventInput eventInput = target("notifications").request().get(EventInput.class);
        assertFalse(eventInput.isClosed());

        // now there are connections
        assertEquals(1, broadcaster.getConnectionCount());

        // push data
        log.info("Broadcasting data");
        String payload = UUID.randomUUID().toString();
        OutboundEvent chunk = new OutboundEvent.Builder()
                .mediaType(MediaType.TEXT_PLAIN_TYPE)
                .name("message")
                .data(payload)
                .build();
        broadcaster.broadcast(chunk);

        // read data
        log.info("Reading data");
        InboundEvent inboundEvent = eventInput.read();
        assertNotNull(inboundEvent);
        assertEquals(payload, inboundEvent.readData());

        // close subscription 
        log.info("Closing subscription");
        eventInput.close();
        assertTrue(eventInput.isClosed());

        // at this point, the subscriber has disconnected itself, 
        // but jersey doesnt realise that
        assertEquals(1, broadcaster.getConnectionCount());

        // wait, give TCP a chance to close the connection
        log.debug("Sleeping for some time");
        Thread.sleep(10000);

        // push data again, this should really flush out the not-connected client
        log.info("Broadcasting data again");
        broadcaster.broadcast(chunk);
        Thread.sleep(100);

        // there is no subscriber anymore
        assertEquals(0, broadcaster.getConnectionCount());  // FAILS!
    }
}

也许 JerseyTest 不是测试它的好方法。在使用 JavaScript EventSource 的不太...临床设置中,我看到 onClose() 被调用,但仅在先前关闭的连接上广播消息之后.

我做错了什么?

为什么 SseBroadcaster 没有检测到客户端关闭连接?

跟进

我找到了 JERSEY-2833被拒绝,按设计工作:

According to the Jersey Documentation in SSE chapter (https://jersey.java.net/documentation/latest/sse.html) in 15.4.1 it's mentioned that Jersey does not explicitly close the connection, it's the responsibility of the resource method or the client.

这到底是什么意思?资源是否应该强制超时并终止所有 Activity 的和客户端关闭的连接?

最佳答案

在构造函数的文档中org.glassfish.jersey.media.sse.SseBroadcaster.SseBroadcaster() ,它说:

Creates a new instance. If this constructor is called by a subclass, it assumes the the reason for the subclass to exist is to implement onClose(org.glassfish.jersey.server.ChunkedOutput) and onException(org.glassfish.jersey.server.ChunkedOutput, Exception)methods, so it adds the newly created instance as the listener. To avoid this, subclasses may call SseBroadcaster(Class) passing their class as an argument.

所以你不应该保留默认构造函数并尝试在你的类中实现调用 super 的构造函数:

public CountingSseBroadcaster(){
    super(CountingSseBroadcaster.class);
}

关于java - 用 Jersey SSE : Detect closed connection 广播,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32770635/

相关文章:

java - 匿名内部类的集合操作

python - 为什么没有桶排序库(或者有?)

java - Tomcat 6 内存泄漏日志条目

java - 2014 年实现 Java REST Web 服务的最简单框架

java - Jersey 客户端读取超时,但 Apache HTTP 客户端连接正常

javascript - 事件流请求未在 passenger apache 下触发关闭事件

server-sent-events - 服务器发送的事件查询

java - Tomcat SSL 和 keystore

google-app-engine - Jersey 链接支持与 Google App Engine 问题

php - 如何在用户按下按钮后将进度更新从 PHP 推送到页面?