合并到增量表中,不适用于Java foreachbatch

问题描述

我已经创建了一个增量表,现在我正尝试使用foreachBatch()将数据插入到该表中。我已遵循此example。唯一的区别是我使用Java而不是在笔记本中使用Java,但我认为那应该没有什么区别?

我的代码如下:

spark.sql("CREATE TABLE IF NOT EXISTS example_src_table(id int,load_date timestamp) USING DELTA LOCATION '/mnt/delta/events/example_src_table'");

Dataset<Row> exampleDF = spark.sql("SELECT e.id as id,e.load_date as load_date FROM example e");

        try {
            exampleDF
                    .writeStream()
                    .format("delta")
                    .foreachBatch((dataset,batchId) -> {
                        dataset.persist();
                        // Set the dataframe to view name
                        dataset.createOrReplaceTempView("updates");
                        // Use the view name to apply MERGE
                        // NOTE: You have to use the SparkSession that has been used to define the `updates` dataframe
                        dataset.sparkSession().sql("MERGE INTO example_src_table e" +
                                " USING updates u" +
                                " ON e.id = u.id" +
                                " WHEN NOT MATCHED THEN INSERT (e.id,e.load_date) VALUES (u.id,u.load_date)");
                    })
                    .outputMode("update")
                    .option("checkpointLocation","/mnt/delta/events/_checkpoints/example_src_table")
                    .start();
        } catch (TimeoutException e) {
            e.printstacktrace();
        }

代码运行时没有任何问题,但是没有将数据写入带有url'/ mnt / delta / events / example_src_table'的delta表中。有人知道我在做什么错吗?

我正在使用Spark 3.0和Java 8。

编辑

使用Scala在Databricks Notebook上进行了测试,然后效果很好。

解决方法

如果要使用新数据更新数据,请尝试遵循以下语法:

WHEN NOT MATCHED THEN 
    UPDATE SET e.load_date = u.load_date AND  e.id = u.id
    

如果您只想添加数据,则它将占用类似的内容

WHEN NOT MATCHED THEN INSERT *