Problem
I am trying to get the count of unique ids per Parentgroup, childgroup and MountingType in Spark Structured Streaming.
val aggDF = JSONDF
  .withWatermark("timestamp", "1 minutes")
  .groupBy("Parentgroup", "childgroup", "MountingType")
  .agg(countDistinct("id"))
Error:
Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark
Could someone help me with how to do this aggregation in Structured Streaming and write the result to CSV? Many thanks.
Data:
{"id":"7CE3A7CA","Faulttime":1544362500,"name":"Sony","Parentgroup":"TV","childgroup":"Other","MountingType":"SurfaceMount"}
{"id":"7CE3A7CA","Faulttime":1544362509,"MountingType":"SurfaceMount"}
{"id":"010004FF","Faulttime":1551339188,"name":"Philips","Parentgroup":"Light","MountingType":"Solder"}
{"id":"010004FF","MountingType":"Solder"}
{"id":"010004FF","Faulttime":1551339191,"name":"Sansui","Parentgroup":"AC","MountingType":"SurfaceMount"}
{"id":"CE361405","Faulttime":1552159061,"name":"Hyndai","Parentgroup":"SBAR","name":"sony","MountingType":"SurfaceMount"}
{"id":"7BE446C0","Faulttime":1553022095,"MountingType":"Solder"}
{"id":"7BE446C0","Parentgroup":"LIGHT","MountingType":"Solder"}
Solution
In append output mode, a streaming aggregation must be tied to an event-time watermark: set the watermark on the event-time column and group by a time window on that column (along with your other grouping columns).
Try this (pseudo code):
val JSONDF = df.withWatermark("timestamp", "1 minutes")

val aggDF = JSONDF
  .groupBy(
    window($"timestamp", "5 minutes", "1 minutes"),
    $"Parentgroup", $"childgroup", $"MountingType")
  .agg(countDistinct("id"))  // note: exact distinct counts are not supported on streaming DataFrames; use approx_count_distinct("id") instead
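Putting the pieces together, here is a minimal end-to-end sketch. Assumptions not in the original: the input, output, and checkpoint paths (`in/`, `out/`, `chk/`) are placeholders; since the sample records carry `Faulttime` (epoch seconds) rather than a `timestamp` column, the event time is derived by casting it; and `approx_count_distinct` is used because exact `countDistinct` is not supported on streaming aggregations.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val spark = SparkSession.builder.appName("UniqueIds").getOrCreate()
import spark.implicits._

// Streaming JSON sources cannot infer schemas, so declare one explicitly.
val schema = StructType(Seq(
  StructField("id", StringType),
  StructField("Faulttime", LongType),
  StructField("name", StringType),
  StructField("Parentgroup", StringType),
  StructField("childgroup", StringType),
  StructField("MountingType", StringType)))

val df = spark.readStream.schema(schema).json("in/")  // placeholder input directory

// Derive an event-time column from the epoch-seconds Faulttime field,
// then bound lateness with a watermark so append mode is allowed.
val jsonDF = df
  .withColumn("timestamp", $"Faulttime".cast(TimestampType))
  .withWatermark("timestamp", "1 minutes")

val aggDF = jsonDF
  .groupBy(
    window($"timestamp", "5 minutes", "1 minutes"),
    $"Parentgroup", $"childgroup", $"MountingType")
  .agg(approx_count_distinct("id").as("uniqueIds"))

// The CSV sink cannot serialize the struct-typed window column; flatten it first.
aggDF
  .select($"window.start", $"window.end", $"Parentgroup", $"childgroup",
          $"MountingType", $"uniqueIds")
  .writeStream
  .outputMode("append")                  // append requires the watermark set above
  .format("csv")
  .option("path", "out/")                // placeholder output directory
  .option("checkpointLocation", "chk/")  // placeholder checkpoint directory
  .start()
  .awaitTermination()
```

With append mode, a window's row is emitted only after the watermark passes the window end, so output files appear with some delay relative to the input data.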