我想编写一个 Kafka 应用程序,它从主题中消费并将某些内容保存在数据库中。主题是由 Debezium Kafka connect 基于 mysql binlog 创建的。所以我每张 table 都有一个主题。 这是我用于从一个主题消费的代码:
KStream<GenericRecord,mysql.company.tiers.Envelope>[] tierStream = builder.stream("mysql.alopeyk.tiers",
Consumed.with(TierSerde.getGenericKeySerde(), TierSerde.getEnvelopeSerde()));
从架构的角度来看,我应该为每个表创建一个 KStream 并并行运行它们。但表的数量如此之大,拥有那么多的线程可能不是最好的选择。
所有表都有一个名为created_at的列(它是一个laravel应用程序),所以我很好奇是否有一种方法可以为提取这个公共(public)列的值提供通用的Serde。除了表名称之外,这是我唯一对其值感兴趣的列。
最佳答案
这完全取决于生成消息的应用程序(连接器)如何序列化您的值。
如果Deserializer
(Serdes
)可以从不同类型的消息中提取created_at
,这是可能的。
所以,答案是肯定的,但这取决于您的消息值和反序列化器
。
假设序列化后的所有消息的格式如下:
- create_at;名称:位置;...
- create_at;城市,国家;...
- create_at;产品名称;...
在这种情况下,Deserializer
只需要获取第一个 ;
之前的字符,并将其转换为日期,其余值可以删除。
示例代码:
public class CustomDeserializer implements Deserializer<Date> {
@Override
public Date deserialize(String topic, byte[] data) {
String strDate = new String(data);
return new Date(Long.parseLong(strDate.substring(0, strDate.indexOf(";"))));
}
}
关于java - 从多个 Kafka 主题消费,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59015934/