scala - 如何仅在 Spark Streaming 的分区内使用 `reduce`,也许使用 combineByKey?

标签 scala apache-spark redis spark-streaming partitioning

我已经通过 Kafka 将数据按键排序到我的 Spark Streaming 分区中,即在一个节点上找到的键在任何其他节点上都找不到。

我想使用 redis 及其 incrby(递增方式)命令作为状态引擎并减少发送到 redis 的请求数量,我想通过执行以下操作来部分减少我的数据每个工作节点本身的字数。 (关键是标签+时间戳从字数统计中获取我的功能)。 我想避免改组,让 Redis 负责跨工作节点添加数据。

即使我已检查数据是否在工作节点之间干净地拆分,.reduce(_ + _)(Scala 语法)也需要很长时间( map task 需要几秒而不是亚秒) ),因为 HashPartitioner 似乎将我的数据洗牌到随机节点以将其添加到那里。

如何在不触发 Scala 中使用 Spark Streaming 的洗牌步骤的情况下,在每个分区器上编写一个简单的字数减少?

注意 DStream 对象缺少一些 RDD 方法,这些方法只能通过 transform 方法获得。

看来我可以使用 combineByKey。我想跳过 mergeCombiners() 步骤,而是将累积的元组保留在原处。 《Learning Spark》一书神秘地说:

We can disable map-side aggregation in combineByKey() if we know that our data won’t benefit from it. For example, groupByKey() disables map-side aggregation as the aggregation function (appending to a list) does not save any space. If we want to disable map-side combines, we need to specify the partitioner; for now you can just use the partitioner on the source RDD by passing rdd.partitioner.

https://www.safaribooksonline.com/library/view/learning-spark/9781449359034/ch04.html

然后这本书继续没有提供如何执行此操作的语法,到目前为止我也没有在谷歌上有任何运气。

更糟糕的是,据我所知,在 Spark Streaming 中没有为 DStream RDD 设置分区器,所以我不知道如何为 combineByKey 提供一个分区器,它不会最终打乱数据。

此外,“map-side”到底是什么意思,mapSideCombine = false 究竟有什么后果?

combineByKey 的 Scala 实现可以在以下位置找到 https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala 寻找 combineByKeyWithClassTag

如果解决方案涉及自定义分区器,请同时包含一个代码示例,说明如何将该分区器应用于传入的 DStream。

最佳答案

这可以使用 mapPartitions 来完成,它采用一个函数,将一个分区上的输入 RDD 的迭代器映射到输出 RDD 上的迭代器。

为了实现字数统计,我映射到 _._2 以删除 Kafka 键,然后使用 foldLeft 执行快速迭代器字数统计,初始化一个 mutable.hashMap,然后将其转换为 Iterator 以形成输出 RDD。

val myDstream = messages
  .mapPartitions( it =>
    it.map(_._2)
    .foldLeft(new mutable.HashMap[String, Int])(
      (count, key) => count += (key -> (count.getOrElse(key, 0) + 1))
    ).toIterator
  )

关于scala - 如何仅在 Spark Streaming 的分区内使用 `reduce`,也许使用 combineByKey?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39761558/

相关文章:

go - 如何在redis golang中将键值对传递给MSet?

java - Jedis断管异常

java - Spark 错误 : NoSuchMethodError: scala. Predef$.$conforms()Lscala/Predef$$less$colon$less

linux - 为什么 SBT 0.7.7 在我的 Linux 系统上不能正常工作? (案例详情在里面)

java - Apache Spark - 无法理解 scala 示例

apache-spark - 如何使用 flatMapGroupsWithState 进行有状态聚合?

scala - 将数据框转换为 spark scala 中的配置单元表

java - Apache common-email lib - TLS 显示警告

java - 将 Spark Streaming 中的新值与之前的平均值进行比较

php - 使用 Twemproxy 安装了 Redis 集群,我真的很困惑为什么某些 SET 命令被 MOVED