我是整个 Scala 场景的新手,但到目前为止我一直很喜欢这个旅程!然而,我遇到了一个问题,还没有弄清楚原因...... 我目前正在使用 Kafka,并尝试从某个主题读取数据并将其传递到其他地方。
问题是:内部 for 理解中的 println 按预期输出底部的行,但内部 for 之外的所有其他 prinln 都被跳过,并且该函数最终什么也不返回(甚至无法发出测试用例中的 getClass!)...可能是什么原因造成的?我真的没有主意了......
相关代码:
def tryBatchRead(maxMessages: Int = 100, skipMessageOnError: Boolean = true): List[String] = {
var numMessages = 0L
var list = List[String]()
val iter = if (maxMessages >= 0) stream.slice(0, maxMessages) else stream
for (messageAndTopic <- iter) {
for (m <- messageAndTopic) {
println(m.offset.toString + " --- " + new String(m.message))
list = list ++ List(new String(m.message))
println("DEBUG " + list)
numMessages += 1
}
println("test1")
}
println("test2")
println("FINISH" + list)
connector.shutdown()
println("test3")
list
}
输出:
6 --- {"user":{"id":"4d9e3582-2d35-4600-b070-e4d92e42c534","age":25,"sex":"M","location":"PT"}}
DEBUG List({"user":{"id":"4d9e3582-2d35-4600-b070-e4d92e42c534","age":25,"sex":"M","location":"PT"}})
7 --- test 2
DEBUG List({"user":{"id":"4d9e3582-2d35-4600-b070-e4d92e42c534","age":25,"sex":"M","location":"PT"}}, test 2)
8 --- {"StartSurvey":{"user":{"id":"6a736fdd-79a0-466a-9030-61b5ac3a3a0e","age":25,"sex":"M","location":"PT"}}}
DEBUG List({"user":{"id":"4d9e3582-2d35-4600-b070-e4d92e42c534","age":25,"sex":"M","location":"PT"}}, test 2, {"StartSurvey":{"user":{"id":"6a736fdd-79a0-466a-9030-61b5ac3a3a0e","age":25,"sex":"M","location":"PT"}}})
感谢您的帮助!
最佳答案
我不完全确定,但很可能您在阅读上一条消息后会阻塞,等待下一条消息的到来(kafka 流基本上是无限的)。为kafka消费者配置超时,所以如果一段时间没有消息就会放弃。有一个 consumer.timeout.ms
属性(例如将其设置为 3000
毫秒),一旦达到等待限制,这将导致 ConsumerTimeoutException。
顺便说一句,我会将您的代码重写为:
def tryBatchRead(maxMessages: Int = 100): List[String] = {
// `.take` works fine if collection has less elements than max
val batchStream = stream.take(maxMessages)
// TODO: add try/catch section, according to the above comments
// in scala we usually write a single joined for, instead of multiple nested ones
val batch = for {
messageAndTopic <- batchStream.take(maxMessages)
msg <- messageAndTopic // are you sure you can iterate message and topic? 0_o
} yield {
println(m.offset.toString + " --- " + new String(m.message))
msg
}
println("Number of messages: " + batch.length)
// shutdown has to be done outside, it's bad idea to implicitly tear down streams in reading function
batch
}
关于java - 关于理解的奇怪问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24434288/