问题描述
在检查所有来源后发现 datastax-spark-cassandra 连接器支持在 cassandra 中使用 rdd 在 Scala 和 java 中自动创建表。对于 pyspark,特别是另一个包可用于完成这项工作——https://github.com/anguenot/pyspark-cassandra。但即使使用此包也无法自动创建表。使用数据框,我根本没有找到任何选项。我是 pyspark 和 cassandra 的新手,非常感谢任何帮助。也尝试仅使用 anguenot 包作为依赖项。 火花版本:2.4.7 Cassandra : 最新的 docker 镜像
Pyspark shell >> pyspark --packages anguenot/pyspark-cassandra:2.4.0,com.datastax.spark:spark-cassandra-connector_2.11:2.5.1
>>> spark = SparkSession.builder.master('local[*]').appName('cassandra').config("spark.cassandra.connection.host","ip").config("spark.cassandra.connection.port","port").config("spark.cassandra.auth.username","username").config("spark.cassandra.auth.password","password").getorCreate()
>>> from datetime import datetime
>>> rdd = sc.parallelize([{
... "key": k,... "stamp": datetime.Now(),... "tags": ["a","b","c"],... "options": {
... "foo": "bar",... "baz": "qux",... }
... } for k in ["x","y","z"]])
>>> rdd.savetoCassandra("test","testTable")
Traceback (most recent call last):
File "<stdin>",line 1,in <module>
AttributeError: 'RDD' object has no attribute 'savetoCassandra'
解决方法
通常,可以从 Spark Cassandra Connector for RDD (saveAsCassandraTable or saveAsCassandraTableEx) 或 Dataframes (createCassandraTable and createCassandraTableEx) 创建表,但此功能仅在 Scala API 中可用。
自 3.0 版起,Spark Cassandra Connector supports Catalogs API (Spark 3+),因此您将能够使用 Spark SQL 处理键空间和表(创建/更改/删除),如下所示:>
spark.sql("""
CREATE TABLE casscatalog.ksname.testTable (
key_1 Int,key_2 Int,key_3 Int,cc1 STRING,cc2 String,cc3 String,value String)
USING cassandra
PARTITIONED BY (key_1,key_2,key_3)
TBLPROPERTIES (
clustering_key='cc1.asc,cc2.desc,cc3.asc'
)
""")
,
您应该在创建 rdd 之前导入 pyspark_cassandra
:
>>> import pyspark_cassandra
>>> rdd = sc.parallelize(...)
>>> rdd.saveToCassandra("test","testTable")