scala - KafkaUtils API |偏移量管理 | Spark 流

标签 scala apache-spark apache-kafka spark-streaming

我正在尝试管理 kafka 偏移量以获得一次语义。

使用偏移图创建直接流时遇到问题,如下所示:

val fromOffsets : (TopicAndPartition, Long) = TopicAndPartition(metrics_rs.getString(1), metrics_rs.getInt(2)) -> metrics_rs.getLong(3)

KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder,(String, String)] (ssc,kafkaParams,fromOffsets,messageHandler)

这里,

val messageHandler =
      (mmd: MessageAndMetadata[String, String]) => mmd.message.length

metrics_rs = metricsStatement.executeQuery("SELECT part,off from metrics.txn_offsets where topic='"+t+''' )

我想我的声明风格做错了......如果你能帮忙的话。 编译错误显示“createDirectStream 的类型参数太多”

最佳答案

我发现你做错了一些事情。

您需要传递一个Map[TopicAndPartition, Long],而当前您有一个Tuple2[TopicAndPartition, Long]。所以你需要:

val fromOffsets: Map[TopicAndPartition, Long] = 
    Map(TopicAndPartition(metrics_rs.getString(1), 
                          metrics_rs.getInt(2)) -> metrics_rs.getLong(3))

您说 createDirectStream 的返回类型是 (String, String) 类型的元组,但您的 messageHandler 值是 >int。如果你想返回一个带有键值对的元组,你需要:

val messageHandler: MessageAndMetadata[String, String] => (String, String) =
  (mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message())

修复该问题后,应该可以编译:

val stream = KafkaUtils
              .createDirectStream[String, String,
                      StringDecoder, StringDecoder,
                      (String, String)] (ssc, 
                                         kafkaParams, 
                                         fromOffsets, 
                                         messageHandler)

关于scala - KafkaUtils API |偏移量管理 | Spark 流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39451307/

相关文章:

scala - Spark-SQL : How to read a TSV or CSV file into dataframe and apply a custom schema?

java - 格式化日期时间时如何应用时区?

scala - 当声明为var时,Scala不可变集是可变的

scala - 在 ScalaCheck 中生成任意线性函数?

hadoop - 在 Amazon EMR 上设置 Spark 类路径

python - 如何在spark中将rdd数据一分为二?

hadoop - 将小文件存储在HDFS中并在Nifi Flow中归档

node.js - kafka 消费者组在 kafka-node 中获取重复消息

java - Kubernetes 上的 Kafka 流 : Long rebalancing after redeployment

linux - 输入 spark-shell 时找不到 spark submit