问题描述
我正在尝试结合两个流媒体火花。主要流是由多个参数值组成的数据记录,另一个流是模型上下文。因此,在加入这两个流之后,我们可以使用模型上下文来计算新的单个值。请注意,在我的场景中,主流是连续数据记录,每秒钟来自源,而另一流是事件触发的流。这意味着只要用户通过网络单击,模型上下文就会进入流中。
我可以通过join
方法来实现自己的方案。但是,我想做的是我想在火花流中的各个小批处理中保持模型上下文。 join方法有效,但是直到下一批开始执行。
我已经搜索了这个问题,发现mapWithState
是我的案例的解决方案,我使用命令对其进行了编码,但似乎效果不佳。
假定所有数据都是从kafka主题接收的,如下所示。
下面是我的代码。
JavaStreamingContext streamingContext = SparkConfig.initSparkConfig();
//two different topic
Map<String,Object> kafkaParams = KafkaConfig.createKafkaConfig();
Collection<String> tracetopic = Arrays.asList("trace_test");
Collection<String> modelTopic = Arrays.asList("model_test");
//two different streams.
JavaInputDStream<ConsumerRecord<String,String>> traceStream = KafkaUtils.createDirectStream(
streamingContext,LocationStrategies.PreferConsistent(),ConsumerStrategies.<String,String>Subscribe(tracetopic,kafkaParams));
JavaInputDStream<ConsumerRecord<String,String>> modelStream = KafkaUtils.createDirectStream(
streamingContext,String>Subscribe(modelTopic,kafkaParams));
//transformation
JavaPairDStream<String,String> tracePairstream = traceStream.mapToPair(new ConsumerRecordToTupleString());
JavaPairDStream<String,String> modelPairstream = modelStream.mapToPair(new ConsumerRecordToTupleString());
//Applying mapWithState
StateSpec<String,String,Tuple2<String,String>> stateSpec = StateSpec.function(modelUpdateFunc).timeout(Durations.seconds(3600));
JavaMapWithStateDStream<String,String>> modelStateStream = modelPairstream.mapWithState(stateSpec);
JavaPairDStream<String,String> modelPairWithStateStream = modelStateStream.mapToPair(new PairFunction<Tuple2<String,String>,String>() {
@Override
public Tuple2<String,String> call(Tuple2<String,String> stringStringTuple2) throws Exception {
return new Tuple2<>(stringStringTuple2._1,stringStringTuple2._2);
}
});
//joining two streams
JavaPairDStream<String,String>> traceModelStream = modelPairWithStateStream.join(tracePairstream);
traceModelStream.print();
但是,当我在模型流中打印上下文时,仅当我通过Kafka生成数据时才打印第一批的值。我希望它显示批次之间的模型上下文,并在控制台中为每个批次打印模型上下文,但这没用。
如果您能帮助我,我非常感谢:)
谢谢。
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)