Problem description
I need to write query metrics to a Kafka sink or to an HDFS file.
I have tried printing QueryProgressEvent.progress() to the console, and that works, but I need to write it to disk somewhere for later analysis. Could someone share code, or a suitable approach, for doing this? Below is my code that writes the progress to the console:
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.streaming.StreamingQueryListener;
import org.apache.spark.sql.streaming.Trigger;

public class kafka_stream {
    public static void main(String[] args) {
        Logger.getLogger("org").setLevel(Level.OFF);
        Logger.getLogger("akka").setLevel(Level.OFF);
        SparkSession spark = SparkSession.builder()
                .appName("Java Spark SQL basic example")
                .config("spark.master", "local[*]")
                .config("spark.streaming.stopGracefullyOnShutdown", "true")
                .getOrCreate();
        spark.streams().addListener(new StreamingQueryListener() {
            @Override
            public void onQueryStarted(QueryStartedEvent queryStarted) {
                System.out.println("Query started: " + queryStarted.id());
            }
            @Override
            public void onQueryTerminated(QueryTerminatedEvent queryTerminated) {
                System.out.println("Query terminated: " + queryTerminated.id());
            }
            @Override
            public void onQueryProgress(QueryProgressEvent queryProgress) {
                System.out.println("Query made progress: " + queryProgress.progress());
                // StreamingQueryProgress has no write() method; json() (or prettyJson())
                // returns the progress report as a string that can then be persisted.
                String metricsJson = queryProgress.progress().json();
            }
        });
        Dataset<Row> df = spark.readStream().format("kafka")
                .option("kafka.bootstrap.servers", "10.5.205.213:9092")
                .option("subscribe", "telemedia-clickstream")
                .option("failOnDataLoss", "false")
                .option("checkpointLocation", "/prod/data/telemedia/chckpoint")
                .load()
                .selectExpr("CAST(value AS STRING)", "topic", "partition", "offset", "timestamp", "timestampType");
        StreamingQuery query = df.writeStream().outputMode("append").format("parquet")
                .trigger(Trigger.ProcessingTime("30 seconds"))
                .option("path", "/prod/data/telemedia")
                .option("checkpointLocation", "/prod/data/telemedia/chckpoint")
                .start();
        try { query.awaitTermination(); } catch (StreamingQueryException e) { System.exit(-1); }
    }
}
Solution
No verified solution has been posted for this question yet.
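One common pattern (a sketch under assumptions, not a tested answer) is to serialize each progress report with `StreamingQueryProgress.json()` inside the listener, then append it to a file and/or publish it to a Kafka topic with a plain `KafkaProducer`. The class name `ProgressMetricsListener`, the topic `query-metrics`, and the metrics file path below are illustrative placeholders; to land the file directly in HDFS you would use Hadoop's `FileSystem` API instead of `java.nio.file.Files`.

```java
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.spark.sql.streaming.StreamingQueryListener;

public class ProgressMetricsListener extends StreamingQueryListener {

    private final Path metricsFile;
    private final KafkaProducer<String, String> producer;
    private final String topic;

    public ProgressMetricsListener(Path metricsFile, String bootstrapServers, String topic) {
        this.metricsFile = metricsFile;
        this.topic = topic;
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        this.producer = new KafkaProducer<>(props);
    }

    @Override
    public void onQueryProgress(QueryProgressEvent event) {
        // json() returns the full progress report as one JSON string.
        String json = event.progress().json();
        appendJsonLine(metricsFile, json);  // one JSON object per line on disk
        producer.send(new ProducerRecord<>(topic, event.progress().id().toString(), json));
    }

    @Override
    public void onQueryStarted(QueryStartedEvent event) { }

    @Override
    public void onQueryTerminated(QueryTerminatedEvent event) {
        producer.close();
    }

    // Small Spark-free helper: append one line of text, creating the file if needed.
    static void appendJsonLine(Path file, String line) {
        try {
            Files.write(file, (line + System.lineSeparator()).getBytes(StandardCharsets.UTF_8),
                    StandardOpenOption.CREATE, StandardOpenOption.APPEND);
        } catch (java.io.IOException e) {
            throw new java.io.UncheckedIOException(e);
        }
    }
}
```

It would be registered once per session, e.g. `spark.streams().addListener(new ProgressMetricsListener(java.nio.file.Paths.get("/prod/data/telemedia/metrics.jsonl"), "10.5.205.213:9092", "query-metrics"));`, replacing the anonymous listener in the question.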