问题描述
我在DataStream API上有一项工作,它运行良好,但是我需要使用计算所得的DataStream<Event>
并将其传递给TableAPI来调用注册python函数,然后将结果传递回新的DataStream
以重新处理该调用的结果。这里有两个问题,一个是我可以像这样工作:
/*DataStream Job*/
StreamExecutionEnvironment env = EnvironmentConfiguration.getEnv();
final DataStream<Event> eventsstream = RabbitMQConnector.eventStreamObject(env)
.flatMap(new RabbitMQConsumer())
.uid("cep.objects_mapper_id")
.name("Event Mapper")
.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event,timestamp) -> event.timestamp.getTime()))
.name("Watermarks Added");
/*TableAPI job*/
final StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(TableEnvironmentConfiguration.getEnv(),fsSettings);
fsTableEnv.getConfig().getConfiguration().setString("python.files","test.py");
fsTableEnv.getConfig().getConfiguration().setString("python.client.executable","python.exe");
fsTableEnv.getConfig().getConfiguration().setString("python.executable","python.exe");
fsTableEnv.executesql("CREATE TEMPORARY SYstem FUNCTION func1 AS 'test.func1' LANGUAGE PYTHON");
SingleOutputStreamOperator<String> stream = eventsstream.map(x -> x.name);
Table source = fsTableEnv.fromDataStream(stream).as("name");
Table result = source.select("func1(name)");
DataStream<String> finalRes = fsTableEnv.toAppendStream(result,String.class);
finalRes.addSink(new SinkFunction<String>() {
@Override
public void invoke(String value) {
LOG.info("Record from table: " + value);
}
});
env.execute(job_name);
在这个示例中,我一点问题都没有,但是python函数从不返回,我担心除非我执行result.exeute();
,否则它将永远不会被调用,那么当我从上面应用相同的示例时和之后
finalRes.addSink(new SinkFunction<String>() {
@Override
public void invoke(String value) {
LOG.info("Record from table: " + value);
}
});
执行result.execute();
执行表,python函数起作用,但是直到TableAPI完成后才执行DataStreamAPI作业,但是由于从未初始化DataStreamAPI作业,因此使用者无法使用因此,应先发送到TableAPI然后再发送到python函数的流始终为空。
我的问题是:有什么办法可以并行运行两个作业,还是一个接一个地运行? 注意:我将创建一个TimerTask,以便在DataStreamAPI作业启动后等待一段时间,然后启动TableAPI作业(并行度为1),它似乎可以正常工作,但是TableAPI作业已创建并停止了很多次。
还有更好的方法吗? 希望有人理解我的问题。
谢谢!
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)