如何在Spark Streaming中合并两个流?

问题描述

我正在尝试结合两个流媒体火花。主要流是由多个参数值组成的数据记录,另一个流是模型上下文。因此,在加入这两个流之后,我们可以使用模型上下文来计算新的单个值。请注意,在我的场景中,主流是连续数据记录,每秒钟来自源,而另一流是事件触发的流。这意味着只要用户通过网络单击,模型上下文就会进入流中。

我可以通过join方法来实现自己的方案。但是,我想做的是我想在火花流中的各个小批处理中保持模型上下文。 join方法有效,但是直到下一批开始执行。

我已经搜索了这个问题,发现mapWithState是我的案例的解决方案,我使用命令对其进行了编码,但似乎效果不佳。

假定所有数据都是从kafka主题接收的,如下所示。

下面是我的代码

JavaStreamingContext streamingContext = SparkConfig.initSparkConfig();

//two different topic
Map<String,Object> kafkaParams = KafkaConfig.createKafkaConfig();
        Collection<String> tracetopic = Arrays.asList("trace_test");
        Collection<String> modelTopic = Arrays.asList("model_test");


        //two different streams.
        JavaInputDStream<ConsumerRecord<String,String>> traceStream = KafkaUtils.createDirectStream(
                streamingContext,LocationStrategies.PreferConsistent(),ConsumerStrategies.<String,String>Subscribe(tracetopic,kafkaParams));

        JavaInputDStream<ConsumerRecord<String,String>> modelStream = KafkaUtils.createDirectStream(
                streamingContext,String>Subscribe(modelTopic,kafkaParams));


        //transformation
        JavaPairDStream<String,String> tracePairstream = traceStream.mapToPair(new ConsumerRecordToTupleString());
        JavaPairDStream<String,String> modelPairstream = modelStream.mapToPair(new ConsumerRecordToTupleString());


        //Applying mapWithState
        StateSpec<String,String,Tuple2<String,String>> stateSpec = StateSpec.function(modelUpdateFunc).timeout(Durations.seconds(3600));

      
        JavaMapWithStateDStream<String,String>> modelStateStream = modelPairstream.mapWithState(stateSpec);


        JavaPairDStream<String,String> modelPairWithStateStream = modelStateStream.mapToPair(new PairFunction<Tuple2<String,String>,String>() {
            @Override
            public Tuple2<String,String> call(Tuple2<String,String> stringStringTuple2) throws Exception {
                return new Tuple2<>(stringStringTuple2._1,stringStringTuple2._2);
            }
        });


        //joining two streams
        JavaPairDStream<String,String>> traceModelStream = modelPairWithStateStream.join(tracePairstream);
        traceModelStream.print();

但是,当我在模型流中打印上下文时,仅当我通过Kafka生成数据时才打印第一批的值。我希望它显示批次之间的模型上下文,并在控制台中为每个批次打印模型上下文,但这没用。

如果您能帮助我,我非常感谢:)

谢谢。

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)