java - Kafka 流消费为 CSV

标签 java apache-kafka kafka-consumer-api apache-kafka-connect

我在java中使用Kafka,我将JSON消息作为字符串使用,速率为每分钟100万条消息,我需要将字符串拆分为仅取一些值并将其保存到CSV以将其加载到数据库,我怎样才能制作这样的东西?

最佳答案

您可以使用 Kafka Connect JDBC sink 将数据从 Kafka 主题直接流式传输到数据库。要了解有关 Kafka Connect 的更多信息,请参阅 the docsthis talk

以下是接收器连接器配置示例:

{
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url": "jdbc:mysql://mysql:3306/demo",
    "topics": "test01",
    "connection.user": "connect_user",
    "connection.password": "asgard",
    "auto.create": true,
    "auto.evolve": true,
    "insert.mode": "insert",
    "pk.mode": "record_key",
    "pk.fields": "MESSAGE_KEY"
}

在本教程中了解更多信息:https://rmoff.dev/kafka-jdbc-video

了解如何在 Kafka Connect 中安装 JDBC 驱动程序 here

关于java - Kafka 流消费为 CSV,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61915158/

相关文章:

java - Spring Cloud API Gateway 自定义过滤对象上的 ClassCastException 到 Config

java - java中带有类型属性的JSON到XML

spring - Apache Kafka 与 tomcat 和 spring 的集成

php - 如何使用 php-rdkafka 在 kafka 中确认消费消息?

go - 如何使用 Sarama Go Kafka Consumer 从最新的偏移量中消费

java - 为什么我的 libgdx 游戏速度逐渐变慢?

java - 在android中处理多个 View

python - 如何在 Python 中解码 Avro 消息?

Python 错误 : Cannot import name KafkaConsumer

apache-kafka - --from-beginning 参数在 kafka-console-consumer.sh 中不起作用,但 --offset 0 --partition 0 有效