java - 关于理解的奇怪问题

标签 java scala apache-kafka for-comprehension

我是整个 Scala 场景的新手,但到目前为止我一直很喜欢这个旅程!然而,我遇到了一个问题,还没有弄清楚原因...... 我目前正在使用 Kafka,并尝试从某个主题读取数据并将其传递到其他地方。

问题是:内部 for 理解中的 println 按预期输出底部的行,但内部 for 之外的所有其他 prinln 都被跳过,并且该函数最终什么也不返回(甚至无法发出测试用例中的 getClass!)...可能是什么原因造成的?我真的没有主意了......

相关代码:

def tryBatchRead(maxMessages: Int = 100, skipMessageOnError: Boolean = true): List[String] = {
  var numMessages = 0L

  var list = List[String]()

  val iter = if (maxMessages >= 0) stream.slice(0, maxMessages) else stream

  for (messageAndTopic <- iter) {
    for (m <- messageAndTopic) {
      println(m.offset.toString + " --- " + new String(m.message))
      list = list ++ List(new String(m.message))
      println("DEBUG " + list)
      numMessages += 1
    }
    println("test1")
  }

  println("test2")
  println("FINISH" + list)
  connector.shutdown()
  println("test3")
  list
}

输出:

6 --- {"user":{"id":"4d9e3582-2d35-4600-b070-e4d92e42c534","age":25,"sex":"M","location":"PT"}}
DEBUG List({"user":{"id":"4d9e3582-2d35-4600-b070-e4d92e42c534","age":25,"sex":"M","location":"PT"}})
7 --- test 2
DEBUG List({"user":{"id":"4d9e3582-2d35-4600-b070-e4d92e42c534","age":25,"sex":"M","location":"PT"}}, test 2)
8 --- {"StartSurvey":{"user":{"id":"6a736fdd-79a0-466a-9030-61b5ac3a3a0e","age":25,"sex":"M","location":"PT"}}}
DEBUG List({"user":{"id":"4d9e3582-2d35-4600-b070-e4d92e42c534","age":25,"sex":"M","location":"PT"}}, test 2, {"StartSurvey":{"user":{"id":"6a736fdd-79a0-466a-9030-61b5ac3a3a0e","age":25,"sex":"M","location":"PT"}}})

感谢您的帮助!

最佳答案

我不完全确定,但很可能您在阅读上一条消息后会阻塞,等待下一条消息的到来(kafka 流基本上是无限的)。为kafka消费者配置超时,所以如果一段时间没有消息就会放弃。有一个 consumer.timeout.ms 属性(例如将其设置为 3000 毫秒),一旦达到等待限制,这将导致 ConsumerTimeoutException。

顺便说一句,我会将您的代码重写为:

def tryBatchRead(maxMessages: Int = 100): List[String] = {
  // `.take` works fine if collection has less elements than max
  val batchStream = stream.take(maxMessages)  

  // TODO: add try/catch section, according to the above comments
  // in scala we usually write a single joined for, instead of multiple nested ones
  val batch = for {
    messageAndTopic <- batchStream.take(maxMessages)
    msg <- messageAndTopic // are you sure you can iterate message and topic? 0_o
  } yield {
    println(m.offset.toString + " --- " + new String(m.message))
    msg
  }

  println("Number of messages: " + batch.length)

  // shutdown has to be done outside, it's bad idea to implicitly tear down streams in reading function
  batch 
}

关于java - 关于理解的奇怪问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24434288/

相关文章:

Scala如何获得流的最后计算值?

java - Lagom 使用 Kafka 发布消息

java - 丢弃数字的前 10 位

java - AES 加密 Java 无效 key 长度

scala - future 的阻塞仍然阻塞吗?

java - 如何动态获取主题名称然后从中读取

c# - 如何使用 Confluence 的 .NET 客户端列出 Kafka 消费者组

java - Sonarqube Custom Rule- String Literal 不应该重复,在记录器的上下文中被忽略

java - 使用 Spring Boot Maven 着色无法正常工作

javascript - Play framework 2 scala 模板 - JavaScript