问题描述
我目前正在尝试将我的应用程序升级到Spark 3.0.1。对于表创建,我使用cassandra-driver(Python-Cassandra连接器)拖放并创建表。然后,我使用spark-cassandra连接器将数据帧写入表中。仅使用spark-cassandra连接器来创建和删除表并没有很好的选择。
使用Spark 2.4,drop-create-write流没有问题。但是对于Spark 3.0,应用程序似乎没有特定的顺序执行这些操作,通常尝试在删除和创建之前进行编写。我不知道如何确保首先删除和创建表。我知道即使应用程序在写入时出错也确实会发生删除和创建,因为当我通过cqlsh查询Cassandra时,我可以看到该表已删除并重新创建。关于Spark 3.0中此行为的任何想法吗?
注意:由于架构会发生变化,因此需要删除并重新创建此特定表,而不是直接覆盖。
所要求的代码段:
session = self._get_python_cassandra_session(self.env_conf,self.database)
# build drop table query
drop_table_query = 'DROP TABLE IF EXISTS {}.{}'.format(self.database,tablename)
session.execute(drop_table_query)
df,table_columns,table_keys = self._create_table_Metadata(df,keys=keys)
# build create query
create_table_query = 'CREATE TABLE IF NOT EXISTS {}.{} ({} PRIMARY KEY({}),);'.format(self.database,tablename,table_keys)
# execute table creation
session.execute(create_table_query)
session.shutdown()
# spark-cassandra connection options
copts = _cassandra_cluster_spark_options(self.env_conf)
# set write mode
copts['confirm.truncate'] = overwrite
mode = 'overwrite' if overwrite else 'append'
# write dataframe to cassandra
get_dataframe_writer(df,'cassandra',keyspace=self.database,table=tablename,mode=mode,copts=copts).save()
解决方法
我最终建立了一个time.sleep(5)延迟,并具有100秒的超时时间,以便定期为该表ping Cassandra,然后写入是否找到该表。
,在Spark Cassandra Connector 3.0+中,您可以使用新功能-通过Catalogs API操作键空间和表。您可以使用Spark SQL创建/更改/删除键空间和表。例如,您可以使用以下命令在Cassandra中创建表:
null
如您在此处看到的,您可以指定非常复杂的主键,还可以指定表选项。 foo
件是链接到特定Cassandra群集的前缀(您可以同时使用多个)-在启动Spark作业时指定,例如:
CREATE TABLE casscatalog.ksname.table_name (
key_1 Int,key_2 Int,key_3 Int,cc1 STRING,cc2 String,cc3 String,value String)
USING cassandra
PARTITIONED BY (key_1,key_2,key_3)
TBLPROPERTIES (
clustering_key='cc1.asc,cc2.desc,cc3.asc',compaction='{class=SizeTieredCompactionStrategy,bucket_high=1001}'
)
在documentation中可以找到更多示例: