将相同国家进行分组,然后将count相加sum(count), 对sum(count)进行排序,输出top5
val path="/Volumes/Data/BigData_code/data/flight-data/csv/2015-summary.csv"
val data = spark.read.option("inferSchema", "true").option("header", "true").csv(path)
//查询前5个count max 的国家
data.groupBy("DEST_COUNTRY_NAME").sum("count")
.withColumnRenamed("sum(count)", "destination_total")
.sort(desc("destination_total"))
.limit(5).show()
代码的执行如图:
查看用户在一天内进行采集所用费用最多的日期:
下面是表格的格式:
//添加一个列用于统计总费用,并查看用户话费最多的是哪个日期
val selectData = staticData.selectExpr("CustomerId", "(UnitPrice * Quantity) as total_cost", "InvoiceDate")
selectData.show()
//进行分组:分组的标准:客户的ID 和 购买的时间(一天内为相同标准), 并对分组的内容进行统计
val groupData = selectData.groupBy(
col("CustomerId"), window(col("InvoiceDate"), "1 day")
).sum("total_cost")
groupData.show(5)
window函数:https://blog.csdn.net/weixin_38653290/article/details/83962789
使用流处理实现相同功能
//进行分组:分组的标准:客户的ID 和 购买的时间(一天内为相同标准), 并对分组的内容进行统计
val streamData = spark.readStream.schema(staticSchema) //设置分区
.option("maxFilesPerTrigger", 1) //设置一次读入的文件个数
.format("csv")
.option("header", "true")
.load(path)
//执行相同的逻辑操作
val streamGroupData = streamData.selectExpr(
"CustomerId", "(UnitPrice * Quantity) as total_cost", "InvoiceDate"
).groupBy(
$"CustomerId", window($"InvoiceDate", "1 day")
).sum("total_cost")
注意由于流处理和静态处理不一样,所以无法使用静态处理中的动作操作。流处理是将流处理的结果放入内存的一个表中。每一次处理完,不断的更新这个表即可
//将结果存入内存中
streamGroupData.writeStream.format("memory") //表示存入内存中
.queryName("streamGroupData") //表示存入内存的表的名字
.outputMode("complete") //complete表示表中所有记录
.start()
然后查询
//对流处理后的结果进行查询
spark.sql(
"""
|select *
|from streamGroupData
|order by 'sum(total_cost)' desc
|""".stripMargin).show(5)