如何并行运行TableAPI和DataStreamAPI以使用Python定义的功能

问题描述

我在DataStream API上有一项工作,它运行良好,但是我需要使用计算所得的DataStream<Event>并将其传递给TableAPI来调用注册python函数,然后将结果传递回新的DataStream以重新处理该调用的结果。这里有两个问题,一个是我可以像这样工作:

/*DataStream Job*/
StreamExecutionEnvironment env = EnvironmentConfiguration.getEnv();
final DataStream<Event> eventsstream = RabbitMQConnector.eventStreamObject(env)
                .flatMap(new RabbitMQConsumer())
                .uid("cep.objects_mapper_id")
                .name("Event Mapper")
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        .withTimestampAssigner((event,timestamp) -> event.timestamp.getTime()))
                .name("Watermarks Added");

/*TableAPI job*/
final StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(TableEnvironmentConfiguration.getEnv(),fsSettings);
                    fsTableEnv.getConfig().getConfiguration().setString("python.files","test.py");
                    fsTableEnv.getConfig().getConfiguration().setString("python.client.executable","python.exe");
                    fsTableEnv.getConfig().getConfiguration().setString("python.executable","python.exe");
                    fsTableEnv.executesql("CREATE TEMPORARY SYstem FUNCTION func1 AS 'test.func1' LANGUAGE PYTHON");

SingleOutputStreamOperator<String> stream = eventsstream.map(x -> x.name);

Table source = fsTableEnv.fromDataStream(stream).as("name");
Table result = source.select("func1(name)");

DataStream<String> finalRes = fsTableEnv.toAppendStream(result,String.class);
finalRes.addSink(new SinkFunction<String>() {
                        @Override
                        public void invoke(String value) {
                            LOG.info("Record from table: " + value);
                        }
                    });

env.execute(job_name);

在这个示例中,我一点问题都没有,但是python函数从不返回,我担心除非我执行result.exeute();,否则它将永远不会被调用,那么当我从上面应用相同的示例时和之后

finalRes.addSink(new SinkFunction<String>() {
                        @Override
                        public void invoke(String value) {
                            LOG.info("Record from table: " + value);
                        }
                    });

执行result.execute();执行表,python函数起作用,但是直到TableAPI完成后才执行DataStreamAPI作业,但是由于从未初始化DataStreamAPI作业,因此使用者无法使用因此,应先发送到TableAPI然后再发送到python函数的流始终为空。

我的问题是:有什么办法可以并行运行两个作业,还是一个一个地运行? 注意:我将创建一个TimerTask,以便在DataStreamAPI作业启动后等待一段时间,然后启动TableAPI作业(并行度为1),它似乎可以正常工作,但是TableAPI作业已创建并停止了很多次。

还有更好的方法吗? 希望有人理解我的问题。

谢谢!

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)