我在本地机器(8 核,16gb 内存)上设置了 Spark 2.0 和 Cassandra 3.0 用于测试目的并编辑了 spark-defaults.conf
如下:
spark.python.worker.memory 1g
spark.executor.cores 4
spark.executor.instances 4
spark.sql.shuffle.partitions 4
接下来我在 Cassandra 中导入了 150 万行:
test(
tid int,
cid int,
pid int,
ev list<double>,
primary key (tid)
)
test.ev
是一个包含数值的列表,即 [2240,2081,159,304,1189,1125,1779,693,2187,1738,546,496,382,1761,680]
现在在代码中,为了测试整个事情,我刚刚创建了一个
SparkSession
,连接到 Cassandra 并进行简单的选择计数:cassandra = spark.read.format("org.apache.spark.sql.cassandra")
df = cassandra.load(keyspace="testks",table="test")
df.select().count()
此时Spark输出
count
并需要大约 28 秒才能完成 Job
, 分布于 13 Tasks
(在 Spark UI
中,任务的总输入为 331.6MB)问题:
spark.sql.shuffle.partitions
to 4、为什么是创建13个Tasks? (还要确保在我的 DataFrame 上调用 rdd.getNumPartitions()
的分区数)更新
我想对这些数据进行测试的一个常见操作:
pid
分组的 100,000 ~ N 行ev
, list<double>
df.groupBy('pid').agg(avg(df['ev'][1]))
正如@zero323 建议的那样,我为这个测试部署了一台带有 Cassandra 的外部机器(2Gb RAM、4 核、SSD),并加载了相同的数据集。
df.select().count()
的结果与我之前的测试相比,预期延迟更大,整体性能更差(完成 Job
大约需要 70 秒)。编辑:我误解了他的建议。 @zero323 意味着让 Cassandra 执行计数而不是使用 Spark SQL,如 here 中所述
另外我想指出的是,我知道设置
list<double>
的固有反模式。取而代之的是这类数据的一大排,但此时我更关心的是检索大型数据集所花费的时间,而不是实际的平均计算时间。
最佳答案
Is that the expected performance? If not, what am I missing?
它看起来很慢,但并不完全出乎意料。一般情况
count
表示为SELECT 1 FROM table
其次是 Spark 边求和。因此,虽然它被优化了,但它仍然相当低效,因为您从外部源获取 N 个长整数只是为了在本地对这些进行求和。
正如 the docs 所解释的Cassandra 支持的 RDD(不是
Datasets
)提供优化的 cassandraCount
执行服务器端计数的方法。Theory says the number of partitions of a DataFrame determines the number of tasks Spark will distribute the job in. If I am setting the
spark.sql.shuffle.partitions
to (...), why is creating (...) Tasks?
因为
spark.sql.shuffle.partitions
这里不使用。此属性用于确定 shuffle 的分区数(当数据由某些键集聚合时),不适用于 Dataset
创建或全局聚合,如 count(*)
(始终使用 1 个分区进行最终聚合)。如果您对控制初始分区的数量感兴趣,您应该查看
spark.cassandra.input.split.size_in_mb
其中定义:Approx amount of data to be fetched into a Spark partition. Minimum number of resulting Spark partitions is 1 + 2 * SparkContext.defaultParallelism
如您所见,这里的另一个因素是
spark.default.parallelism
但它并不完全是一种微妙的配置,因此通常依赖于它不是最佳选择。
关于apache-spark - Spark : PySpark + Cassandra query performance,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39576448/