java - 无法从队列创建热流

标签 java spring-mvc project-reactor

我有以下剩余 Controller ,它接收请求,将它们转换为 JSON 字符串并将它们放入并发队列中。 我想从这个队列中创建一个 Flux 并订阅它。
不幸的是,它不起作用。
我在这里做错了什么?

@RestController
public class EventController {
    private final ObjectMapper mapper = new ObjectMapper();
    private final FirehosePutService firehosePutService;
    private ConcurrentLinkedQueue<String> events = new ConcurrentLinkedQueue<>();
    private int batchSize = 10;

    @Autowired
    public EventController(FirehosePutService firehosePutService) {
        this.firehosePutService = firehosePutService;

        Flux<String> eventFlux = Flux.create((FluxSink<String> sink) -> {
            String next;
            while (( next = events.poll()) != null) {
                sink.next(next);
            }
        });

        eventFlux.publish().autoConnect().subscribe(new BaseSubscriber<String>() {
            int consumed;
            List<String> batchOfEvents = new ArrayList<>(batchSize);

            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                request(batchSize);
            }

            @Override
            protected void hookOnNext(String value) {
                batchOfEvents.add(value);
                consumed++;

                if (consumed == batchSize) {
                    batchOfEvents.addAll(events);
                    log.info("Consume {} elements. Size of batchOfEvents={}", consumed, batchOfEvents.size());
                    firehosePutService.saveBulk(batchOfEvents);
                    consumed = 0;
                    batchOfEvents.clear();
                    events.clear();

                    request(batchSize);
                }
            }

        });
    }

    @GetMapping(value = "/saveMany", produces = "text/html")
    public ResponseEntity<Void> saveMany(@RequestParam MultiValueMap<String, String> allRequestParams) throws JsonProcessingException {
        Map<String, String> paramValues = allRequestParams.toSingleValueMap();
        String reignnEvent = mapper.writeValueAsString(paramValues);
        events.add(reignnEvent);

        return new ResponseEntity<>(HttpStatus.OK);
    }
}

最佳答案

首先,您使用poll方法。如果队列为空,它不会阻塞并返回null。您循环收集直到第一个 null (即 while (next != null)),因此您的代码几乎立即退出循环,因为队列在开始时为空。您必须替换 polltake 正在阻塞,并将等待直到元素可用。

其次,当事件从队列中删除时,将调用 hookOnNext。但是,您尝试使用 batchOfEvents.addAll(events); 再次读取事件。此外,您还清除所有待处理事件 events.clear();

我建议您从 hookOnNext 方法中删除对 events 集合的所有直接访问。

<小时/>

为什么在这里使用 Flux?看起来过于复杂。您可以在此处使用普通线程

@Autowired
public EventController(FirehosePutService firehosePutService) {
    this.firehosePutService = firehosePutService;

    Thread persister = new Thread(() -> {
        List<String> batchOfEvents = new ArrayList<>(batchSize);

        String next;
        while (( next = events.take()) != null) {
            batchOfEvents.add(value);

            if (batchOfEvents.size() == batchSize) {
                log.info("Consume {} elements. Size of batchOfEvents={}", consumed, batchOfEvents.size());
                firehosePutService.saveBulk(batchOfEvents);
                batchOfEvents.clear();
            }
        }
    });
    persister.start();

}

关于java - 无法从队列创建热流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56441039/

相关文章:

java - jdbcTemplate 为 null 并抛出空指针异常

Spring MVC 与 Spring Integration HTTP : how to pass the queryString avoiding the "?" encoding with "%3F" ?

java - Mono<Mono<Object>> 如何订阅

java - 如何在 Java Reactor 中设置完全背压驱动的通量?

java - ViewModelProviders#of(Fragment) 类型错误

java - 如何在一个for循环中计算具有多个数组的数组内的值的数量?

java - 在 Java 中处理 String 数组元素

java - 在 Java 中将对象与字符串相互转换的公认做法?

java - 如何在 Heroku 上使用 Mysql DB 部署 Spring MVC

java - "text/event-stream"和 "application/stream+json"有什么区别