apache-spark - SaveToCassandra ,是否有写入行的顺序

标签 apache-spark cassandra spark-streaming datastax

这是我保存到 Cassandra 表的 RDD 的内容。 但是看起来第二行是先写的,然后第一行覆盖它。所以我最终得到了糟糕的输出。

(494bce4f393b474980290b8d1b6ebef9, 2017-02-01, PT0H9M30S, WEDNESDAY) (494bce4f393b474980290b8d1b6ebef9, 2017-02-01, PT0H10M0S, WEDNESDAY)

有没有办法强制写入 Cassandra 的行的顺序。 请帮忙。 谢谢

最佳答案

是否有 SaveToCassandra 命令?

在单个任务中执行是确定性的,但可能不是 订购您所期待的。这里有两件事需要考虑。

  1. RDD 由 Spark 分区组成,这些分区的执行顺序取决于系统条件。拥有不同数量的核心、异构机器或执行器故障都可能改变执行顺序。具有相同 Cassandra 分区数据的两个 Spark 分区可以基于系统以任何顺序执行。
  2. 对于每个 Spark 分区,记录按接收顺序进行批处理,但这并不一定意味着它们将按相同顺序发送到 Cassandra。连接器中有一些设置可以确定何时发送批处理,并且可以想象包含较晚数据的批处理将在包含较早数据的批处理之前执行。这意味着虽然发送批处理的顺序是确定的,但不一定与前一个迭代器的顺序相同。

这对您的申请重要吗?

可能不会。只有当你的数据真的分散时,这才真正重要 在 RDD 中。如果特定 Cassandra 分区的条目分布在 多个 Spark 分区,那么 Spark 执行的顺序可能会困惑 你的更新。考虑

Spark Partition 1 has Record A
Spark Partition 2 has Record B

Both Spark Partitions have work start simultaneously, but Record B is
reached before Record A.

但我认为这不太可能是问题。

您遇到的问题很可能是常见问题:the order of statements in my batch is not respected .这个问题的核心是 Cassandra 批处理中的所有语句都是“同时”执行的。这意味着如果任何 Primary Key 存在冲突,则需要解决冲突。在这些情况下,Cassandra 会为所有冲突选择较大的单元格值。由于连接器会自动将对同一分区键的写入批处理在一起,因此您最终可能会遇到冲突。

您可以在您的示例中看到这一点,较大的值 (PT0H9M30S) 被保留,较小的 (PT0H10M0S) 被丢弃。问题不在于顺序,而在于批处理正在发生。

那我怎样才能根据时间做upsert呢?

非常仔细。我会考虑采用几种方法。

最好的选择是不根据时间进行更新插入。如果您有一个 PRIMARY_KEY 的多个条目但只想要最后一个,请在访问 Cassandra 之前在 Spark 中进行缩减。在尝试写入之前删除不需要的条目将节省时间并减轻 Cassandra 集群的负载。否则,您会将 Cassandra 用作相当昂贵的重复数据删除机器。

一个更糟糕的选择是只禁用 Spark Cassandra 连接器中的批处理。这会损害性能,但如果您只关心 Spark 分区中的顺序,则可以解决问题。如果您有多个 Spark 分区,这仍然会导致冲突,因为您无法控制它们的执行顺序。

这个故事的寓意

状态不好。订单很糟糕。尽可能将系统设计为幂等的。如果有多个记录并且您知道哪些重要,请在进入分布式 LWW 系统之前删除不重要的记录。

关于apache-spark - SaveToCassandra ,是否有写入行的顺序,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42020173/

相关文章:

hadoop - Yarn 上的 Spark 作业 |性能调整和优化

scala - Google dataproc spark 作业失败并显示 "Node was restarted while executing a job."消息

scala - 在 Spark 中使用 "when"函数填充数据框

apache-spark - 如何查看 SPARK 发送到我的数据库的 SQL 语句?

node.js - Cassandra Sails model.count() 有效,但 model.find() 和 model.findOne() 无效

java - 如何以稳健的方式处理kafka发布失败

apache-spark - 长时间正常运行后,Spark 有状态流作业在检查点到 S3 时挂起

scala - Spark : Input a vector

apache-spark - 在 Spark 中使用窗口函数

nosql - Cassandra (CQL3) 是否可以插入语句二进制数据?