检查所有来源后发现datastax-spark-cassandra连接器支持在scala和java中使用rdd在cassandra中自动创建表。对于 pyspark,特别是另一个包可以完成这项工作 - https://github.com/anguenot/pyspark-cassandra 。但即使使用这个包也无法自动创建表。对于数据框,我根本没有找到任何选项。我是 pyspark 和 cassandra 的新手,非常感谢任何帮助。也尝试仅使用 anguenot 包作为依赖项。 Spark 版本:2.4.7 Cassandra:最新的 docker 镜像
Pyspark shell >> pyspark --packages anguenot/pyspark-cassandra:2.4.0,com.datastax.spark:spark-cassandra-connector_2.11:2.5.1
>>> spark = SparkSession.builder.master('local[*]').appName('cassandra').config("spark.cassandra.connection.host", "ip").config("spark.cassandra.connection.port", "port").config("spark.cassandra.auth.username", "username").config("spark.cassandra.auth.password", "password").getOrCreate()
>>> from datetime import datetime
>>> rdd = sc.parallelize([{
... "key": k,
... "stamp": datetime.now(),
... "tags": ["a", "b", "c"],
... "options": {
... "foo": "bar",
... "baz": "qux",
... }
... } for k in ["x", "y", "z"]])
>>> rdd.saveToCassandra("test", "testTable")
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
AttributeError: 'RDD' object has no attribute 'saveToCassandra'
最佳答案
通常,可以从 Spark Cassandra 连接器为 RDD ( saveAsCassandraTable or saveAsCassandraTableEx ) 或 Dataframe ( createCassandraTable and createCassandraTableEx ) 创建表,但此功能仅在 Scala API 中可用。
自版本 3.0 起,Spark Cassandra 连接器 supports Catalogs API (Spark 3+),因此您将能够使用 Spark SQL 处理键空间和表(创建/更改/删除),如下所示:
spark.sql("""
CREATE TABLE casscatalog.ksname.testTable (
key_1 Int, key_2 Int, key_3 Int,
cc1 STRING, cc2 String, cc3 String, value String)
USING cassandra
PARTITIONED BY (key_1, key_2, key_3)
TBLPROPERTIES (
clustering_key='cc1.asc, cc2.desc, cc3.asc'
)
""")
关于apache-spark - pyspark rdd/dataframe 不会自动在 cassandra 中创建表,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/66450607/