我正在尝试在 Kafka Streams 应用程序中将 List[KeyValue] 发送到该主题。但流需要一个 KeyValue。如何将 KeyValues 发送到流,而不是整个列表?
class MainTopology {
val createTopology = (sourceTopic: String, sinkTopic: String) => {
val builder = new StreamsBuilder()
builder.stream(sourceTopic)
.map[String, Option[JsonMessage]]((key, value) => toJsonEvent(key, value))
.map[String, String]((key, value) => toFormattedEvents(key, value))
.to(sinkTopic)
builder.build()
}
private val toJsonEvent = (key: String, value: String) => {
println(value)
val jsonEventsAsCaseClasses = jsonToClass(value)
new KeyValue(key, jsonEventsAsCaseClasses)
}
private val toFormattedEvents = (key: String, value: Option[JsonMessage]) => {
val jsonEvents: List[String] = formatEvents(value)
jsonEvents.map(event => new KeyValue(key,event))
}
}
因此第二张 map 无法编译。
Expression of type List[KeyValue[String, String]] doesn't conform to expected type KeyValue[_ <: String, _ <: String]
我更新了我的代码,但现在它抛出了另一个错误:
val builder = new StreamsBuilder()
builder.stream(sourceTopic)
.map[String, Option[JsonMessage]]((key, value) => toJsonEvent(key, value))
.flatMap(
(key, value) => toFormattedEvents(key, value)
)
.to(sinkTopic)
builder.build()
}
private val toJsonEvent = (key: String, value: String) => {
println(value)
val jsonEventsAsCaseClasses = jsonToClass(value)
new KeyValue(key, jsonEventsAsCaseClasses)
}
private val toFormattedEvents = (key: String, value: Option[JsonMessage]) => {
val jsonEvents: List[String] = formatEvents(value)
jsonEvents.map(event => new KeyValue(key,event)).toIterable.asJava
}
Error:(15, 8) inferred type arguments [?1,?0] do not conform to method flatMap's type parameter bounds [KR,VR]
.flatMap(
Error:(16, 20) type mismatch;
found : org.apache.kafka.streams.kstream.KeyValueMapper[String,Option[org.minsait.streams.model.JsonMessage],Iterable[org.apache.kafka.streams.KeyValue[String,String]]]
required: org.apache.kafka.streams.kstream.KeyValueMapper[_ >: String, _ >: Option[org.minsait.streams.model.JsonMessage], _ <: Iterable[_ <: org.apache.kafka.streams.KeyValue[_ <: KR, _ <: VR]]]
(key, value) => toFormattedEvents(key, value)
最佳答案
看一下 flatMap()
和 flatMapValues()
来替换第二个 map()
。
https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/KStream.html
例如:
.flatMap[String, Long]((key, value) => Seq(("foo", 1L), ("bar", 2L)))
如果您出于某种原因确实打算继续使用 map
,请参阅下文。
The second map is not compiling due to this.
Expression of type List[KeyValue[String, String]] doesn't conform to expected type KeyValue[_ <: String, _ <: String]
对应的代码,供引用:
.map[String, String]((key, value) => toFormattedEvents(key, value))
您需要的是:
.map[String, List[KeyValue[KR, VR]]]((key, value) => toFormattedEvents(key, value))
其中 KR
和 VR
分别是 toFormattedEvents()
返回的键和值类型(实际返回类型不是从你的问题中可以清楚地看出)。为此,您还必须具有 List[KeyValue[KR, VR]]
类型的 Serde
。
让我用几种不同的类型来说明这一点,以便更容易理解方法调用的哪一部分引用哪种类型。假设我们希望映射输出具有 Integer
的键类型和 List[KeyValue[String, Long]]
的值类型:
.map[Integer, List[KeyValue[String, Long]]]((key, value) => (5, List(KeyValue.pair("foo", 1L), KeyValue.pair("bar", 2L))))
请注意,此示例为每个映射记录分配相同的值,但这不是示例的重点。
关于scala - 如何将KeyValue列表发送到Kafka?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56356112/