java - Spring sseEmitter,调用方法发送后不会立即发送事件

标签 java spring spring-mvc server-sent-events

我正在尝试使用 Spring 4(tomcat 7、servlet-api 3.0.1)制作服务器发送的事件。

问题是我的 Events 没有在方法 send 被调用后立即发送。它们仅在 SseEmitter 超时后同时(具有相同的时间戳)到达客户端,并带有 EventSource 的错误事件。然后客户端正在尝试重新连接。知道发生了什么吗?

我创建了一个简单的服务:

@RequestMapping(value = "subscribe", method = RequestMethod.GET)
public SseEmitter subscribe () throws IOException {
    final SseEmitter emitter = new SseEmitter();
    Executors.newScheduledThreadPool(1).scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                emitter.send(SseEmitter.event().data("Thread writing: " + Thread.currentThread()).name("ping"));
            } catch (Exception e) {
            }
        }
    } , 1000, 1000, TimeUnit.MILLISECONDS);
    return emitter;
}

使用客户端代码:

sse = new EventSource(urlBuilder(base, url));
sse.addEventListener('ping', function (event) {
    dfd.notify(event);
});

sse.addEventListener('message', function(event){
    dfd.notify(event);
});

sse.addEventListener('close', function(event){
    dfd.notify(event);
});

sse.onerror = function (error) {
    console.log(error);
};

sse.onmessage = function (event){
    dfd.notify(event);
};

应用初始化代码

public class WebAppInitializer implements WebApplicationInitializer {
    @Override
    public void onStartup(ServletContext servletContext) throws ServletException {
        AnnotationConfigWebApplicationContext ctx = new AnnotationConfigWebApplicationContext();
        ctx.register(AppConfig.class);
        ctx.setServletContext(servletContext);
        ctx.refresh();

        ServletRegistration.Dynamic dynamic = servletContext.addServlet("dispatcher", new DispatcherServlet(ctx));
        dynamic.setAsyncSupported(true);
        dynamic.addMapping("/api/*");
        dynamic.setLoadOnStartup(1);
        dynamic.setMultipartConfig(ctx.getBean(MultipartConfigElement.class));

        javax.servlet.FilterRegistration.Dynamic filter = servletContext
                .addFilter("StatelessAuthenticationFilter",
                        ctx.getBean("statelessAuthenticationFilter", StatelessAuthenticationFilter.class));
        filter.setAsyncSupported(true);
        filter.addMappingForUrlPatterns(null, false, "/api/*");

        filter = servletContext.addFilter("HibernateSessionRequestFilter",
                ctx.getBean("hibernateSessionRequestFilter", HibernateSessionRequestFilter.class));
        filter.setAsyncSupported(true);
        filter.addMappingForUrlPatterns(null, false, "/api/user/*");
    }
}

AppConfig.java

@Configuration
@ComponentScan("ru.esoft.workflow")
@EnableWebMvc
@PropertySource({"classpath:mail.properties", "classpath:fatclient.properties"})
@EnableAsync
@EnableScheduling
public class AppConfig extends WebMvcConfigurerAdapter {
...
}

我的客户端日志图片: enter image description here

最佳答案

我在测试 SSEEmitter 时遇到了这个问题。从我在网上阅读的所有内容来看,SSEEmitter 旨在与 Reactive Streams 的某些实现结合使用,例如 RxJava .这有点复杂,但绝对有效。这个想法是您创建发射器和一个 Observable,并将后者订阅到 Publisher。 Publisher 在一个单独的线程中执行它的行为,当输出准备好时通知 Observable,并且 Observable 触发 emitter.send。这是一个示例代码段,它应该可以满足您的要求:

@RequestMapping("/whatever")
public SseEmitter index(    
    SseEmitter emitter = new SseEmitter();
    Publisher<String> responsePublisher = someResponseGenerator.getPublisher();
    Observable<String> responseObservable = RxReactiveStreams.toObservable(responsePublisher);

    responseObservable.subscribe(
        str -> {
            try {
                emitter.send(str);
            } catch (IOException ex) {
                emitter.completeWithError(ex);
            }
        },
        error -> {
            emitter.completeWithError(error);
        },
        emitter::complete
        );

        return emitter;
};

这是相应的发布者:

public class SomeResponseGenerator {    
    public Publisher<String> getPublisher() {
        Publisher<String> pub = new Publisher<String>() {
            @Override
            public void subscribe(Subscriber subscriber) {
                Executors.newScheduledThreadPool(1).scheduleAtFixedRate(new Runnable() {
                    @Override
                    public void run() {
                        subscriber.onNext("Thread writing: " + Thread.currentThread().getName());
                    }
                }, 1000, 1000, TimeUnit.MILLISECONDS);
            }
        };

        return pub;
    }
}

网上有几个这个模型的例子herehere ,您可以通过谷歌搜索“RxJava SseEmitter”找到更多信息。理解 Reactive Streams/RxJava/SseEmitter 交互需要一些时间,但一旦你完成了它就非常优雅了。希望这能让您走上正确的道路!

关于java - Spring sseEmitter,调用方法发送后不会立即发送事件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34168903/

相关文章:

java - 读取蓝色棱镜中的java表

java - 分配Java类的二维数组

java - Spring Security 身份验证后出现 HTTP Status 404 错误

spring - 如何在不将图像存储在本地磁盘的情况下使用spring MultipartFile将图像上传到ftp服务器

java - 有什么方法可以用 2 个嵌套循环来加速算法吗?

java - JFrame 无法为多个客户端加载

java - 从 java 杀死 linux 进程在 Runnable JAR 中不起作用

java - 如何使用 spring-integration 重试套接字服务器创建?

java - 使用 Spring Social Framework 开发 Jenkins 插件时出现 NoSuchMethodError

java - 检索数据时未选中 Spring MVC 表单复选框