apache-spark - Cassandra/Spark 显示大表的错误条目数

标签 apache-spark cassandra pyspark spark-cassandra-connector

我正在尝试使用 spark 处理大型 cassandra 表(约 4.02 亿个条目和 84 列),但我得到的结果不一致。最初的需求是将一些列从这个表复制到另一个表。复制数据后,我注意到新表中的一些条目丢失了。为了验证我是否计算了大型源表,但我每次都得到不同的值。我在一个较小的表(约 700 万条记录)上尝试了查询,结果很好。

最初,我尝试使用 pyspark 进行计数。这是我的 pyspark 脚本:

spark = SparkSession.builder.appName("Datacopy App").getOrCreate() 
df = spark.read.format("org.apache.spark.sql.cassandra").options(table=sourcetable, keyspace=sourcekeyspace).load().cache() 
df.createOrReplaceTempView("data") 
query = ("select count(1) from data " ) 
vgDF = spark.sql(query) 
vgDF.show(10)

Spark提交命令如下:

~/spark-2.1.0-bin-hadoop2.7/bin/spark-submit --master spark://10.128.0.18:7077 --packages datastax:spark-cassandra-connector:2.0.1-s_2.11 --conf spark.cassandra.connection.host="10.128.1.1,10.128.1.2,10.128.1.3" --conf "spark.storage.memoryFraction=1" --conf spark.local.dir=/media/db/ --executor-memory 10G --num-executors=6 --executor-cores=2 --total-executor-cores 18 pyspark_script.py

上述 spark 提交过程大约需要 90 分钟才能完成。我运行了 3 次,这是我得到的计数:

  • Spark 迭代 1:402273852
  • Spark 迭代 2:402273884
  • Spark 迭代 3:402274209

Spark在整个过程中没有显示任何错误或异常。我在 cqlsh 中运行相同的查询三次,但再次得到不同的结果:

  • Cqlsh 迭代 1:402273598
  • Cqlsh 迭代 2:402273499
  • Cqlsh 迭代 3:402273515

我无法找出为什么我从同一个查询中得到不同的结果。 Cassandra 系统日志 (/var/log/cassandra/system.log) 仅显示一次以下错误消息:

ERROR [SSTableBatchOpen:3] 2018-02-27 09:48:23,592 CassandraDaemon.java:226 - Exception in thread Thread[SSTableBatchOpen:3,5,main]
java.lang.AssertionError: Stats component is missing for sstable /media/db/datakeyspace/sensordata1-acfa7880acba11e782fd9bf3ae460699/mc-58617-big
        at org.apache.cassandra.io.sstable.format.SSTableReader.open(SSTableReader.java:460) ~[apache-cassandra-3.9.jar:3.9]
        at org.apache.cassandra.io.sstable.format.SSTableReader.open(SSTableReader.java:375) ~[apache-cassandra-3.9.jar:3.9]
        at org.apache.cassandra.io.sstable.format.SSTableReader$4.run(SSTableReader.java:536) ~[apache-cassandra-3.9.jar:3.9]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[na:1.8.0_131]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0_131]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[na:1.8.0_131]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_131]
        at java.lang.Thread.run(Thread.java:748) [na:1.8.0_131]

版本:

  • Cassandra 3.9。
  • Spark 2.1.0。
  • Datastax 的 spark-cassandra-connector 2.0.1
  • Scala 2.11 版

集群:

  • Spark 设置有 3 个工作节点和 1 个主节点。
  • 3 个工作节点也安装了一个 cassandra 集群。
  • 每个工作节点都有 8 个 CPU 内核和 40 GB RAM。

任何帮助将不胜感激。

最佳答案

Spark Cassandra 连接器默认读取一致性为“LOCAL_ONE”,默认写入一致性为“LOCAL_QUORUM”,因此可以在使用该默认值进行完全修复之前读取部分数据。对于写入数据失败的节点,您可以读取“ONE”,但这不是错误,因为其他 2 个副本成功。因此,您应该将两个级别都设置为 QUORUM,或者将其中一个级别设置为 ALL

config("spark.cassandra.input.consistency.level", "LOCAL_QUORUM").
config("spark.cassandra.output.consistency.level", "LOCAL_QUORUM").

默认的 CQL shell 级别也是 ONE,所以你也应该增加它:

cqlsh> CONSISTENCY QUORUM

关于apache-spark - Cassandra/Spark 显示大表的错误条目数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49081803/

相关文章:

r - sparkR hdfs 错误 - 服务器 IPC 版本 9 无法与客户端版本 4 通信

scala - 不兼容的 Jackson 版本 : Spark Structured Streaming

apache-spark - 在 Spark Thrift 服务器中缓存 DataFrame

python - 为什么我从 date_format() PySpark 函数得到空结果?

python - 如何将行值与前一行值进行比较?

apache-spark - 为什么 Spark 流很慢?

java - 使用 datastax java 驱动程序 2.0 的多线程

java - Cassandra 中的时间戳

database - Apache Cassandra 数据库的安全副本

apache-spark - 在spark中saveAsTextFile时如何命名文件?