我正在尝试管理 kafka 偏移量以获得一次语义。
使用偏移图创建直接流时遇到问题,如下所示:
val fromOffsets : (TopicAndPartition, Long) = TopicAndPartition(metrics_rs.getString(1), metrics_rs.getInt(2)) -> metrics_rs.getLong(3)
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder,(String, String)] (ssc,kafkaParams,fromOffsets,messageHandler)
这里,
val messageHandler =
(mmd: MessageAndMetadata[String, String]) => mmd.message.length
和
metrics_rs = metricsStatement.executeQuery("SELECT part,off from metrics.txn_offsets where topic='"+t+''' )
我想我的声明风格做错了......如果你能帮忙的话。 编译错误显示“createDirectStream 的类型参数太多”
最佳答案
我发现你做错了一些事情。
您需要传递一个Map[TopicAndPartition, Long]
,而当前您有一个Tuple2[TopicAndPartition, Long]
。所以你需要:
val fromOffsets: Map[TopicAndPartition, Long] =
Map(TopicAndPartition(metrics_rs.getString(1),
metrics_rs.getInt(2)) -> metrics_rs.getLong(3))
您说 createDirectStream
的返回类型是 (String, String)
类型的元组,但您的 messageHandler
值是 >int
。如果你想返回一个带有键值对的元组,你需要:
val messageHandler: MessageAndMetadata[String, String] => (String, String) =
(mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message())
修复该问题后,应该可以编译:
val stream = KafkaUtils
.createDirectStream[String, String,
StringDecoder, StringDecoder,
(String, String)] (ssc,
kafkaParams,
fromOffsets,
messageHandler)
关于scala - KafkaUtils API |偏移量管理 | Spark 流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39451307/