我有一项艰巨的任务要从 Cassandra 表中读取数百万行。实际上这个表包含大约 40~50 百万行。
数据实际上是我们系统的内部 URL,我们需要触发所有这些 URL。为了启动它,我们使用 Akka Streams 并且它一直工作得很好,根据需要做一些背压。但是我们仍然没有找到一种有效阅读所有内容的方法。
到目前为止我们尝试过的:
代码如下:
val cassandraRdd =
sc
.cassandraTable("keyspace", "my_table")
.select("id", "url")
.where("year = ? and month = ? and day = ?", date.getYear, date.getMonthOfYear, date.getDayOfMonth)
不幸的是,我无法遍历分区以获取更少的数据,我必须使用 collect,因为它提示 actor 不可序列化。
val httpPool: Flow[(HttpRequest, String), (Try[HttpResponse], String), HostConnectionPool] = Http().cachedHostConnectionPool[String](host, port).async
val source =
Source
.actorRef[CassandraRow](10000000, OverflowStrategy.fail)
.map(row => makeUrl(row.getString("id"), row.getString("url")))
.map(url => HttpRequest(uri = url) -> url)
val ref = Flow[(HttpRequest, String)]
.via(httpPool.withAttributes(ActorAttributes.supervisionStrategy(decider)))
.to(Sink.actorRef(httpHandlerActor, IsDone))
.runWith(source)
cassandraRdd.collect().foreach { row =>
ref ! row
}
我想知道你们中是否有人在阅读数百万行以执行与聚合等不同的任何操作方面有这样的经验。
我还想阅读所有内容并发送到 Kafka 主题,我将在其中使用 Streaming(spark 或 Akka)接收,但问题是相同的,如何有效加载所有这些数据?
编辑
现在,我正在一个具有合理数量的 100GB 内存的集群上运行,并对其进行收集和迭代。
此外,这与使用 spark 获取大数据并使用 reduceByKey、aggregateByKey 等进行分析大不相同。
我需要通过 HTTP 获取和发送所有内容 =/
到目前为止,它的工作方式和我一样,但我担心这些数据会越来越大,以至于将所有内容都提取到内存中毫无意义。
流式传输这些数据将是最好的解决方案,分块获取,但我还没有找到一个好的方法。
最后,我想使用 Spark 来获取所有这些数据,生成一个 CSV 文件并使用 Akka Stream IO 进行处理,这样我会驱逐在内存中保留很多东西,因为处理每一个都需要几个小时百万。
最佳答案
好吧,在花了一些时间阅读,与其他人交谈并进行测试之后,结果可以通过以下代码示例来实现:
val sc = new SparkContext(sparkConf)
val cassandraRdd = sc.cassandraTable(config.getString("myKeyspace"), "myTable")
.select("key", "value")
.as((key: String, value: String) => (key, value))
.partitionBy(new HashPartitioner(2 * sc.defaultParallelism))
.cache()
cassandraRdd
.groupByKey()
.foreachPartition { partition =>
partition.foreach { row =>
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
val myActor = system.actorOf(Props(new MyActor(system)), name = "my-actor")
val source = Source.fromIterator { () => row._2.toIterator }
source
.map { str =>
myActor ! Count
str
}
.to(Sink.actorRef(myActor, Finish))
.run()
}
}
sc.stop()
class MyActor(system: ActorSystem) extends Actor {
var count = 0
def receive = {
case Count =>
count = count + 1
case Finish =>
println(s"total: $count")
system.shutdown()
}
}
case object Count
case object Finish
我正在做的是以下内容:
也许这段代码可以进一步改进,但到目前为止我很高兴不再使用 .collect 并且只在 Spark 内部工作。
关于apache-spark - 如何有效地从 Cassandra 读取数百万行?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36902958/