问题描述
新出现在Spark中,我正在寻找一种使用Spark 结构化流在数据集的所有元素中执行动作的方法:
我知道这是一个特定用途的情况,我想要遍历数据集的所有元素,对其进行操作,然后继续使用数据集。
示例:
我有val df = Dataset[Person]
,我希望能够做类似的事情:
def execute(df: Dataset[Person]): Dataset[Person] = {
df.foreach((p: Person) => {
someHttpClient.doRequest(httpPostRequest(p.asstring)) // this is pseudo code / not compiling
})
df
}
不幸的是,foreach
不适用于结构化流,因为出现错误“必须使用writeStream.start执行流源查询”
我尝试使用map()
,但随后出现错误“任务无法序列化” ,我认为是因为http请求或http客户端无法序列化。
我知道Spark主要用于过滤和转换,但是有没有一种方法可以很好地处理此特定用例?
谢谢:)
解决方法
val conf = new SparkConf().setMaster(“local[*]").setAppName(“Example")
val jssc = new JavaStreamingContext(conf,Durations.seconds(1)) // second option tell about The time interval at which streaming data will be divided into batches
在确定解决方案是否存在之前 让我们少问几个问题
Spark Streaming如何工作? Spark Streaming接收来自输入源的实时输入数据流,并将数据分成批处理,然后由Spark引擎处理,并将最终批处理结果下推至下游应用程序
批处理如何开始? Spark对Dstream上应用的所有转换都进行了懒惰的评估。它将对操作(即,仅当您开始流式传输上下文时)应用转换。
jssc.start(); // Start the computation
jssc.awaitTermination(); // Wait for the computation to terminate.
注意:每批Dstream都包含多个分区(就像运行spark-batch作业的顺序,直到输入源停止生成数据为止)
因此您可以具有如下的自定义逻辑。
dStream.foreachRDD(new VoidFunction[JavaRDD[Object]] {
override def call(t: JavaRDD[Object]): Unit = {
t.foreach(new VoidFunction[Object] {
override def call(t: Object): Unit = {
//pseudo code someHttpClient.doRequest(httpPostRequest(t.asString))
}
})
}
})
但再次确保您的someHttpClient可序列化或
您可以创建该对象,如下所述。
dStream.foreachRDD(new VoidFunction[JavaRDD[Object]] {
override def call(t: JavaRDD[Object]): Unit = {
// create someHttpClient object
t.foreach(new VoidFunction[Object] {
override def call(t: Object): Unit = {
//pseudo code someHttpClient.doRequest(httpPostRequest(t.asString))
}
})
}
})
与Spark结构化流有关
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql._;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQuery
import java.util.Arrays;
import java.util.Iterator;
val spark = SparkSession
.builder()
.appName("example")
.getOrCreate();
val lines = spark.readStream.format("socket").option("host","localhost").option("port",9999).load(); // this is example source load copied from spark-streaming doc
lines.foreach(new ForeachFunction[Row] {
override def call(t: Row): Unit = {
//someHttpClient.doRequest(httpPostRequest(p.asString))
OR
// create someHttpClient object here and use it to tackle serialization errors
}
})
// Start running the query foreach and do mention downstream sink below/
val query = lines.writeStream.start
query.awaitTermination()