apache-spark - 如何有效地从 Cassandra 读取数百万行?

标签 apache-spark cassandra spark-streaming akka-stream phantom-dsl

我有一项艰巨的任务要从 Cassandra 表中读取数百万行。实际上这个表包含大约 40~50 百万行。
数据实际上是我们系统的内部 URL,我们需要触发所有这些 URL。为了启动它,我们使用 Akka Streams 并且它一直工作得很好,根据需要做一些背压。但是我们仍然没有找到一种有效阅读所有内容的方法。

到目前为止我们尝试过的:

  • 使用 Akka Stream 将数据作为 Stream 读取。我们正在使用 phantom-dsl 为特定表提供发布者。但它不会读取所有内容,只读取一小部分。实际上它在第一个 100 万之后停止阅读。
  • 在特定日期使用 Spark 阅读。我们的表被建模为一个时间序列表,有年、月、日、分钟……列。现在我们是按天选择的,所以Spark不会取出很多要处理的东西,但是选择所有这些天很痛苦。

  • 代码如下:
    val cassandraRdd =
          sc
            .cassandraTable("keyspace", "my_table")
            .select("id", "url")
            .where("year = ? and month = ? and day = ?", date.getYear, date.getMonthOfYear, date.getDayOfMonth)
    

    不幸的是,我无法遍历分区以获取更少的数据,我必须使用 collect,因为它提示 actor 不可序列化。
    val httpPool: Flow[(HttpRequest, String), (Try[HttpResponse], String), HostConnectionPool] = Http().cachedHostConnectionPool[String](host, port).async
    
    val source =
      Source
        .actorRef[CassandraRow](10000000, OverflowStrategy.fail)
        .map(row => makeUrl(row.getString("id"), row.getString("url")))
        .map(url => HttpRequest(uri = url) -> url)
    
    val ref = Flow[(HttpRequest, String)]
      .via(httpPool.withAttributes(ActorAttributes.supervisionStrategy(decider)))
      .to(Sink.actorRef(httpHandlerActor, IsDone))
      .runWith(source)
    
    cassandraRdd.collect().foreach { row =>
      ref ! row
    }
    

    我想知道你们中是否有人在阅读数百万行以执行与聚合等不同的任何操作方面有这样的经验。

    我还想阅读所有内容并发送到 Kafka 主题,我将在其中使用 Streaming(spark 或 Akka)接收,但问题是相同的,如何有效加载所有这些数据?

    编辑

    现在,我正在一个具有合理数量的 100GB 内存的集群上运行,并对其进行收集和迭代。

    此外,这与使用 spark 获取大数据并使用 reduceByKey、aggregateByKey 等进行分析大不相同。

    我需要通过 HTTP 获取和发送所有内容 =/

    到目前为止,它的工作方式和我一样,但我担心这些数据会越来越大,以至于将所有内容都提取到内存中毫无意义。

    流式传输这些数据将是最好的解决方案,分块获取,但我还没有找到一个好的方法。

    最后,我想使用 Spark 来获取所有这些数据,生成一个 CSV 文件并使用 Akka Stream IO 进行处理,这样我会驱逐在内存中保留很多东西,因为处理每一个都需要几个小时百万。

    最佳答案

    好吧,在花了一些时间阅读,与其他人交谈并进行测试之后,结果可以通过以下代码示例来实现:

    val sc = new SparkContext(sparkConf)
    
    val cassandraRdd = sc.cassandraTable(config.getString("myKeyspace"), "myTable")
      .select("key", "value")
      .as((key: String, value: String) => (key, value))
      .partitionBy(new HashPartitioner(2 * sc.defaultParallelism))
      .cache()
    
    cassandraRdd
      .groupByKey()
      .foreachPartition { partition =>
        partition.foreach { row =>
    
          implicit val system = ActorSystem()
          implicit val materializer = ActorMaterializer()
    
          val myActor = system.actorOf(Props(new MyActor(system)), name = "my-actor")
    
          val source = Source.fromIterator { () => row._2.toIterator }
          source
            .map { str =>
              myActor ! Count
              str
            }
            .to(Sink.actorRef(myActor, Finish))
            .run()
        }
      }
    
    sc.stop()
    
    
    class MyActor(system: ActorSystem) extends Actor {
    
      var count = 0
    
      def receive = {
    
        case Count =>
          count = count + 1
    
        case Finish =>
          println(s"total: $count")
          system.shutdown()
    
      }
    }
    
    case object Count
    case object Finish
    

    我正在做的是以下内容:
  • 尝试使用 partitionBy 和 groupBy 方法实现大量分区和分区器
  • 使用Cache防止Data Shuffle,让你的Spark跨节点移动大数据,使用高IO等
  • 使用它的依赖项以及 foreachPartition 方法中的 Stream 创建整个 actor 系统。这是一个权衡,您只能拥有一个 ActorSystem 但您将不得不像我在问题中所写的那样滥用 .collect 。然而,在内部创建所有内容,您仍然可以在分布在集群中的 Spark 中运行内容。
  • 使用 Sink.actorRef 在迭代器的末尾完成每个 actor 系统,并带有一条消息 to kill(Finish)

  • 也许这段代码可以进一步改进,但到目前为止我很高兴不再使用 .collect 并且只在 Spark 内部工作。

    关于apache-spark - 如何有效地从 Cassandra 读取数百万行?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36902958/

    相关文章:

    apache-spark - 为什么我们需要 kafka 将数据提供给 apache spark

    python - 删除或加速 PySpark 中的显式 for 循环

    apache-spark - 流式场景的 Spark UI 上的 "Stages"是什么意思

    scala - 使用 scalapb 在 Spark Streaming 中解码 Proto Buf 消息时出错

    cassandra - Scylla 登录 CentOS 和 RedHat

    mongodb - 用于时间序列/记录仪器读取数据的 NoSQL,该数据也是版本化的

    python - 结合 Spark Streaming + MLlib

    apache-spark - EMR 没有检测到所有的内存

    hadoop - 将 S3 连接器与 Cloud Dataproc 结合使用时出现 java.lang.VerifyError

    cassandra - 如何选择Cassandra版本?