apache-spark - pyspark rdd/dataframe 不会自动在 cassandra 中创建表

标签 apache-spark pyspark cassandra rdd spark-cassandra-connector

检查所有来源后发现datastax-spark-cassandra连接器支持在scala和java中使用rdd在cassandra中自动创建表。对于 pyspark,特别是另一个包可以完成这项工作 - https://github.com/anguenot/pyspark-cassandra 。但即使使用这个包也无法自动创建表。对于数据框,我根本没有找到任何选项。我是 pyspark 和 cassandra 的新手,非常感谢任何帮助。也尝试仅使用 anguenot 包作为依赖项。 Spark 版本:2.4.7 Cassandra:最新的 docker 镜像

Pyspark shell >> pyspark --packages anguenot/pyspark-cassandra:2.4.0,com.datastax.spark:spark-cassandra-connector_2.11:2.5.1
>>> spark = SparkSession.builder.master('local[*]').appName('cassandra').config("spark.cassandra.connection.host", "ip").config("spark.cassandra.connection.port", "port").config("spark.cassandra.auth.username", "username").config("spark.cassandra.auth.password", "password").getOrCreate()
>>> from datetime import datetime
>>> rdd = sc.parallelize([{
...     "key": k,
...     "stamp": datetime.now(),
...     "tags": ["a", "b", "c"],
...     "options": {
...             "foo": "bar",
...             "baz": "qux",
...     }
... } for k in ["x", "y", "z"]])

>>> rdd.saveToCassandra("test", "testTable")
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
AttributeError: 'RDD' object has no attribute 'saveToCassandra' 

最佳答案

通常,可以从 Spark Cassandra 连接器为 RDD ( saveAsCassandraTable or saveAsCassandraTableEx ) 或 Dataframe ( createCassandraTable and createCassandraTableEx ) 创建表,但此功能仅在 Scala API 中可用。

自版本 3.0 起,Spark Cassandra 连接器 supports Catalogs API (Spark 3+),因此您将能够使用 Spark SQL 处理键空间和表(创建/更改/删除),如下所示:

spark.sql("""
CREATE TABLE casscatalog.ksname.testTable (
     key_1 Int, key_2 Int, key_3 Int, 
     cc1 STRING, cc2 String, cc3 String, value String) 
  USING cassandra
  PARTITIONED BY (key_1, key_2, key_3)
  TBLPROPERTIES (
    clustering_key='cc1.asc, cc2.desc, cc3.asc'
  )
""")

关于apache-spark - pyspark rdd/dataframe 不会自动在 cassandra 中创建表,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/66450607/

相关文章:

cassandra - 如何配置 Cassandra 监听的接口(interface)?

java - cassandra 中是否有任何机制可以在通过 INSERT 或 UPDATE 查询更改表时发送通知?

scala - Spark/Mllib 以分布式方式训练许多 GaussianMixture 模型

hadoop - 在 pyspark 数据帧计数函数中得到 `java.nio.BufferOverflowException`

apache-spark - 使用 SparkLauncher 以编程方式向 dse Spark 集群提交 Spark 作业

datetime - 从 Pyspark 中包含时间戳的字符串列中提取日期

java - 如何使用 Java 驱动程序 (2.0.2,3.1) 和 cassandra 3.7 在 java 中使用 DCAwareRoundRobinPolicy

java - 使用 Java 将数据存储为 Apache Spark 中的 Hive 表

python - 如何在 pyspark 的 RDD 上访问元组中的单个元素?

python - 查找任何 pyspark 数据集的 "primary key"