java - 如何根据发出的事件有条件地缓冲分组的 Observable/Flux?

标签 java rx-java reactive-programming rx-java2 project-reactor

我正在尝试根据以下信息编写 react 流:

我们有一个实体事件流,其中每个事件都包含其实体的 ID 和一个 INTENT 或 COMMIT 类型。假定具有给定 ID 的 COMMIT 之前总是有一个或多个具有相同 ID 的 INTENT。收到 INTENT 时,应按其 ID 对其进行分组,并应打开该组的“缓冲区”。当接收到同一组的 COMMIT 或配置的超时已过时,缓冲区应“关闭”。应发出生成的缓冲区。

请注意,在收到关闭 COMMIT 之前可能会收到多个 INTENT。 (编辑:) bufferDuration 应该保证在 bufferDuration 时间过去后发出任何“打开的”缓冲区,因为收到打开缓冲区的 INTENT,有或没有 COMMIT .

我最近的尝试如下:

public EntityEventBufferFactory {
    private final Duration bufferDuration;

    public EntityEventBufferFactory(Duration bufferDuration) {
        this.bufferDuration = bufferDuration;
    }

    public Flux<List<EntityEvent>> createGroupBufferFlux(Flux<EntityEvent> eventFlux) {
        return eventFlux.groupBy(EntityEvent::getId)
            .map(groupedFlux -> createGroupBuffer(groupedFlux))
            .flatMap(Function.identity());
    }

    protected Flux<List<EntityEvent>> createGroupBuffer(Flux<EntityEvent> groupFlux) {
        return groupFlux.publish().buffer(groupFlux.filter(this::shouldOpenBufferOnEvent), createGroupBufferCloseSelector(groupFlux));
    }

    protected Function<EntityEvent, Publisher<EntityEvent>> createGroupBufferCloseSelector(Flux<EntityEvent> groupFlux) {
        return event -> Flux.firstEmitting(Flux.just(event).delay(bufferDuration), groupFlux.filter(this::shouldCloseBufferOnEvent).publish());
    }

    protected boolean shouldOpenBufferOnEvent(EntityEvent entityEvent) {
        return entityEvent.getEventType() == EventType.INTENT;
    }

    protected boolean shouldCloseBufferOnEvent(EntityEvent entityEvent) {
        return entityEvent.getEventType() == EventType.COMMIT;
    }
}

这是我试图通过的测试:

@Test
public void entityEventsCanBeBuffered() throws Exception {
    FluxProcessor<EntityEvent, EntityEvent> eventQueue = UnicastProcessor.create();

    Duration bufferDuration = Duration.ofMillis(250);

    Flux<List<EntityEvent>> bufferFlux = new EntityEventBufferFactory(bufferDuration).createGroupBufferFlux(eventQueue);
    bufferFactory.setBufferDuration(bufferDuration);

    List<List<EntityEvent>> buffers = new ArrayList<>();
    bufferFlux.subscribe(buffers::add);

    EntityEvent intent = new EntityEvent();
    intent.setId("SOME_ID");
    intent.setEventType(EventType.INTENT);

    EntityEvent commit = new EntityEvent();
    commit.setId(intent.getId());
    commit.setEventType(EventType.COMMIT);

    eventQueue.onNext(intent);
    eventQueue.onNext(commit);

    eventQueue.onNext(intent);
    eventQueue.onNext(commit);

    Thread.sleep(500);

    assertEquals(2, buffers.size());
    assertFalse(buffers.get(0).isEmpty());
    assertFalse(buffers.get(1).isEmpty());
}

通过这个测试,我得到了两个发射缓冲区,但它们都是空的。您会注意到,在深入研究之后,我不得不在某些点添加 .publish(),以免 Reactor 出现异常,提示 This processor allows only a single Subscriber。这个问题的答案,RxJava: "java.lang.IllegalStateException: Only one subscriber allowed!" ,是促使我采用这种方法的原因。

我目前正在使用 Reactor,但我认为这与使用 Observable 和同名方法的 RxJava 一对一转换。

有什么想法吗?

最佳答案

我认为这是 Rx groupBy 的最终用例。来自文档:

Groups the items emitted by a Publisher according to a specified criterion, and emits these grouped items as GroupedFlowables. The emitted GroupedPublisher allows only a single Subscriber during its lifetime and if this Subscriber cancels before the source terminates, the next emission by the source having the same key will trigger a new GroupedPublisher emission.

在你的例子中,这个标准是 ID,并且在每个 GroupedPublisher 上发出你 takeUntil 类型是 COMMIT:

source
.groupBy(EntityEvent::getId)
.flatMap(group -> 
    group
    .takeUntil(Flowable.timer(10,TimeUnit.SECONDS))
    .takeUntil(this::shouldCloseBufferOnEvent)
    .toList())

编辑:添加时间条件。

关于java - 如何根据发出的事件有条件地缓冲分组的 Observable/Flux?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43928112/

相关文章:

java 使用 isAlive() 启动一个线程

java - sendMultipartTextMessage 作为多条 SMS 消息发送?

java - C++/Java - int(0 - 1023) 到字节数组(仅限两个字节)

java - 如何使用 RxJava 将 Listeners 正确转换为 Reactive(Observables)?

java - 按需执行热 Observable

reactive-programming - 有条件的两个 Mono 的组合

ios - ReactiveCocoa - 停止触发 subscribeNext 直到另一个信号完成

java - Android Realm 不正确的线程

java - Java 8 流上的缓冲区运算符

java - Spring react 器: What's the corresponding class to Optional<T>?