批量将标签数据写入hbase中


文章目录


引入pom文件依赖

    <dependency>
            <groupId>org.apache.hbasegroupId>
            <artifactId>hbase-serverartifactId>
            <version>1.1.1version>
        dependency>
        <dependency>
            <groupId>org.apache.hbasegroupId>
            <artifactId>hbaseartifactId>
            <version>1.1.1version>
        dependency>

        <dependency>
            <groupId>org.apache.hbasegroupId>
            <artifactId>hbase-commonartifactId>
            <version>1.1.1version>
        dependency>

        <dependency>
            <groupId>org.apache.hbasegroupId>
            <artifactId>hbase-clientartifactId>
            <version>1.1.1version>
        dependency>

标签按日输出到hbase表中

package com.dmp.tags

import com.dmp.utils.TagsUtils
import com.typesafe.config.ConfigFactory
import org.apache.hadoop.hbase.{HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.sql.sqlContext
import org.apache.spark.{SparkConf, SparkContext}

/*
C:\Users\admin\Desktop\result1
C:\Users\admin\Desktop\data\app_mapping.txt
C:\Users\admin\Desktop\data\stops.txt
C:\Users\admin\Desktop\data\resultTags1
 */
object Tags4Ctx {
  def main(args: Array[String]): Unit = {
    // 0 校验参数个数
    if (args.length != 4) {
      println(
        """
          |com.dmp.tags.Tags4Ctx
          |参数:
          | logInputPath
          | dictionaryPath
          | stopwordpath
          | resultOutputPath
          | day
        """.stripMargin)
      sys.exit()
    }

    // 1 接受程序参数
    val Array(logInputPath, dictionaryPath, stopwordpath, resultOutputPath,day) = args

    // 2 创建sparkconf->sparkContext
    val sparkConf = new SparkConf()
    sparkConf.setAppName(s"${this.getClass.getSimpleName}")
    sparkConf.setMaster("local[*]")
    // RDD 序列化到磁盘 worker与worker之间的数据传输
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(sparkConf)

    val sqlContext = new sqlContext(sc)
    //字典文件 appMapping
    val dicMap = sc.textFile(dictionaryPath)
      .map(line => {
        val fields = line.split("\t", -1)
        (fields(4), fields(5))
      }).collect().toMap

    // 停用词
    val stopWordsMap = sc.textFile(stopwordpath)
      .map((_, 0))
      .collect().toMap
    //广播出去
    val broadcastAppDict = sc.broadcast(dicMap)
    val broadcastStopWordsDict = sc.broadcast(stopWordsMap)
    //判断hbase的表是否存在.不存在则创建
    val configuration = sc.hadoopConfiguration
    val load = ConfigFactory.load()
    val hbaseTabName = load.getString("hbase.table.name")
    configuration.set("hbase.zookeeper.quorum", load.getString("hbase.zookeeper.host"))
    val hbConn = ConnectionFactory.createConnection(configuration)
    val hbAdmin = hbConn.getAdmin

    if (!hbAdmin.tableExists(TableName.valueOf(hbaseTabName))){
      println(s"$hbaseTabName is not exists")
      println(s"$hbaseTabName is creating")

      val tabNameDescriptor = new  HTableDescriptor(TableName.valueOf(hbaseTabName))
      //创建列族
      val columnDescriptor = new HColumnDescriptor("cf")
      tabNameDescriptor.addFamily(columnDescriptor)
      //创建表
      hbAdmin.createTable(tabNameDescriptor)
      //释放资源
      hbAdmin.close()
      hbConn.close()
    }

    //类似于mr 指定key的输出类型

    val jobConf = new JobConf(configuration)
    jobConf.setoutputFormat(classOf[TableOutputFormat])
    //指定表名称
    jobConf.set(TableOutputFormat.OUTPUT_TABLE,hbaseTabName)


    //读取日志的parquet文件
    sqlContext.read.parquet(logInputPath)
      .where(TagsUtils.hasSomeUserIdCondition) //过滤
      .map(row => {
      //行数据进行标签化处理
      //广告
      val ads = Tags4Ads.makeTags(row)
      //媒介
      val apps = TagsApp.makeTags(row, broadcastAppDict.value)
      //设备
      val devices = Tags4Devices.makeTags(row)
      //停用词
      val keywords = Tags4KeyWords.makeTags(row, broadcastStopWordsDict.value)

      val allUserId = TagsUtils.getAllUserId(row)
      (allUserId(0), (ads ++ apps ++ devices).toList)
    }).reduceByKey((a, b) => {
      // List(("K电视剧",1),("K电视剧",1))  => groupBy => Map["K电视剧",List(....)]
      //foldLeft(0)(_+_._2) 表示前一个加上一个值
      //第一种写法
      //      (a ++ b).groupBy(_._1).mapValues(_.foldLeft(0)(_+_._2)).toList
      (a ++ b).groupBy(_._1).map {
        //使用偏函数
        case (k, smaTags) => (k, smaTags.map(_._2).sum)
      }.toList
    })
      //      .saveAsTextFile(resultOutputPath)
      .map {
      case (userId, userTags) => {
        //以用户id为rowkey
        val put = new Put(Bytes.toBytes(userId))
        //list里面是元组,key是标签值 value是 个数   map转换成List(String) 类型  然后mkString转换成字符串类型,
        //按,分割
        val tags = userTags.map(t => t._1 + ":" + t._2).mkString(",")
        //列族 cf 列day + 日期 值 标签
        put.addColumn(Bytes.toBytes("cf"),Bytes.toBytes(s"day$day"),Bytes.toBytes(tags))
        //转换成hadoop的输出类型
        (new ImmutableBytesWritable(),put)
      }
    }.saveAsHadoopDataset(jobConf)
    sc.stop()
  }
}

               

相关文章

超详细的记录了HBase 集群搭建的整个过程,以及搭建过程出现...
头歌 HBase(相关的五个实验)
1.创建一个学生信息表,用来存储学生的姓名(姓名作为行键,...
大数据课程综合实验案例1 案例简介1.1 案例目的1.2 适用对象...
HBase从浅入深,(初级)什么是HBase,模型,NOSQL,架构,n...
Hadoop之Hbase安装和配置