在数据集的元素中执行“非转换”操作的最佳方法是什么?

问题描述

新出现在Spark中,我正在寻找一种使用Spark 结构化在数据集的所有元素中执行动作的方法

我知道这是一个特定用途的情况,我想要遍历数据集的所有元素,对其进行操作,然后继续使用数据集。

示例:

我有val df = Dataset[Person],我希望能够做类似的事情:

def execute(df: Dataset[Person]): Dataset[Person] = {
 df.foreach((p: Person) => {
   someHttpClient.doRequest(httpPostRequest(p.asstring)) // this is pseudo code / not compiling
 })
 df
}

不幸的是,foreach不适用于结构化流,因为出现错误“必须使用writeStream.start执行流源查询

我尝试使用map(),但随后出现错误“任务无法序列化” ,我认为是因为http请求或http客户端无法序列化。

我知道Spark主要用于过滤和转换,但是有没有一种方法可以很好地处理此特定用例?

谢谢:)

解决方法

val conf = new SparkConf().setMaster(“local[*]").setAppName(“Example")
val jssc = new JavaStreamingContext(conf,Durations.seconds(1)) // second option tell about The time interval at which streaming data will be divided into batches

在确定解决方案是否存在之前 让我们少问几个问题

Spark Streaming如何工作? Spark Streaming接收来自输入源的实时输入数据流,并将数据分成批处理,然后由Spark引擎处理,并将最终批处理结果下推至下游应用程序

批处理如何开始? Spark对Dstream上应用的所有转换都进行了懒惰的评估。它将对操作(即,仅当您开始流式传输上下文时)应用转换。

   jssc.start();              // Start the computation
   jssc.awaitTermination();   // Wait for the computation to terminate. 

注意:每批Dstream都包含多个分区(就像运行spark-batch作业的顺序,直到输入源停止生成数据为止)

因此您可以具有如下的自定义逻辑。

dStream.foreachRDD(new VoidFunction[JavaRDD[Object]] {
  override def call(t: JavaRDD[Object]): Unit = {
    t.foreach(new VoidFunction[Object] {
      override def call(t: Object): Unit = {
        //pseudo code someHttpClient.doRequest(httpPostRequest(t.asString))
      }
    })
  }
})

但再次确保您的someHttpClient可序列化

您可以创建该对象,如下所述。

dStream.foreachRDD(new VoidFunction[JavaRDD[Object]] {
      override def call(t: JavaRDD[Object]): Unit = {
        // create someHttpClient object
        t.foreach(new VoidFunction[Object] {
          override def call(t: Object): Unit = {
            //pseudo code someHttpClient.doRequest(httpPostRequest(t.asString))
          }
        })
      }
    })

与Spark结构化流有关

import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql._;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQuery
import java.util.Arrays;
import java.util.Iterator;

val spark = SparkSession
  .builder()
  .appName("example")
  .getOrCreate();

val lines = spark.readStream.format("socket").option("host","localhost").option("port",9999).load(); // this is example source load copied from spark-streaming doc

lines.foreach(new ForeachFunction[Row] {
  override def call(t: Row): Unit = {
    //someHttpClient.doRequest(httpPostRequest(p.asString))
       OR
    // create someHttpClient object here and use it to tackle serialization errors
  }
})

// Start running the query foreach and do mention downstream sink below/
val query = lines.writeStream.start
query.awaitTermination()