java - 无法让 KafkaProducer/KafkaConsumer 在 Scala 中工作

标签 java scala apache-kafka

我正在尝试创建一个简单的 KafkaProducer 和 KafkaConsumer,以便我可以将数据发送到代理上的主题,然后验证是否已收到数据。下面是我用来定义消费者和生产者的两种方法,以及如何发送消息。 send 方法至少需要 20 秒才能完成,据我所知,consumer.poll 方法实际上从未完成,但我最长的时间是 10 分钟。

有人对我做错了什么有建议吗?生产者/消费者是否有一些我没有正确设置的属性?这些属性是直接从文档复制的,所以我不明白为什么它们不起作用。

KafkaProducer docs
KafkaConsumer docs

"verify we can send to producer" in {
    val consumer = createKafkaConsumer("address:9002")
    val producer = createKafkaProducer("address:9002")

    val message = "I am a message"
    val record = new ProducerRecord[String, String]("myTopic", message)

    producer.send(record)
    TimeUnit.SECONDS.sleep(5)
    val records = consumer.poll(5000)
    println("records: "+records)
    consumer1.close()
}

def createKafkaProducer(kafka: String): KafkaProducer[String,String] = {
    val props = new Properties()
    props.put("bootstrap.servers", kafka)
    props.put("acks", "all")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    new KafkaProducer[String,String](props)
}

def createKafkaConsumer(kafka: String): KafkaConsumer[String, String] = {
    val props = new Properties()
    props.put("bootstrap.servers", kafka)
    props.put("group.id", "test")
    props.put("enable.auto.commit", "true")
    props.put("auto.commit.interval.ms", "1000")
    props.put("session.timeout.ms", "30000")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    val consumer = new KafkaConsumer[String, String](props)
    consumer.subscribe(Collections.singletonList("myTopic"))
    consumer
}

编辑:我已经更新了我的代码,以便现在从发送方法获得响应,并且似乎超时了 org.apache.kafka.common.errors.TimeoutException: 无法更新元数据60000 毫秒后。

最佳答案

事实证明,我遇到了 DNS 问题,这意味着我实际上并未连接到代理。修复此问题后,消息即可通过,配置没有任何问题。

关于java - 无法让 KafkaProducer/KafkaConsumer 在 Scala 中工作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40251976/

相关文章:

Java - 执行 I/O 操作时的线程状态

Java:如何使用Scanner类读取资源文件夹中的文本文件

scala 注释参数需要是常量,但最终的 val 没有成功

mongodb - Play2 & ReactiveMongo 测试问题 : db connection right after test fails

scala - 如何使用 Spark 结构化流连接到受 Kerberos 保护的 Kafka 集群?

java - 如何通过debezium CDC机制反序列化从kafka代理收到的BigDecimal值?

java - 仅在 Android 中调整 ScrollView 的大小

java - 如何使用 Twilio REST API 将 DateSent 过滤器设置为 Java 中的 getMessages

java - Scala - 组合 LocalDate 和 LocalTime 时如何正确添加 ZoneOffset?

java - kafka-clients 包中的 AdminClient 线程安全吗?