java - 从多个 Kafka 主题消费

标签 java apache-kafka apache-kafka-streams

我想编写一个 Kafka 应用程序,它从主题中消费并将某些内容保存在数据库中。主题是由 Debezium Kafka connect 基于 mysql binlog 创建的。所以我每张 table 都有一个主题。 这是我用于从一个主题消费的代码:

KStream<GenericRecord,mysql.company.tiers.Envelope>[] tierStream = builder.stream("mysql.alopeyk.tiers",
                Consumed.with(TierSerde.getGenericKeySerde(), TierSerde.getEnvelopeSerde()));

从架构的角度来看,我应该为每个表创建一个 KStream 并并行运行它们。但表的数量如此之大,拥有那么多的线程可能不是最好的选择。

所有表都有一个名为created_at的列(它是一个laravel应用程序),所以我很好奇是否有一种方法可以为提取这个公共(public)列的值提供通用的Serde。除了表名称之外,这是我唯一对其值感兴趣的列。

最佳答案

这完全取决于生成消息的应用程序(连接器)如何序列化您的值。 如果Deserializer (Serdes)可以从不同类型的消息中提取created_at,这是可能的。

所以,答案是肯定的,但这取决于您的消息值和反序列化器

假设序列化后的所有消息的格式如下:

  • create_at;名称:位置;...
  • create_at;城市,国家;...
  • create_at;产品名称;...

在这种情况下,Deserializer 只需要获取第一个 ; 之前的字符,并将其转换为日期,其余值可以删除。

示例代码:

public class CustomDeserializer implements Deserializer<Date> {

    @Override
    public Date deserialize(String topic, byte[] data) {
        String strDate = new String(data);
        return new Date(Long.parseLong(strDate.substring(0, strDate.indexOf(";"))));
    }
}

关于java - 从多个 Kafka 主题消费,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59015934/

相关文章:

java - Lucene,多词搜索,某一词必须精确匹配

java - 如何从一个 Maven 模块访问另一个 Maven 模块的 JSF 模板、类

azure - Kafka 连接器可以与事件中心代理一起使用吗?

python - 使用 Kafka-python 处理生产者和消费者

apache-kafka - 为什么我必须使用 Kafka Streams 配置状态存储

java - kafka KStream - 进行 n 秒计数的拓扑

java - GWT历史问题

java - 广播 Intent 回调 : result=CANCELLED forIntent

node.js - 使用 Node.js 从 Kafka 到 Elasticsearch 消费

apache-kafka-streams - 带有Http请求的Kafka流