java - Jersey SSE - 事件 Output.write 在发送第一条消息后抛出空指针

标签 java http tomcat servlets jersey

我已经使用 Jersey 实现了一个 Restful Web 界面,用于通过 HTTP 将从内部 JMS 发布者接收的消息发送到外部客户端。我已经成功地向 Java 客户端发送了一条测试消息,但是线程在完成 write() 执行之前抛出了一个空指针异常,关闭了连接并阻止了进一步的通信。

这是我的资源类:

@GET
@Path("/stream_data")
@Produces(SseFeature.SERVER_SENT_EVENTS)
public EventOutput getServerSentEvents(@Context ServletContext context){
    final EventOutput eventOutput = new EventOutput();
    new Thread( new ObserverThread(eventOutput, (MService) context.getAttribute("instance")) ).start();
    return eventOutput;
}

这是我线程的运行方法:

public class ObserverThread implements Observer, Runnable {
   //constructor sets eventOutput & mService objects
   //mService notifyObservers() called when JMS message received
   //text added to Thread's message queue to await sending to client 

   public void run() {
    try {
        String message = "{'symbol':'test','entryType'='0','price'='test'}";
        Thread.sleep(1000);
        OutboundEvent.Builder builder = new OutboundEvent.Builder();
        builder.mediaType(MediaType.APPLICATION_JSON_TYPE);
        builder.data(String.class, message);
        OutboundEvent event = builder.build();
        eventOutput.write(event);
        System.out.println(">>>>>>SSE CLIENT HAS BEEN REGISTERED!");
        mService.addObserver(this);
        while(!eventOutput.isClosed()){
            if(!updatesQ.isEmpty()){
                pushUpdate(updatesQ.dequeue());
            }
        }
        System.out.println("<<<<<<<SSE CLIENT HAS BEEN DEREGISTERED!");
    } catch (IOException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}

这是我的客户端代码:

Client client = ClientBuilder.newBuilder().register(SseFeature.class).build();
    WebTarget target = client.target(url);
    EventInput eventInput = target.request().get(EventInput.class);
    try {
        while (!eventInput.isClosed()) {
            eventInput.setChunkType(MediaType.WILDCARD_TYPE);
            final InboundEvent inboundEvent = eventInput.read();
            if (inboundEvent != null) {
                String theString = inboundEvent.readData();
                System.out.println(theString + "\n");
            }
        }
    } catch (Exception e) {
        e.printStackTrace();
    }

我将“{'symbol':'test','entryType'='0','price'='test'}”测试消息打印到客户端控制台,但服务器随后在之前打印了 NullPointerException它可以打印“>>>>SSE Client registered”消息。这将关闭连接,以便客户端退出 while 循环并停止监听更新。

我将项目转换为 webapp 3.0 版本 facet,以便将支持异步的标记添加到 web.xml,但我收到相同的空指针错误。我倾向于认为这是由 servlet 在返回第一条消息后结束请求/响应对象引起的,证据显示在堆栈跟踪中:

Exception in thread "Thread-20" java.lang.NullPointerException
at org.apache.coyote.http11.InternalOutputBuffer.realWriteBytes(InternalOutputBuffer.java:741)
at org.apache.tomcat.util.buf.ByteChunk.flushBuffer(ByteChunk.java:434)
at org.apache.coyote.http11.InternalOutputBuffer.flush(InternalOutputBuffer.java:299)
at org.apache.coyote.http11.Http11Processor.action(Http11Processor.java:981)
at org.apache.coyote.Response.action(Response.java:183)
at org.apache.catalina.connector.OutputBuffer.doFlush(OutputBuffer.java:314)
at org.apache.catalina.connector.OutputBuffer.flush(OutputBuffer.java:288)
at org.apache.catalina.connector.CoyoteOutputStream.flush(CoyoteOutputStream.java:98)
at org.glassfish.jersey.message.internal.CommittingOutputStream.flush(CommittingOutputStream.java:292)
at org.glassfish.jersey.server.ChunkedOutput$1.call(ChunkedOutput.java:241)
at org.glassfish.jersey.server.ChunkedOutput$1.call(ChunkedOutput.java:192)
at org.glassfish.jersey.internal.Errors.process(Errors.java:315)
at org.glassfish.jersey.internal.Errors.process(Errors.java:242)
at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:345)
at org.glassfish.jersey.server.ChunkedOutput.flushQueue(ChunkedOutput.java:192)
at org.glassfish.jersey.server.ChunkedOutput.write(ChunkedOutput.java:182)
at com.bpc.services.service.ObserverThread.run(MarketObserverThread.java:32)
at java.lang.Thread.run(Thread.java:745)
<<<<<<<SSE CLIENT HAS BEEN DEREGISTERED!

我也尝试测试 sse 广播公司。在这种情况下,我没有看到任何异常抛出,但是一旦收到第一条消息,连接就会关闭,这让我相信这是 servlet 中的某些东西强制关闭连接。谁能告诉我如何在服务器端调试它?

最佳答案

我有一个类似的问题,这似乎是 Jersey 为 ExecutorService 实例注入(inject)的 @Context 中的一个长期存在的错误。在他们当前的 Sse(版本 2.27)实现中,

class JerseySse implements Sse {

