java - Kafka Streams 表转换

标签 java apache-kafka apache-kafka-streams apache-kafka-connect

我在 SQL Server 中有一个表,我想将其流式传输到 Kafka 主题,结构如下:

(UserID, ReportID)

该表将不断更改(添加记录、插入记录、无更新)

我想将其转换为这种结构并放入 Elasticsearch 中:

{
  "UserID": 1,
  "Reports": [1, 2, 3, 4, 5, 6]
}

到目前为止,我看到的示例是日志或点击流,但在我的情况下不起作用。

这种用例是否可能?我总是可以只看 UserID更改并查询数据库,但这似乎很幼稚,而且不是最好的方法。

更新

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;

import java.util.ArrayList;
import java.util.Properties;

public class MyDemo {
  public static void main(String... args) {
    System.out.println("Hello KTable!");

    final Serde<Long> longSerde = Serdes.Long();

    KStreamBuilder builder = new KStreamBuilder();

    KStream<Long, Long> reportPermission = builder.stream(TOPIC);

    KTable<Long, ArrayList<Long>> result = reportPermission
        .groupByKey()
        .aggregate(
            new Initializer<ArrayList<Long>>() {
              @Override
              public ArrayList<Long> apply() {
                return null;
              }
            },
            new Aggregator<Long, Long, ArrayList<Long>>() {
              @Override
              public ArrayList<Long> apply(Long key, Long value, ArrayList<Long> aggregate) {
                aggregate.add(value);
                return aggregate;
              }
            },
            new Serde<ArrayList<Long>>() {
              @Override
              public void configure(Map<String, ?> configs, boolean isKey) {}

              @Override
              public void close() {}

              @Override
              public Serializer<ArrayList<Long>> serializer() {
                return null;
              }

              @Override
              public Deserializer<ArrayList<Long>> deserializer() {
                return null;
              }
            });

    result.to("report-aggregated-topic");

    KafkaStreams streams = new KafkaStreams(builder, createStreamProperties());
    streams.cleanUp();
    streams.start();

    Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
  }

  private static final String TOPIC = "report-permission";

  private static final Properties createStreamProperties() {
    Properties props = new Properties();

    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "report-permission-app");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");

    return props;
  }
}

我实际上陷入了聚合阶段,因为我无法为 ArrayList<Long> 编写正确的 SerDe (还没有足够的技能),lambda 似乎不适用于聚合器 - 它不知道 agg 的类型是什么。 :

KTable<Long, ArrayList<Long>> sample = builder.stream(TOPIC)
    .groupByKey()
    .aggregate(
        () -> new ArrayList<Long>(),
        (key, val, agg) -> agg.add(val),
        longSerde
    );

最佳答案

您可以使用 Kafka 的 Connect API 将数据从 SQL Server 获取到 Kafka。我不知道 SQL Server 的任何特定连接器,但您可以使用任何基于通用 JDBC 的连接器:https://www.confluent.io/product/connectors/

要处理数据,您可以使用 Kafka 的 Streams API。您只需aggregate()每个用户的所有报告即可。像这样的事情:

KTable<UserId, List<Reports>> result =
    builder.stream("topic-name")
           .groupByKey()
           // init a new empty list and
           // `add` the items to the list in the actual aggregation
           .aggregate(...);

result.to("result-topic");

查看文档以了解有关 Streams API 的更多详细信息:https://docs.confluent.io/current/streams/index.html

Note, that you need to make sure that the list of reports does not grow unbounded. Kafka has some (configurable) maximum message size and the whole list will be contained in a single message. Thus, you should estimate the maximum message size and apply the corresponding configuration (-> max.message.bytes) before going into production. Check out configs at the webpage: http://kafka.apache.org/documentation/#brokerconfigs

最后,您使用 Connect API 将数据推送到 Elastic Search。有多种不同的连接器可供使用(我当然会推荐 Confluence 的连接器)。有关 Connect API 的更多详细信息:https://docs.confluent.io/current/connect/userguide.html

关于java - Kafka Streams 表转换,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46303973/

相关文章:

apache-kafka-streams - KStream 在 kafka 2.2 中自动创建主题吗?

java - 从在 32 位 jre 上运行的桌面应用程序内部使用 64 位 jre 调用第 3 方 API?是否可以?

java - 如何在 Java 中链接函数调用?

java - 使用 java/groovy 将文件中一行的分号替换为哈希

java - 为库包声明类时出现 IllegalAccessError

apache-kafka - 有没有办法对 Kafka 流中的输入主题进行重新分区?

java - 如何使用 Maven 包含远程 java 源?

hadoop - Apache Phoenix 4.7 csvBulkLoad.run()引发ClassNotFoundException

java - Spring Kafka、Spring Cloud Stream 和 Avro 兼容性 Unknown magic byte

java - 获取NoSuchFileException : while starting Kafka instance