scala - 如何将KeyValue列表发送到Kafka?

标签 scala apache-kafka apache-kafka-streams

我正在尝试在 Kafka Streams 应用程序中将 List[KeyValue] 发送到该主题。但流需要一个 KeyValue。如何将 KeyValues 发送到流,而不是整个列表?

class MainTopology {

  val createTopology = (sourceTopic: String, sinkTopic: String) => {
    val builder = new StreamsBuilder()
    builder.stream(sourceTopic)
      .map[String, Option[JsonMessage]]((key, value) => toJsonEvent(key, value))
      .map[String, String]((key, value) => toFormattedEvents(key, value))
      .to(sinkTopic)
    builder.build()
  }

  private val toJsonEvent = (key: String, value: String) => {
    println(value)
    val jsonEventsAsCaseClasses = jsonToClass(value)
    new KeyValue(key, jsonEventsAsCaseClasses)
  }

  private val toFormattedEvents = (key: String, value: Option[JsonMessage]) => {
    val jsonEvents: List[String] = formatEvents(value)
    jsonEvents.map(event => new KeyValue(key,event))
  }

}

因此第二张 map 无法编译。

Expression of type List[KeyValue[String, String]] doesn't conform to expected type KeyValue[_ <: String, _ <: String]

我更新了我的代码,但现在它抛出了另一个错误:

    val builder = new StreamsBuilder()
    builder.stream(sourceTopic)
      .map[String, Option[JsonMessage]]((key, value) => toJsonEvent(key, value))
      .flatMap(
      (key, value) => toFormattedEvents(key, value)
    )
      .to(sinkTopic)
    builder.build()
  }

  private val toJsonEvent = (key: String, value: String) => {
    println(value)
    val jsonEventsAsCaseClasses = jsonToClass(value)
    new KeyValue(key, jsonEventsAsCaseClasses)
  }

  private val toFormattedEvents = (key: String, value: Option[JsonMessage]) => {
    val jsonEvents: List[String] = formatEvents(value)
    jsonEvents.map(event => new KeyValue(key,event)).toIterable.asJava
  }
Error:(15, 8) inferred type arguments [?1,?0] do not conform to method flatMap's type parameter bounds [KR,VR]
      .flatMap(
Error:(16, 20) type mismatch;
 found   : org.apache.kafka.streams.kstream.KeyValueMapper[String,Option[org.minsait.streams.model.JsonMessage],Iterable[org.apache.kafka.streams.KeyValue[String,String]]]
 required: org.apache.kafka.streams.kstream.KeyValueMapper[_ >: String, _ >: Option[org.minsait.streams.model.JsonMessage], _ <: Iterable[_ <: org.apache.kafka.streams.KeyValue[_ <: KR, _ <: VR]]]
      (key, value) => toFormattedEvents(key, value)

最佳答案

看一下 flatMap()flatMapValues() 来替换第二个 map()https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/KStream.html

例如:

.flatMap[String, Long]((key, value) => Seq(("foo", 1L), ("bar", 2L)))

如果您出于某种原因确实打算继续使用 map,请参阅下文。

The second map is not compiling due to this.

Expression of type List[KeyValue[String, String]] doesn't conform to expected type KeyValue[_ <: String, _ <: String]

对应的代码,供引用:

.map[String, String]((key, value) => toFormattedEvents(key, value))

您需要的是:

.map[String, List[KeyValue[KR, VR]]]((key, value) => toFormattedEvents(key, value))

其中 KRVR 分别是 toFormattedEvents() 返回的键和值类型(实际返回类型不是从你的问题中可以清楚地看出)。为此,您还必须具有 List[KeyValue[KR, VR]] 类型的 Serde

让我用几种不同的类型来说明这一点,以便更容易理解方法调用的哪一部分引用哪种类型。假设我们希望映射输出具有 Integer 的键类型和 List[KeyValue[String, Long]] 的值类型:

.map[Integer, List[KeyValue[String, Long]]]((key, value) => (5, List(KeyValue.pair("foo", 1L), KeyValue.pair("bar", 2L))))

请注意,此示例为每个映射记录分配相同的值,但这不是示例的重点。

关于scala - 如何将KeyValue列表发送到Kafka?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56356112/

相关文章:

elasticsearch - 用kafka接收器在elasticsearch中重命名索引

java - Flink - 查询Kafka主题以获取消费者组的偏移量?

amazon-web-services - 如何使用 Cloudformation 在 AWS 上创建并保存脚本?尝试启动 kafka - Zookeeper 节点

apache-kafka - Kafka Streams - 无法重新平衡错误

scala - 在运行时编译和执行Scala代码

Scala 在同一行声明多个变量,且第一个字符为大写

Scala Stackable Trait 和 Self Type 不兼容类型

apache-kafka-streams - Kafka 流提交偏移语义

java - 卡夫卡流 : Get CorruptRecordException

xml - 如何使用 Scala 创建 xhtml 查询字符串?