我已经使用 Jersey 实现了一个 Restful Web 界面,用于通过 HTTP 将从内部 JMS 发布者接收的消息发送到外部客户端。我已经成功地向 Java 客户端发送了一条测试消息,但是线程在完成 write() 执行之前抛出了一个空指针异常,关闭了连接并阻止了进一步的通信。
这是我的资源类:
@GET
@Path("/stream_data")
@Produces(SseFeature.SERVER_SENT_EVENTS)
public EventOutput getServerSentEvents(@Context ServletContext context){
final EventOutput eventOutput = new EventOutput();
new Thread( new ObserverThread(eventOutput, (MService) context.getAttribute("instance")) ).start();
return eventOutput;
}
这是我线程的运行方法:
public class ObserverThread implements Observer, Runnable {
//constructor sets eventOutput & mService objects
//mService notifyObservers() called when JMS message received
//text added to Thread's message queue to await sending to client
public void run() {
try {
String message = "{'symbol':'test','entryType'='0','price'='test'}";
Thread.sleep(1000);
OutboundEvent.Builder builder = new OutboundEvent.Builder();
builder.mediaType(MediaType.APPLICATION_JSON_TYPE);
builder.data(String.class, message);
OutboundEvent event = builder.build();
eventOutput.write(event);
System.out.println(">>>>>>SSE CLIENT HAS BEEN REGISTERED!");
mService.addObserver(this);
while(!eventOutput.isClosed()){
if(!updatesQ.isEmpty()){
pushUpdate(updatesQ.dequeue());
}
}
System.out.println("<<<<<<<SSE CLIENT HAS BEEN DEREGISTERED!");
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
这是我的客户端代码:
Client client = ClientBuilder.newBuilder().register(SseFeature.class).build();
WebTarget target = client.target(url);
EventInput eventInput = target.request().get(EventInput.class);
try {
while (!eventInput.isClosed()) {
eventInput.setChunkType(MediaType.WILDCARD_TYPE);
final InboundEvent inboundEvent = eventInput.read();
if (inboundEvent != null) {
String theString = inboundEvent.readData();
System.out.println(theString + "\n");
}
}
} catch (Exception e) {
e.printStackTrace();
}
我将“{'symbol':'test','entryType'='0','price'='test'}”测试消息打印到客户端控制台,但服务器随后在之前打印了 NullPointerException它可以打印“>>>>SSE Client registered”消息。这将关闭连接,以便客户端退出 while 循环并停止监听更新。
我将项目转换为 webapp 3.0 版本 facet,以便将支持异步的标记添加到 web.xml,但我收到相同的空指针错误。我倾向于认为这是由 servlet 在返回第一条消息后结束请求/响应对象引起的,证据显示在堆栈跟踪中:
Exception in thread "Thread-20" java.lang.NullPointerException
at org.apache.coyote.http11.InternalOutputBuffer.realWriteBytes(InternalOutputBuffer.java:741)
at org.apache.tomcat.util.buf.ByteChunk.flushBuffer(ByteChunk.java:434)
at org.apache.coyote.http11.InternalOutputBuffer.flush(InternalOutputBuffer.java:299)
at org.apache.coyote.http11.Http11Processor.action(Http11Processor.java:981)
at org.apache.coyote.Response.action(Response.java:183)
at org.apache.catalina.connector.OutputBuffer.doFlush(OutputBuffer.java:314)
at org.apache.catalina.connector.OutputBuffer.flush(OutputBuffer.java:288)
at org.apache.catalina.connector.CoyoteOutputStream.flush(CoyoteOutputStream.java:98)
at org.glassfish.jersey.message.internal.CommittingOutputStream.flush(CommittingOutputStream.java:292)
at org.glassfish.jersey.server.ChunkedOutput$1.call(ChunkedOutput.java:241)
at org.glassfish.jersey.server.ChunkedOutput$1.call(ChunkedOutput.java:192)
at org.glassfish.jersey.internal.Errors.process(Errors.java:315)
at org.glassfish.jersey.internal.Errors.process(Errors.java:242)
at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:345)
at org.glassfish.jersey.server.ChunkedOutput.flushQueue(ChunkedOutput.java:192)
at org.glassfish.jersey.server.ChunkedOutput.write(ChunkedOutput.java:182)
at com.bpc.services.service.ObserverThread.run(MarketObserverThread.java:32)
at java.lang.Thread.run(Thread.java:745)
<<<<<<<SSE CLIENT HAS BEEN DEREGISTERED!
我也尝试测试 sse 广播公司。在这种情况下,我没有看到任何异常抛出,但是一旦收到第一条消息,连接就会关闭,这让我相信这是 servlet 中的某些东西强制关闭连接。谁能告诉我如何在服务器端调试它?
最佳答案
我有一个类似的问题,这似乎是 Jersey 为 ExecutorService
实例注入(inject)的 @Context
中的一个长期存在的错误。在他们当前的 Sse
(版本 2.27)实现中,
class JerseySse implements Sse {
@Context
private ExecutorService executorService;
@Override
public OutboundSseEvent.Builder newEventBuilder() {
return new OutboundEvent.Builder();
}
@Override
public SseBroadcaster newBroadcaster() {
return new JerseySseBroadcaster(executorService);
}
}
executorService
字段从未初始化,因此 JerseySseBroadcaster
在我的例子中引发了一个 NullPointerException
。我通过显式触发注入(inject)解决了这个错误。
如果您将 HK2 用于 CDI(Jersey 的默认设置),则上述问题的粗略草图可能类似于以下内容:
@Singleton
@Path("...")
public class JmsPublisher {
private Sse sse;
private SseBroadcaster broadcaster;
private final ExecutorService executor;
private final BlockingQueue<String> jmsMessageQueue;
...
@Context
public void setSse(Sse sse, ServiceLocator locator) {
locator.inject(sse); // Inject sse.executorService
this.sse = sse;
this.broadcaster = sse.newBroadcaster();
}
...
@GET
@Path("/stream_data")
@Produces(MediaType.SERVER_SENT_EVENTS)
public void register(SseEventSink eventSink) {
broadcaster.register(eventSink);
}
...
@PostConstruct
private void postConstruct() {
executor.submit(() -> {
try {
while(true) {
String message = jmsMessageQueue.take();
broadcaster.broadcast(sse.newEventBuilder()
.mediaType(MediaType.APPLICATION_JSON_TYPE)
.data(String.class, message)
.build());
}
} catch(InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
@PreDestroy
private void preDestroy() {
executor.shutdownNow();
}
}
关于java - Jersey SSE - 事件 Output.write 在发送第一条消息后抛出空指针,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30999164/