    @Context
    private ExecutorService executorService;

    @Override
    public OutboundSseEvent.Builder newEventBuilder() {
        return new OutboundEvent.Builder();
    }

    @Override
    public SseBroadcaster newBroadcaster() {
        return new JerseySseBroadcaster(executorService);
    }
}

executorService 字段从未初始化,因此 JerseySseBroadcaster 在我的例子中引发了一个 NullPointerException。我通过显式触发注入(inject)解决了这个错误。

如果您将 HK2 用于 CDI(Jersey 的默认设置),则上述问题的粗略草图可能类似于以下内容:

@Singleton
@Path("...")
public class JmsPublisher {

    private Sse sse;

    private SseBroadcaster broadcaster;

    private final ExecutorService executor;

    private final BlockingQueue<String> jmsMessageQueue;

    ...

    @Context
    public void setSse(Sse sse, ServiceLocator locator) {
        locator.inject(sse); // Inject sse.executorService
        this.sse = sse;
        this.broadcaster = sse.newBroadcaster();
    }

    ...

    @GET
    @Path("/stream_data")
    @Produces(MediaType.SERVER_SENT_EVENTS)
    public void register(SseEventSink eventSink) {
        broadcaster.register(eventSink);
    }

    ...

    @PostConstruct
    private void postConstruct() {
        executor.submit(() -> {
            try {
                while(true) {
                    String message = jmsMessageQueue.take();
                    broadcaster.broadcast(sse.newEventBuilder()
                        .mediaType(MediaType.APPLICATION_JSON_TYPE)
                        .data(String.class, message)
                        .build());
                }
            } catch(InterruptedException e) {
                Thread.currentThread().interrupt();
            }
       });
    }

    @PreDestroy
    private void preDestroy() {
        executor.shutdownNow();
    }

}

关于java - Jersey SSE - 事件 Output.write 在发送第一条消息后抛出空指针,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30999164/

相关文章:

http - 在 Google App Engine 上对封闭主体的读取无效

jakarta-ee - 对于 web.xml 中的 servlet 定义,<enabled>false</enabled> 的含义是什么?

java - 尝试获取 ftl 模板时未找到文件异常

java - 运算符(operator)困惑? !在声明的开头

jquery - 如何通过 AJAX 到 HTTP 请求 JSON 来预填充 Web 表单?

php - Internet Explorer 无法设置同名的 cookie

Apache Tomcat URI 编码 - UTF-8

linux - 外部无法访问tomcat

java - 当传递给带有可变参数的第二个泛型方法时,我在泛型方法中丢失了变量的类型

java - 流,检查两个对象列表是否具有相同的另一个对象的嵌套列表