尝试使用 Debezium 将 MySql 数据库流式传输到 Kafka。
因此,在 Docker 容器中,我启动了 Zookeeper、Kafka、MySQL 数据库、MySQL 命令行和 Kafka Connect。
当我在MySQL命令行中运行任何DML命令时,我可以在我在docker中启动的观察器窗口中看到更改事件。所以目前一切看起来都很好。请在下面找到相同的内容。
现在我正在尝试使用 Java 代码中的更改事件,每当我在 MySQL 命令行中执行任何 DML 命令时,这些事件都可以在观察器窗口中看到。请在下面找到消费者。
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "my-first-consumer-group");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
Consumer<String, String> consumer = new KafkaConsumer<>(properties);
ArrayList<String> topics = new ArrayList<>();
topics.add("dbserver1.inventory.customers");
consumer.subscribe(topics);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(1L);
for (ConsumerRecord<String, String> record : records) {
System.out.println("Message received: " + record.value());
}
consumer.commitAsync();
}
无法使用上述消费者的数据更改事件。如果需要做任何事情,请告诉我。
最佳答案
有效和无效之间的区别在于您访问的监听器端口。
所以要么:
- 29092 是错误的,9092 是正确的
- 29092 绑定(bind)到 Docker 网络内部公布的监听器,9092 绑定(bind)到
localhost
( learn more )
关于java - 无法从 Java 连接到在 Docker 中运行的 Kafka,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62014239/