apache-spark - Spark : PySpark + Cassandra query performance

标签 apache-spark cassandra pyspark

我在本地机器(8 核,16gb 内存)上设置了 Spark 2.0 和 Cassandra 3.0 用于测试目的并编辑了 spark-defaults.conf如下:

spark.python.worker.memory 1g
spark.executor.cores 4
spark.executor.instances 4
spark.sql.shuffle.partitions 4

接下来我在 Cassandra 中导入了 150 万行:
test(
    tid int,
    cid int,
    pid int,
    ev list<double>,
    primary key (tid)
)
test.ev是一个包含数值的列表,即 [2240,2081,159,304,1189,1125,1779,693,2187,1738,546,496,382,1761,680]
现在在代码中,为了测试整个事情,我刚刚创建了一个 SparkSession ,连接到 Cassandra 并进行简单的选择计数:
cassandra = spark.read.format("org.apache.spark.sql.cassandra")
df = cassandra.load(keyspace="testks",table="test")
df.select().count()

此时Spark输出count并需要大约 28 秒才能完成 Job , 分布于 13 Tasks (在 Spark UI 中,任务的总输入为 331.6MB)

问题:
  • 这是预期的表现吗?如果没有,我错过了什么?
  • 理论上说,DataFrame 的分区数决定了 Spark 将在其中分配作业的任务数。如果我设置了 spark.sql.shuffle.partitions to 4、为什么是创建13个Tasks? (还要确保在我的 DataFrame 上调用 rdd.getNumPartitions() 的分区数)

  • 更新

    我想对这些数据进行测试的一个常见操作:
  • 查询一个大数据集,比如说,从按 pid 分组的 100,000 ~ N 行
  • 选择 ev , list<double>
  • 对每个成员执行平均,假设现在每个列表的长度相同,即 df.groupBy('pid').agg(avg(df['ev'][1]))

  • 正如@zero323 建议的那样,我为这个测试部署了一台带有 Cassandra 的外部机器(2Gb RAM、4 核、SSD),并加载了相同的数据集。 df.select().count()的结果与我之前的测试相比,预期延迟更大,整体性能更差(完成 Job 大约需要 70 秒)。

    编辑:我误解了他的建议。 @zero323 意味着让 Cassandra 执行计数而不是使用 Spark SQL,如 here 中所述

    另外我想指出的是,我知道设置 list<double> 的固有反模式。取而代之的是这类数据的一大排,但此时我更关心的是检索大型数据集所花费的时间,而不是实际的平均计算时间。

    最佳答案

    Is that the expected performance? If not, what am I missing?



    它看起来很慢,但并不完全出乎意料。一般情况count表示为
    SELECT 1 FROM table
    

    其次是 Spark 边求和。因此,虽然它被优化了,但它仍然相当低效,因为您从外部源获取 N 个长整数只是为了在本地对这些进行求和。

    正如 the docs 所解释的Cassandra 支持的 RDD(不是 Datasets)提供优化的 cassandraCount执行服务器端计数的方法。

    Theory says the number of partitions of a DataFrame determines the number of tasks Spark will distribute the job in. If I am setting the spark.sql.shuffle.partitions to (...), why is creating (...) Tasks?



    因为 spark.sql.shuffle.partitions这里不使用。此属性用于确定 shuffle 的分区数(当数据由某些键集聚合时),不适用于 Dataset创建或全局聚合,如 count(*) (始终使用 1 个分区进行最终聚合)。

    如果您对控制初始分区的数量感兴趣,您应该查看 spark.cassandra.input.split.size_in_mb 其中定义:

    Approx amount of data to be fetched into a Spark partition. Minimum number of resulting Spark partitions is 1 + 2 * SparkContext.defaultParallelism



    如您所见,这里的另一个因素是 spark.default.parallelism但它并不完全是一种微妙的配置,因此通常依赖于它不是最佳选择。

    关于apache-spark - Spark : PySpark + Cassandra query performance,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39576448/

    相关文章:

    java - 使用 Spark 运行 geotool 的影子 jar 时解析conf core-default.xml 时出错

    python - Pyspark 将列除以按另一列分组的小计

    apache-spark-sql - 如何计算 pyspark dataframe 中的每日基础(时间序列)

    sql - 如何在 SQL/Spark/GraphFrames 中进行这种转换

    python - 通过应用来自第二个数据框的规则来改变数据框

    python - 聚合数据框pyspark

    scala - 如何将 show 运算符的输出读回数据集?

    Elasticsearch vs Cassandra vs Elasticsearch with Cassandra

    cassandra - 如何在 Pig 中过滤 Cassandra TimeUUID/UUID

    python - 为什么 cqlsh 失败并出现 LookupError : unknown encoding?