java - 如何将外部源的上下文添加到 Kafka Streams 中的记录的正确方法

标签 java apache-kafka apache-kafka-streams

我有使用 Kafka Streams(使用处理器 API)处理的记录。假设记录有 city_id以及其他一些领域。

在 Kafka Streams 应用程序中,我想将目标城市的当前温度添加到记录中。
Temperature<->City对存储在例如。 Postgres.

在 Java 应用程序中,我能够使用 JDBC 连接到 Postgres 并构建 new HashMap<CityId, Temperature>所以我可以根据 city_id 查找温度.类似于 tempHM.get(record.city_id) .

有几个问题如何最好地处理它:

在哪里初始化上下文数据?

本来我一直在AbstractProcessor::init()内做的但这似乎是错误的,因为它是为每个线程初始化的,并且还在重新平衡时重新初始化。

所以我在使用它构建流拓扑构建器和处理器之前移动了它。数据仅在所有处理器实例上独立获取一次。

这是正确有效的方法吗?它有效但是...

HashMap<CityId, Temperature> tempHM = new HashMap<CityId, Temperature>;

// Connect to DB and initialize tempHM here

Topology topology = new Topology();

topology
    .addSource(SOURCE, stringDerializer, protoDeserializer, "topic-in")

    .addProcessor(TemperatureAppender.NAME, () -> new TemperatureAppender(tempHm), SOURCE)

    .addSink(SINK, "topic-out", stringSerializer, protoSerializer, TemperatureAppender.NAME)
;

如何刷新上下文数据?

例如,我想每 15 分钟刷新一次温度数据。我正在考虑使用 Hashmap 容器而不是 Hashmap,这样可以处理它:

abstract class ContextContainer<T> {

    T context;
    Date lastRefreshAt;

    ContextContainer(Date now) {
        refresh(now);
    }

    abstract void refresh(Date now);

    abstract Duration getRefreshInterval();

    T get() {
        return context;
    }

    boolean isDueToRefresh(Date now) {
        return lastRefreshAt == null
            || lastRefreshAt.getTime() + getRefreshInterval().toMillis() < now.getTime();
    }
}

final class CityTemperatureContextContainer extends ContextContainer<HashMap> {

    CityTemperatureContextContainer(Date now) {
        super(now);
    }

    void refresh(Date now) {
        if (!isDueToRefresh(now)) {
            return;
        }

        HashMap context = new HashMap();
        // Connect to DB and get data and fill hashmap

        lastRefreshAt = now;
        this.context = context;
    }

    Duration getRefreshInterval() {
        return Duration.ofMinutes(15);
    }
}

这是一个用 SO textarea 编写的简短概念,可能包含一些语法错误,但我希望重点很清楚

然后将其传递给处理器,如 .addProcessor(TemperatureAppender.NAME, () -> new TemperatureAppender(cityTemperatureContextContainer), SOURCE)

在处理器中做

    public void init(final ProcessorContext context) {
        context.schedule(
            Duration.ofMinutes(1),
            PunctuationType.STREAM_TIME,
            (timestamp) -> { 
                cityTemperatureContextContainer.refresh(new Date(timestamp));
                tempHm = cityTemperatureContextContainer.get();
            }    
        );

        super.init(context);
    }

有没有更好的方法?主要问题是找到合适的概念,然后我就能实现它。不过,关于该主题的资源并不多。

最佳答案

In Kafka Streams app I want to add current temperature in the target city to the record. Temperature<->City pairs are stored in eg. Postgres.

In Java application I'm able to connect to Postgres using JDBC and build new HashMap<CityId, Temperature> so I'm able to lookup temperature based on city_id. Something like tempHM.get(record.city_id).

更好的选择是使用 Kafka Connect 将您的数据从 Postgres 提取到 Kafka 主题中,将此主题读入 KTable在您的 Kafka Streams 应用程序中,然后加入此 KTable与您的其他流(记录流“带有 city_id 和一些其他字段”)。也就是说,您将执行 KStream -到- KTable加入。

思考:

### Architecture view

DB (here: Postgres) --Kafka Connect--> Kafka --> Kafka Streams Application


### Data view

Postgres Table ----------------------> Topic --> KTable

您的用例的示例连接器是 https://www.confluent.io/hub/confluentinc/kafka-connect-jdbchttps://www.confluent.io/hub/debezium/debezium-connector-postgresql .

上述基于 Kafka Connect 的设置的一个优点是您不再需要直接从您的 Java 应用程序(使用 Kafka Streams)与您的 Postgres 数据库对话。

另一个优点是您不需要将上下文数据(您提到的每 15 分钟一次)从数据库“批量刷新”到 Java 应用程序中,因为应用程序会实时获取最新的数据库更改自动通过 DB->KConnect->Kafka->KStreams-app 流程。

关于java - 如何将外部源的上下文添加到 Kafka Streams 中的记录的正确方法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56038703/

相关文章:

java - Jboss 5、类加载器和多个类实例

java - m2e 生命周期映射未执行

Java 等待/通知不工作

apache-kafka - 消息代理中的分区如何解决排序问题?

apache-kafka - kafka 在重启时丢失所有主题

apache-kafka-streams - 卡夫卡流 : Handle Aging of events in a stream on window expiry

java - 使用kafka lib反序列化PRIMITIVE AVRO KEY

apache-kafka - Spring Embedded Kafka + Mock Schema Registry : State Store ChangeLog Schema not registered

java - String ReplaceAll 方法不起作用

apache-kafka - 带有状态存储的流媒体应用程序最多需要 1 小时才能重新启动