PySpark: direct join using DataFrames

Problem description

I can't get a direct join to work between two DataFrames with spark-cassandra-connector_2.11 version 2.5.1. I start Spark with:

spark-2.4.5-bin-hadoop2.6/bin/spark-submit \
      --packages com.datastax.spark:spark-cassandra-connector_2.11:2.5.1  \
      --conf spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions

The table I am trying to join against:

CREATE KEYSPACE keyspace WITH replication = {
    'class': 'NetworkTopologyStrategy','realtime': '3'}
    AND durable_writes = false;

CREATE TABLE keyspace.table (
    id text,id_type int,region_type int,region_id int,foreign_id uuid,PRIMARY KEY ((id,id_type),region_type,region_id)
) WITH CLUSTERING ORDER BY (region_type ASC,region_id ASC)
    AND bloom_filter_fp_chance = 0.01
    AND caching = {
        'keys': 'ALL','rows_per_partition': 'NONE'}
    AND comment = ''
    AND compaction = {
        'class': 'SizeTieredCompactionStrategy','tombstone_compaction_interval': '86400','unchecked_tombstone_compaction': 'true'}
    AND compression = {
        'chunk_length_in_kb': '16','sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND dclocal_read_repair_chance = 0.001
    AND default_time_to_live = 0
    AND gc_grace_seconds = 3600
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99.9PERCENTILE';
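For reference, the PRIMARY KEY clause above splits into a partition key of (id, id_type) and clustering columns (region_type, region_id). As far as I understand the connector, a direct join can only be considered when the join columns cover the whole partition key, so that is worth checking first. Below is a minimal plain-Python sketch of that check (no Spark needed); the function name `missing_partition_keys` is hypothetical and the join column list is the one used in the test code further down.

```python
# Key layout copied from the CREATE TABLE statement above.
PARTITION_KEY = ("id", "id_type")
CLUSTERING_COLUMNS = ("region_type", "region_id")


def missing_partition_keys(join_columns, partition_key=PARTITION_KEY):
    """Return the partition key columns that are absent from the join columns."""
    return [col for col in partition_key if col not in join_columns]


# Join columns as passed to DataFrame.join(on=...) in the test code.
join_columns = ["id", "id_type", "region_type", "region_id"]
print(missing_partition_keys(join_columns))  # prints []
```

An empty list means the join covers the full partition key, so a missing key column does not look like the cause here.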

and I run this Python test code:

log.error(f"-----------TEST METHOD------------")
log.error(f"--------printing config-------------")
conf_dict = dict(spark_session.sparkContext.getConf().getAll())
log.error(f"spark.sql.extensions = {conf_dict['spark.sql.extensions']}")

log.error(f"--------loading cassandra-------------")
cassandra_table = spark_session\
    .read \
    .format("org.apache.spark.sql.cassandra") \
    .options(table="table",keyspace="keyspace",directJoinSetting="on")\
    .load()

log.error(f"--------generating test dataframe-------------")
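# Types used by the schema below (import shown here for completeness).
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

# One row covering all four primary-key columns: id, id_type, region_type, region_id.
test_data = [('77ce7199-0dd0-11eb-b419-17c19fe60001', 1, 1, 1)]
test_schema = StructType([
    StructField("id", StringType()),
    StructField("id_type", IntegerType()),
    StructField("region_type", IntegerType()),
    StructField("region_id", IntegerType())])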
test_join_df = spark_session.createDataFrame(test_data,schema=test_schema)

log.error(f"-------printing test dataframe----------")
log.error(f"TYPE : {print_type(test_join_df)}")
test_join_df.explain()
log.error(f"Data types {test_join_df.dtypes}")
test_join_df.show(truncate=False)

log.error(f"-------doing join------")
match_join = test_join_df\
    .join(
        cassandra_table,on=["id","id_type","region_type","region_id"],how="left")

log.error(f"-------printing join results----------")
match_join.explain()
log.error(f"--------------------------------------")
log.error(f"TYPE : {print_type(match_join)}")
log.error(f"Data types {match_join.dtypes}")
log.error(f"------- finished ----------")

But the explain output shows:

2020-11-06 11:11:44 ERROR __main__:? - -----------TEST METHOD------------
2020-11-06 11:11:44 ERROR __main__:? - --------printing config-------------
2020-11-06 11:11:45 ERROR __main__:? - spark.sql.extensions = com.datastax.spark.connector.CassandraSparkExtensions
2020-11-06 11:11:45 ERROR __main__:? - --------loading cassandra-------------
2020-11-06 11:11:49 ERROR __main__:? - --------generating test dataframe-------------
2020-11-06 11:11:50 ERROR __main__:? - -------printing test dataframe----------
2020-11-06 11:11:50 ERROR __main__:? - TYPE : DataFrame

== Physical Plan ==
Scan ExistingRDD[id#10,id_type#11,region_type#12,region_id#13]

2020-11-06 11:11:50 ERROR __main__:? - Data types [('id','string'),('id_type','int'),('region_type','int'),('region_id','int')]

2020-11-06 11:12:06 WARN  YarnScheduler:66 - Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
+------------------------------------+---------------+-----------+---------+
|id                                  |id_type        |region_type|region_id|
+------------------------------------+---------------+-----------+---------+
|77ce7199-0dd0-11eb-b419-17c19fe60001|1              |1          |1        |
+------------------------------------+---------------+-----------+---------+

2020-11-06 11:12:19 ERROR __main__:? - -------doing join------

2020-11-06 11:12:19 ERROR __main__:? - -------printing join results----------
== Physical Plan ==
*(4) Project [id#10,region_id#13,foreign_id#4]
+- SortMergeJoin [id#10,region_id#13],[id#0,id_type#1,region_type#2,region_id#3],Left
   :- *(1) Sort [id#10 ASC NULLS FIRST,id_type#11 ASC NULLS FIRST,region_type#12 ASC NULLS FIRST,region_id#13 ASC NULLS FIRST],false,0
   :  +- Exchange hashpartitioning(id#10,200)
   :     +- Scan ExistingRDD[id#10,region_id#13]
   +- *(3) Sort [id#0 ASC NULLS FIRST,id_type#1 ASC NULLS FIRST,region_type#2 ASC NULLS FIRST,region_id#3 ASC NULLS FIRST],0
      +- Exchange hashpartitioning(id#0,region_id#3,200)
         +- *(2) Scan org.apache.spark.sql.cassandra.CassandraSourceRelation [id#0,foreign_id#4] PushedFilters: [],ReadSchema: struct<id:string,id_type:int,region_type:int,region_id:int,foreign_id:string>

2020-11-06 11:12:20 ERROR __main__:? - --------------------------------------
2020-11-06 11:12:20 ERROR __main__:? - TYPE : DataFrame
2020-11-06 11:12:20 ERROR __main__:? - Data types [('id','string'),('id_type','int'),('region_type','int'),('region_id','int'),('foreign_id','string')]
2020-11-06 11:12:20 ERROR __main__:? - ------- finished ----------

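It is still doing a full table scan (a SortMergeJoin over a scan of the Cassandra table) instead of a Cassandra Direct Join. Is my test setup incorrect, or am I missing something? Could it be a problem that foreign_id, which is not part of the key, is in the Cassandra table?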
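To make this check repeatable instead of reading the plan by eye, the plan text can be scanned for the join operator. The following is a minimal sketch in plain Python. It assumes the connector labels its plan node with the words "Direct Join", and the helper name `join_strategy` is hypothetical. The sample text is the first three lines of the plan printed above.

```python
import re


def join_strategy(plan_text: str) -> str:
    """Return "direct" if a Direct Join node appears, else the first *Join operator found."""
    for line in plan_text.splitlines():
        # Assumption: the connector's direct join node contains the words "Direct Join".
        if "Direct Join" in line:
            return "direct"
        match = re.search(r"\b(\w+Join)\b", line)
        if match:
            return match.group(1)
    return "none"


# First lines of the physical plan from the output above.
plan = """== Physical Plan ==
*(4) Project [id#10,region_id#13,foreign_id#4]
+- SortMergeJoin [id#10,region_id#13],[id#0,id_type#1,region_type#2,region_id#3],Left
"""
print(join_strategy(plan))  # prints SortMergeJoin
```

The helper only takes text, so it can be run on a plan pasted from the logs without a Spark session.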
