我有以下剩余 Controller ,它接收请求,将它们转换为 JSON 字符串并将它们放入并发队列中。
我想从这个队列中创建一个 Flux 并订阅它。
不幸的是,它不起作用。
我在这里做错了什么?
@RestController
public class EventController {
private final ObjectMapper mapper = new ObjectMapper();
private final FirehosePutService firehosePutService;
private ConcurrentLinkedQueue<String> events = new ConcurrentLinkedQueue<>();
private int batchSize = 10;
@Autowired
public EventController(FirehosePutService firehosePutService) {
this.firehosePutService = firehosePutService;
Flux<String> eventFlux = Flux.create((FluxSink<String> sink) -> {
String next;
while (( next = events.poll()) != null) {
sink.next(next);
}
});
eventFlux.publish().autoConnect().subscribe(new BaseSubscriber<String>() {
int consumed;
List<String> batchOfEvents = new ArrayList<>(batchSize);
@Override
protected void hookOnSubscribe(Subscription subscription) {
request(batchSize);
}
@Override
protected void hookOnNext(String value) {
batchOfEvents.add(value);
consumed++;
if (consumed == batchSize) {
batchOfEvents.addAll(events);
log.info("Consume {} elements. Size of batchOfEvents={}", consumed, batchOfEvents.size());
firehosePutService.saveBulk(batchOfEvents);
consumed = 0;
batchOfEvents.clear();
events.clear();
request(batchSize);
}
}
});
}
@GetMapping(value = "/saveMany", produces = "text/html")
public ResponseEntity<Void> saveMany(@RequestParam MultiValueMap<String, String> allRequestParams) throws JsonProcessingException {
Map<String, String> paramValues = allRequestParams.toSingleValueMap();
String reignnEvent = mapper.writeValueAsString(paramValues);
events.add(reignnEvent);
return new ResponseEntity<>(HttpStatus.OK);
}
}
最佳答案
首先,您使用poll
方法。如果队列为空,它不会阻塞并返回null
。您循环收集直到第一个 null
(即 while (next != null)
),因此您的代码几乎立即退出循环,因为队列在开始时为空。您必须替换 poll
和 take
正在阻塞,并将等待直到元素可用。
其次,当事件从队列中删除时,将调用 hookOnNext
。但是,您尝试使用 batchOfEvents.addAll(events);
再次读取事件。此外,您还清除所有待处理事件 events.clear();
我建议您从 hookOnNext
方法中删除对 events
集合的所有直接访问。
为什么在这里使用 Flux?看起来过于复杂。您可以在此处使用普通线程
@Autowired
public EventController(FirehosePutService firehosePutService) {
this.firehosePutService = firehosePutService;
Thread persister = new Thread(() -> {
List<String> batchOfEvents = new ArrayList<>(batchSize);
String next;
while (( next = events.take()) != null) {
batchOfEvents.add(value);
if (batchOfEvents.size() == batchSize) {
log.info("Consume {} elements. Size of batchOfEvents={}", consumed, batchOfEvents.size());
firehosePutService.saveBulk(batchOfEvents);
batchOfEvents.clear();
}
}
});
persister.start();
}
关于java - 无法从队列创建热流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56441039/