Spark 3.0和Cassandra Spark / Python连接器:在写之前未创建表

问题描述

我目前正在尝试将我的应用程序升级到Spark 3.0.1。对于表创建,我使用cassandra-driver(Python-Cassandra连接器)拖放并创建表。然后,我使用spark-cassandra连接器将数据帧写入表中。仅使用spark-cassandra连接器来创建和删除表并没有很好的选择。

使用Spark 2.4,drop-create-write流没有问题。但是对于Spark 3.0,应用程序似乎没有特定的顺序执行这些操作,通常尝试在删除和创建之前进行编写。我不知道如何确保首先删除和创建表。我知道即使应用程序在写入时出错也确实会发生删除和创建,因为当我通过cqlsh查询Cassandra时,我可以看到该表已删除并重新创建。关于Spark 3.0中此行为的任何想法吗?

注意:由于架构会发生变化,因此需要删除并重新创建此特定表,而不是直接覆盖。

所要求的代码段:

        session = self._get_python_cassandra_session(self.env_conf,self.database)
        # build drop table query
        drop_table_query = 'DROP TABLE IF EXISTS {}.{}'.format(self.database,tablename)
        session.execute(drop_table_query)

        df,table_columns,table_keys = self._create_table_Metadata(df,keys=keys)
        # build create query
        create_table_query = 'CREATE TABLE IF NOT EXISTS {}.{} ({} PRIMARY KEY({}),);'.format(self.database,tablename,table_keys)
        # execute table creation
        session.execute(create_table_query)
        session.shutdown()


        # spark-cassandra connection options
        copts = _cassandra_cluster_spark_options(self.env_conf)
        # set write mode
        copts['confirm.truncate'] = overwrite
        mode = 'overwrite' if overwrite else 'append'
        # write dataframe to cassandra
        get_dataframe_writer(df,'cassandra',keyspace=self.database,table=tablename,mode=mode,copts=copts).save()

解决方法

我最终建立了一个time.sleep(5)延迟,并具有100秒的超时时间,以便定期为该表ping Cassandra,然后写入是否找到该表。

,

在Spark Cassandra Connector 3.0+中,您可以使用新功能-通过Catalogs API操作键空间和表。您可以使用Spark SQL创建/更改/删除键空间和表。例如,您可以使用以下命令在Cassandra中创建表:

null

如您在此处看到的,您可以指定非常复杂的主键,还可以指定表选项。 foo件是链接到特定Cassandra群集的前缀(您可以同时使用多个)-在启动Spark作业时指定,例如:

CREATE TABLE casscatalog.ksname.table_name (
  key_1 Int,key_2 Int,key_3 Int,cc1 STRING,cc2 String,cc3 String,value String) 
USING cassandra
PARTITIONED BY (key_1,key_2,key_3)
TBLPROPERTIES (
    clustering_key='cc1.asc,cc2.desc,cc3.asc',compaction='{class=SizeTieredCompactionStrategy,bucket_high=1001}'
)

documentation中可以找到更多示例: