scala - How to read from embedded-kafka with fs2-kafka

Tags: scala apache-kafka fs2 cats-effect embedded-kafka

I'm trying to use fs2-kafka to read messages from embedded-kafka.

I start the embedded Kafka with withRunningKafkaOnFoundPort, create a topic and publish a few messages. However, when I try to read them back with fs2-kafka, I get a NullPointerException. I've isolated the problem in the test case below.

Here is my code:

import cats.effect._
import cats.implicits._
import cats.effect.implicits._
import fs2.Stream
import fs2.kafka.{AutoOffsetReset, ConsumerSettings, KafkaConsumer, consumerStream}
import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.scalatest.{BeforeAndAfterAll, FunSuite}

import scala.concurrent.ExecutionContext

class KafkaSuite extends FunSuite with EmbeddedKafka {

  val singleThreadExecutor = ExecutionContext.fromExecutor((task: Runnable) => task.run())
  implicit val contextShift = IO.contextShift(singleThreadExecutor)
  implicit val timer = IO.timer(singleThreadExecutor)

  val topic = "example"
  val partition = 0
  val clientId = "client"

  test("works") {
    val userDefinedConfig = EmbeddedKafkaConfig(kafkaPort = 0, zooKeeperPort = 0)

    withRunningKafkaOnFoundPort(userDefinedConfig) { implicit actualConfig =>
      createCustomTopic(topic)
      publishStringMessageToKafka(topic, "example-message1")
      publishStringMessageToKafka(topic, "example-message2")
      publishStringMessageToKafka(topic, "example-message3")
      publishStringMessageToKafka(topic, "example-message4")

      val broker = s"localhost:${actualConfig.kafkaPort}"

      val consumerSettings = ConsumerSettings[IO, String, String]
        .withAutoOffsetReset(AutoOffsetReset.Earliest)
        .withBootstrapServers(broker)
        .withGroupId("group")
        .withClientId(clientId)

      val r = consumerStream[IO].using(consumerSettings)
        .evalTap(_.subscribeTo(topic))
        .evalTap(_.seekToBeginning)
        .flatMap { consumer =>
          consumer.stream.take(1)
        }
        .compile
        .toList

      val res = r.unsafeRunSync()
      Console.println(res)
      assert(res.size == 1)
    }
  }

}

build.sbt:

name := "test"

version := "0.1"

scalaVersion := "2.12.6"


libraryDependencies ++= Seq(
  "org.scalatest" % "scalatest_2.12" % "3.1.2" % "test",
  "org.slf4j" % "slf4j-simple" % "1.7.25",
  "com.github.fd4s" %% "fs2-kafka" % "1.0.0",
  "io.github.embeddedkafka" %% "embedded-kafka" % "2.4.1.1" % Test
)

Here is the stack trace:

java.lang.NullPointerException was thrown.
java.lang.NullPointerException
    at java.lang.String.<init>(String.java:515)
    at fs2.kafka.Deserializer$.$anonfun$string$1(Deserializer.scala:208)
    at fs2.kafka.Deserializer$.$anonfun$lift$1(Deserializer.scala:184)
    at fs2.kafka.Deserializer$$anon$1.deserialize(Deserializer.scala:133)
    at fs2.kafka.ConsumerRecord$.deserializeFromBytes(ConsumerRecord.scala:166)
    at fs2.kafka.ConsumerRecord$.fromJava(ConsumerRecord.scala:177)
    at fs2.kafka.internal.KafkaConsumerActor.$anonfun$records$2(KafkaConsumerActor.scala:378)
    at cats.data.NonEmptyVectorInstances$$anon$1.traverse(NonEmptyVector.scala:300)
    at cats.data.NonEmptyVectorInstances$$anon$1.traverse(NonEmptyVector.scala:245)
    at cats.Traverse$Ops.traverse(Traverse.scala:19)
    at cats.Traverse$Ops.traverse$(Traverse.scala:19)
    at cats.Traverse$ToTraverseOps$$anon$2.traverse(Traverse.scala:19)
    at fs2.kafka.internal.KafkaConsumerActor.$anonfun$records$1(KafkaConsumerActor.scala:376)
    at cats.instances.VectorInstances$$anon$1.$anonfun$traverse$2(vector.scala:80)
    at cats.instances.VectorInstances$$anon$1.loop$2(vector.scala:43)
    at cats.instances.VectorInstances$$anon$1.$anonfun$foldRight$2(vector.scala:44)
    at cats.Eval$.advance(Eval.scala:271)
    at cats.Eval$.loop$1(Eval.scala:350)
    at cats.Eval$.cats$Eval$$evaluate(Eval.scala:368)
    at cats.Eval$Defer.value(Eval.scala:257)
    at cats.instances.VectorInstances$$anon$1.traverse(vector.scala:79)
    at cats.instances.VectorInstances$$anon$1.traverse(vector.scala:15)
    at cats.Traverse$Ops.traverse(Traverse.scala:19)
    at cats.Traverse$Ops.traverse$(Traverse.scala:19)
    at cats.Traverse$ToTraverseOps$$anon$2.traverse(Traverse.scala:19)
    at fs2.kafka.internal.KafkaConsumerActor.records(KafkaConsumerActor.scala:373)
    at fs2.kafka.internal.KafkaConsumerActor.$anonfun$poll$2(KafkaConsumerActor.scala:405)
    at cats.effect.internals.IORunLoop$.liftedTree1$1(IORunLoop.scala:95)
    at cats.effect.internals.IORunLoop$.cats$effect$internals$IORunLoop$$loop(IORunLoop.scala:95)
    at cats.effect.internals.IORunLoop$.startCancelable(IORunLoop.scala:41)
    at cats.effect.internals.IOBracket$BracketStart.run(IOBracket.scala:86)
    at cats.effect.internals.Trampoline.cats$effect$internals$Trampoline$$immediateLoop(Trampoline.scala:70)
    at cats.effect.internals.Trampoline.startLoop(Trampoline.scala:36)
    at cats.effect.internals.TrampolineEC$JVMTrampoline.super$startLoop(TrampolineEC.scala:93)
    at cats.effect.internals.TrampolineEC$JVMTrampoline.$anonfun$startLoop$1(TrampolineEC.scala:93)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
    at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81)
    at cats.effect.internals.TrampolineEC$JVMTrampoline.startLoop(TrampolineEC.scala:93)
    at cats.effect.internals.Trampoline.execute(Trampoline.scala:43)
    at cats.effect.internals.TrampolineEC.execute(TrampolineEC.scala:44)
    at cats.effect.internals.IOBracket$BracketStart.apply(IOBracket.scala:72)
    at cats.effect.internals.IOBracket$BracketStart.apply(IOBracket.scala:52)
    at cats.effect.internals.IORunLoop$.cats$effect$internals$IORunLoop$$loop(IORunLoop.scala:136)
    at cats.effect.internals.IORunLoop$RestartCallback.signal(IORunLoop.scala:355)
    at cats.effect.internals.IORunLoop$RestartCallback.apply(IORunLoop.scala:376)
    at cats.effect.internals.IORunLoop$RestartCallback.apply(IORunLoop.scala:316)
    at cats.effect.internals.IOShift$Tick.run(IOShift.scala:36)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Best answer

It turns out the problem was the key type in ConsumerSettings[IO, String, String]: it is String, but embedded-kafka writes null as the key, so deserializing the key fails with a NullPointerException. Setting the key type to Unit fixes the exception.
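In terms of the settings from the question, only the key type parameter needs to change (a minimal sketch; the complete test is shown further down):

// Unit as the key type: the Unit key deserializer does not need the key bytes,
// so the null key written by embedded-kafka no longer causes a NullPointerException.
val consumerSettings = ConsumerSettings[IO, Unit, String]
  .withAutoOffsetReset(AutoOffsetReset.Earliest)
  .withBootstrapServers(broker)
  .withGroupId("group")
  .withClientId(clientId)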

Another problem is that withRunningKafkaOnFoundPort finishes before the IO evaluation starts. To keep the broker running, create a Resource from embedded-kafka and run the IO inside it:

val embeddedKafka = Resource.make(IO(EmbeddedKafka.start()))((kafka) => IO(kafka.stop(true)))
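The test then runs everything that talks to the broker inside that resource, roughly like this (a sketch; the real body is in the full example below):

// Sketch: the broker is started before the inner stream runs and is only
// stopped once the stream has terminated.
Stream.resource(embeddedKafka).flatMap { kafka =>
  implicit val actualConfig: EmbeddedKafkaConfig = kafka.config
  // publish messages and build the consumer stream here
  Stream.emits(List.empty[String]).covary[IO]
}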

The next problem is that fs2-kafka cannot work with a single-threaded executor, so you have to give it a thread pool (for example ExecutionContext.global).
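Concretely, the single-thread executor from the question gets replaced with something like this (the same implicits appear in the full example below):

// fs2-kafka needs a real thread pool; the run-on-calling-thread executor from the question is not enough.
implicit val ec = ExecutionContext.global
implicit val contextShift = IO.contextShift(ec)
implicit val timer = IO.timer(ec)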

Here is a complete working example:

import cats.effect._
import fs2.Stream
import fs2.kafka.{AutoOffsetReset, ConsumerSettings, consumerStream}
import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.scalatest.FunSuite

import scala.concurrent.ExecutionContext

class KafkaSuite extends FunSuite with EmbeddedKafka {

  implicit val ec = ExecutionContext.global
  implicit val contextShift = IO.contextShift(ec)
  implicit val timer = IO.timer(ec)

  val topic = "example"
  val partition = 0
  val clientId = "client"
  val userDefinedConfig = EmbeddedKafkaConfig(kafkaPort = 0, zooKeeperPort = 0)

  def broker(port: Long) = s"localhost:${port}"

  val consumerSettings = ConsumerSettings[IO, Unit, String]
    .withAutoOffsetReset(AutoOffsetReset.Earliest)
    .withEnableAutoCommit(true)
    .withGroupId("group")
    .withClientId(clientId)

  val embeddedKafka = Resource.make(IO(EmbeddedKafka.start()))((kafka) => IO(kafka.stop(true)))

  test("works") {
    val r = Stream.resource(embeddedKafka).flatMap { kafka =>
      implicit val actualConfig: EmbeddedKafkaConfig = kafka.config
      createCustomTopic(topic)
      publishStringMessageToKafka(topic, "example-message1")
      publishStringMessageToKafka(topic, "example-message2")
      publishStringMessageToKafka(topic, "example-message3")
      publishStringMessageToKafka(topic, "example-message4")

      consumerStream(consumerSettings.withBootstrapServers(broker(actualConfig.kafkaPort)))
        .evalTap(_.subscribeTo(topic))
        .evalTap(_.seekToBeginning)
        .flatMap(_.stream)
        .map(_.record.value)
        .take(1)
    }
    val res = r.compile.toList.unsafeRunSync()
    assert(res.contains("example-message1"))
  }

}

The original question, "scala - How to read from embedded-kafka with fs2-kafka", is on Stack Overflow: https://stackoverflow.com/questions/62598197/
