Using HDFS with Spark

After installing HDFS, run a test command to confirm the installation succeeded:

hdfs dfs -help
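If the help text prints, the client is working. As a further sanity check, list a path on the cluster (/ is used here only as an example; any readable path works):

hdfs dfs -ls /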

Storing a MySQL table in HDFS from Spark. The approach is to split the table into many JDBC partitions keyed on hex prefixes of the primary key, read the partitions in parallel, and write the result to HDFS. First, generate the LIKE predicates:

def getAsciiPartitions(ID: String, num: Int): Array[String] = {
    // One LIKE predicate per hex prefix of length `num`, e.g. "ID LIKE 'ab%'".
    // num = 1 yields 16 partitions, num = 2 yields 256, up to num = 4 (65536).
    require(num >= 1 && num <= 4, "num must be between 1 and 4")
    val hexChars = List("0", "1", "2", "3", "4", "5", "6", "7", "8", "9",
                        "a", "b", "c", "d", "e", "f")
    val prefixes = (2 to num).foldLeft(hexChars) { (acc, _) =>
        for (p <- acc; c <- hexChars) yield p + c
    }
    prefixes.map(p => s"$ID LIKE '$p%'").toArray
}
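Each predicate matches the rows whose key begins with one hex prefix, so together they cover the whole key space, as long as the key column stores lowercase hex strings (MD5-style ids, for example). A quick usage sketch with a hypothetical column name:

val predicates = getAsciiPartitions("user_id", 2)
// predicates: Array("user_id LIKE '00%'", "user_id LIKE '01%'", ..., "user_id LIKE 'ff%'")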
Loading a single table with partitioned reads:
import java.util.Properties
import org.apache.spark.sql.DataFrame

def loadDataWithTimePartion(tableName: String, predicates: Array[String]): DataFrame = {
    val spark = SparkConfig._spark
    val properties = new Properties()
    properties.setProperty("user", SparkConfig.user)
    properties.setProperty("password", SparkConfig.pwd)
    properties.setProperty("driver", "com.mysql.cj.jdbc.Driver")
    // Dirty reads are acceptable for a bulk export and avoid locking the source table.
    properties.setProperty("isolationLevel", "READ_UNCOMMITTED")
    // Each predicate becomes one JDBC partition, read in parallel.
    // SparkConfig.host holds the JDBC URL.
    spark.read.jdbc(SparkConfig.host, tableName, predicates, properties)
}
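With both helpers in place, the end-to-end export is a read followed by a write. A minimal sketch, assuming the key column is named ID and stores lowercase hex strings, and reusing the HDFS path from the command below (Parquet is one reasonable output format; the original format is not stated):

val predicates = getAsciiPartitions("ID", 2)              // 256 parallel JDBC partitions
val df = loadDataWithTimePartion("BCONSUMER", predicates) // parallel read from MySQL
// Write the DataFrame to HDFS for downstream Spark jobs.
df.write.mode("overwrite").parquet("hdfs://mycluster/estee/test20210201/spark/BCONSUMER")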
Pull the exported directory back out of HDFS to spot-check it locally, timing the transfer:
time hdfs dfs -get hdfs://mycluster/estee/test20210201/spark/BCONSUMER
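If the export is large, checking its size on HDFS first avoids a surprise on the local disk (a standard HDFS command):

hdfs dfs -du -h hdfs://mycluster/estee/test20210201/spark/BCONSUMER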
