java - Kafka Streams 表转换

标签 java apache-kafka apache-kafka-streams apache-kafka-connect

我在 SQL Server 中有一个表,我想将其流式传输到 Kafka 主题,结构如下:

(UserID, ReportID)


我想将其转换为这种结构并放入 Elasticsearch 中:

  "UserID": 1,
  "Reports": [1, 2, 3, 4, 5, 6]


这种用例是否可能?我总是可以只看 UserID更改并查询数据库,但这似乎很幼稚,而且不是最好的方法。


import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;

import java.util.ArrayList;
import java.util.Properties;

public class MyDemo {
  public static void main(String... args) {
    System.out.println("Hello KTable!");

    final Serde<Long> longSerde = Serdes.Long();

    KStreamBuilder builder = new KStreamBuilder();

    KStream<Long, Long> reportPermission =;

    KTable<Long, ArrayList<Long>> result = reportPermission
            new Initializer<ArrayList<Long>>() {
              public ArrayList<Long> apply() {
                return null;
            new Aggregator<Long, Long, ArrayList<Long>>() {
              public ArrayList<Long> apply(Long key, Long value, ArrayList<Long> aggregate) {
                return aggregate;
            new Serde<ArrayList<Long>>() {
              public void configure(Map<String, ?> configs, boolean isKey) {}

              public void close() {}

              public Serializer<ArrayList<Long>> serializer() {
                return null;

              public Deserializer<ArrayList<Long>> deserializer() {
                return null;

    KafkaStreams streams = new KafkaStreams(builder, createStreamProperties());

    Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

  private static final String TOPIC = "report-permission";

  private static final Properties createStreamProperties() {
    Properties props = new Properties();

    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "report-permission-app");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");

    return props;

我实际上陷入了聚合阶段,因为我无法为 ArrayList<Long> 编写正确的 SerDe (还没有足够的技能),lambda 似乎不适用于聚合器 - 它不知道 agg 的类型是什么。 :

KTable<Long, ArrayList<Long>> sample =
        () -> new ArrayList<Long>(),
        (key, val, agg) -> agg.add(val),


您可以使用 Kafka 的 Connect API 将数据从 SQL Server 获取到 Kafka。我不知道 SQL Server 的任何特定连接器,但您可以使用任何基于通用 JDBC 的连接器:

要处理数据,您可以使用 Kafka 的 Streams API。您只需aggregate()每个用户的所有报告即可。像这样的事情:

KTable<UserId, List<Reports>> result ="topic-name")
           // init a new empty list and
           // `add` the items to the list in the actual aggregation

查看文档以了解有关 Streams API 的更多详细信息:

Note, that you need to make sure that the list of reports does not grow unbounded. Kafka has some (configurable) maximum message size and the whole list will be contained in a single message. Thus, you should estimate the maximum message size and apply the corresponding configuration (-> max.message.bytes) before going into production. Check out configs at the webpage:

最后,您使用 Connect API 将数据推送到 Elastic Search。有多种不同的连接器可供使用(我当然会推荐 Confluence 的连接器)。有关 Connect API 的更多详细信息:

关于java - Kafka Streams 表转换,我们在Stack Overflow上找到一个类似的问题:


apache-kafka-streams - KStream 在 kafka 2.2 中自动创建主题吗?

java - 从在 32 位 jre 上运行的桌面应用程序内部使用 64 位 jre 调用第 3 方 API?是否可以?

java - 如何在 Java 中链接函数调用?

java - 使用 java/groovy 将文件中一行的分号替换为哈希

java - 为库包声明类时出现 IllegalAccessError

apache-kafka - 有没有办法对 Kafka 流中的输入主题进行重新分区?

java - 如何使用 Maven 包含远程 java 源?

hadoop - Apache Phoenix 4.7引发ClassNotFoundException

java - Spring Kafka、Spring Cloud Stream 和 Avro 兼容性 Unknown magic byte

java - 获取NoSuchFileException : while starting Kafka instance