apache-kafka - Kafka-MongoDB Debezium 连接器 : distributed mode

标签 apache-kafka apache-kafka-connect debezium mongodb-kafka-connector

我正在研究 debezium mongodb 源连接器。我可以通过将 kafka Bootstrap 服务器地址作为远程机器(部署在 Kubernetes 中)和远程 MongoDB url,以分布式模式在本地机器上运行连接器吗?

我试过了,我看到连接器成功启动,没有错误,只有几条警告,但没有数据从 mongodb 流出。

使用下面的命令运行连接器

./bin/connect-distributed ./etc/schema-registry/connect-avro-distributed.properties ./etc/kafka/connect-mongodb-source.properties

如果不是,我还能如何实现这一目标,我不想像大多数教程建议的那样安装本地 kafka 或 mondoDB。我想为此使用我们的测试服务器。

按照下面的教程进行操作 : https://medium.com/tech-that-works/cloud-kafka-connector-for-mongodb-source-8b525b779772

下面是该问题的更多详细信息 连接器工作正常,我在连接器日志末尾看到以下几行

 INFO [Worker clientId=connect-1, groupId=connect-cluster] Starting connectors and tasks using config offset -1 (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1000)
] INFO [Worker clientId=connect-1, groupId=connect-cluster] Finished starting connectors and tasks (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1021)

我还在/etc/kafka/connect-mongodb-source.properties 中定义了 MongoDB 配置,如下所示

name=mongodb-source-connector 
connector.class=io.debezium.connector.mongodb.MongoDbConnector 
mongodb.hosts=/remoteserveraddress:27017 
mongodb.name=mongo_conn 
initial.sync.max.threads=1 
tasks.max=1

但是数据在 MongoDB 和 Kafka 之间没有流动。我还针对此 Kafka-MongoDB Debezium 连接器发布了单独的问题:分布式模式

任何指针都适用

最佳答案

connect-distributed 只接受一个属性文件。

您必须使用 REST API 以分布式模式配置 Kafka Connect。

https://docs.confluent.io/current/connect/references/restapi.html

注意:默认情况下,消费者将读取主题之外的最新数据,而不是现有数据。

您可以将其添加到 connect-avro-distributed.properties 中以修复它

consumer.auto.offset.reset=earliest

关于apache-kafka - Kafka-MongoDB Debezium 连接器 : distributed mode,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59673908/

相关文章:

java - 无法使用泛型使用 Jackson 反序列化动态 json

mysql - Debezium CDC Connector 任务出现错误 : javax. management.InstanceAlreadyExistsException

spring - 如何按类型消费来自Kafka的消息

apache-kafka - 使用 Kafka 的日志压缩,如果消费者没有落后于某个定义的时间限制,我该怎么做才能保证消费者不会错过消息?

mysql - 通过kafka jdbc连接器删除数据库中的记录

hadoop - 合流:Hdfs转换为avro格式,但是在 hive 中读取avro文件时,我的时间比 “timezone”提前5:30小时: “Asia/Kolkata”

apache-kafka - Kafka生产者线程不断增加

apache-kafka - 我如何使用嵌入式模式从 Kafka 反序列化 Avro

apache-kafka - 如何使用 kafka 0.10.x 获取所有组列表

mysql - 由 : io. debezium.text.ParsingException 引起:外部输入 'ASC' 期望