使用 Spark Structured Streaming Java 附加文件

问题描述

我正在尝试使用 spark 附加流文件夹中的所有文件内容,但每次触发微批处理时它都会创建很多部分文件。以下是我的代码

SparkSession session = SparkSession.builder().appName("SparkJava").getorCreate();
        JavaSparkContext sparkContext = new JavaSparkContext(session.sparkContext());

    StructType personSchema = new StructType().add("firstName","string").add("lastName","string").add("age","long");

//3 - 创建一个表示输入文件流的数据集

Dataset<Patient> personStream = session.readStream().schema(personSchema).json("file:///C:/jsons1")
                .as(Encoders.bean(Patient.class));

//当数据从流中到达时,将执行这些步骤

//4 - 创建一个临时表,以便我们可以使用 sql 查询

    personStream.createOrReplaceTempView("people");

    String sql = "SELECT * FROM people";
    Dataset<Row> ageAverage = session.sql(sql);



    StreamingQuery query = ageAverage.coalesce(1).writeStream().outputMode(OutputMode.Append())
            .option("path","file:///C:/output").format("json").trigger(Trigger.ProcessingTime("10 seconds")).option("checkpointLocation","file:///C:/output")
            .partitionBy("age").start();

请提出一种将源文件夹中的所有文件内容合并到输出文件夹中的一个文件方法

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)