java - KStream 到 KTable 左连接返回 Null

标签 java apache-kafka apache-kafka-streams kafka-join

我目前正在尝试使用 KStream 到 KTable 联接来丰富 Kafka 主题。为了我的概念证明,我目前有一个包含大约 600,000 条记录的 Kafka Stream,这些记录都具有相同的键,以及一个从具有 1 个键值对记录的主题创建的 KTable,其中 KTable 主题中的键与 600,000 条记录的键匹配创建 KStream 的主题中的记录。

当我使用左连接(通过下面的代码)时,所有记录在 ValueJoiner 上返回 NULL。

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe-json-parse-" + System.currentTimeMillis());
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "xxx.xx.xx.xxx:9092");        
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());   
props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.streams.processor.WallclockTimestampExtractor");
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 5);


final StreamsBuilder builder = new StreamsBuilder();
// Build a Kafka Stream from the Netcool Input Topic
KStream<String, String> source = builder.stream("output-100k");


// Join the KStream to the KTable
KStream<String, String> enriched_output = source
    .leftJoin(netcool_enrichment, (orig_msg, description) -> {
        String new_msg = jsonEnricher(orig_msg, description);
        if (description != null) {
            System.out.println("\n[DEBUG] Enriched Input Orig: " + orig_msg);
            System.out.println("[DEBUG] Enriched Input Desc: " + description);                
            System.out.println("[DEBUG] Enriched Output: " + new_msg);
        }
        return new_msg;
        });

下面是来自源 KStream 的示例输出记录(使用 forEach 循环):

[KSTREAM] Key: ismlogs
[KSTREAM] Value: {"severity":"debug","ingested_timestamp":"2018-07-18T19:32:47.227Z","@timestamp":"2018-06-28T23:36:31.000Z","offset":482,"@metadata":{"beat":"filebeat","topic":"input-100k","type":"doc","version":"6.2.2"},"beat":{"hostname":"abc.dec.com","name":"abc.dec.com","version":"6.2.2"},"source":"/root/100k-raw.txt","message":"Thu Jun 28 23:36:31 2018 Debug: Checking status of file /ism/profiles/active/test.xml","key":"ismlogs","tags":["ismlogs"]}

我尝试将 KTable 转换回 KStream,并对转换后的 Stream 使用 forEach 循环,并验证记录实际上位于 KTable 中。

KTable<String, String> enrichment = builder.table("enrichment");
KStream<String, String> ktable_debug = enrichment.toStream();
ktable_debug.foreach(new ForeachAction<String, String>() {
    public void apply(String key, String value) {
        System.out.println("[KTABLE] Key: " + key);
        System.out.println("[KTABLE] Value: " + value);
    }
 });

上面的代码输出:

[KTABLE] Key: "ismlogs"
[KTABLE] Value: "ISM Logs"

最佳答案

根据您的控制台消息, key 不同,因此它们不会加入:

[KSTREAM] Key: ismlogs
[KTABLE] Key: "ismlogs"

对于KTable,键实际上是带有双引号的"ismlogs"

关于java - KStream 到 KTable 左连接返回 Null,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51430687/

相关文章:

java - scala 中的 val 与 java 中的 const 有何不同?

apache-kafka - kafka 流调用 poll() 多少次以从 kafka 主题中获取记录

kotlin - 当我使用多个流时,Kafka Streams StreamsException

apache-kafka - Kafka 流 - 加入两个 ktables 调用连接函数两次

apache-kafka - kafka streams - 加入分区主题

java - 在 Android/Java 中循环 JSONArray 时出现类型错误

java - 如何检查 IP 地址是否来自 Java 中的特定网络/网络掩码?

java - 对 Queap 数据结构的插入操作

java - 找不到 org.apache.flink.api.common.serialization.DeserializationSchema 的类文件

java - 如何在 Kafka 中使用多个消费者?