java - 将 Flux 条目与 previous 和 map 相结合

标签 java project-reactor

问题

使用 Flux 如何访问前一个元素?

背景

我有一个外部事件流,它按顺序提供事件,该流的顺序是调度一个事件,然后立即调度另一个事件。但是,第二个事件的元数据位于第一个事件中。

请注意,事件数量并不总是偶数。

我想做的是将事件组合成事件流以供下游使用。

Flux#zip 看起来很有希望,但这意味着返回外部事件类型的对象。

初始代码

到目前为止我所得到的是。

    BinaryLogClient client = new BinaryLogClient(host, port, username, password);
    Flux<Event> bridge = Flux.create(sink -> {
        EventListener fluxListener = event -> {
            sink.next(event);
        };

        client.registerEventListener(fluxListener);
    });

    bridge.subscribe(DemoApplication::printEvent);
    bridge.map(new EventPairMemorizer());


public class EventPair  {
    private final Event previous;
    private final Event current;

    public EventPair(Event previous, Event current) {
        this.previous = previous;
        this.current = current;
    }

    /**
     * @return `null` if no previous events.
     */
    public Event getPrevious() {
        return previous;
    }

    public Event getCurrent() {
        return current;
    }
}

/**
 * Not thread safe has to go on a single thread
 */
public class EventPairMemorizer implements Function<Event, EventPair> {
    Event previous = null;

    EventPair toPair(Event e) {
        EventPair pair = new EventPair(previous, e);
        previous = e;
        return pair;
    }

    @Override
    public EventPair apply(Event current) {
        return toPair(current);
    }
}

这部分是一个学习练习,部分是一个概念验证。

不相关的细节

我正在尝试使用 mysql-binlog-connector-java 来获取有关数据库中更改内容的流。

因此,如果我收到 EXT_WRITE_ROWS 事件,则前一个事件是 TABLE_MAP 事件。然后我想对 TABLE_MAP 事件进行列查找(使用 jdbc)。然后转换为一些 JSON 友好的内部结构。

这同样适用于 EXT_UPDATE_ROWS 事件。

所以想法代码看起来像

  1. onExternalEvent 推送至 Flux
  2. 检查事件类型。如果使用 Mono 在 jdbc 线程上匹配调用 jdbc
  3. 将 Mono 和当前事件结合起来。
  4. 映射到内部类型。
  5. 发送到不同的流。
  6. 利润

最佳答案

重叠缓冲区怎么样?

使用 buffer(2, 1),您将为每个元素打开一个缓冲区,每个缓冲区将包含 2 个元素。

然后,您可以忽略未以您感兴趣的事件结束的缓冲区,并获取您感兴趣的事件的先前值...

关于java - 将 Flux 条目与 previous 和 map 相结合,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53837314/

相关文章:

java - react 堆: Expand a ParallelFlux

spring-webflux - 在多个 Flux 的末尾进行同步,例如 subscribe() 和 then()

java - Selenium WebDriver - 使用 @FindBy 注释查找复选框的方法是什么?

java - 内部没有抽象方法的抽象类

java - 如何在Java Reactor中阻塞调用后重新抛出错误?

java - 取消使用 WebFlux 创建的 SSE 流的正确方法

java - 查询中的单个添加标量

java - 如何创建字符串枚举类型而不覆盖 Mule Studio 中使用的 toString 方法?

java - OpenCV linux下如何安装FFMPEG

spring-boot - 如果 Mono 为空,则创建一个未找到的 ServerResponse