问题描述
我想要做的是,我从不同的来源获取数据并进行分组并加入它们,然后我在 BatchStage 中得到结果,现在我必须在这个函数上执行 python 代码,但我不是能够做到。
这是我的以下代码:
BatchStage<List<Map<String,Object>>> bd = AggregateData.aggregate(data,Object);
bd1 = bd1.filter(k -> {
// some filters
});
// Now here after this I want to execute python code:
bd1.apply(mapUsingPython(new PythonServiceConfig()
.setBaseDir("D:/")
.setHandlerModule("take_sqrt")))
.setLocalParallelism(1)
.writeTo(Sinks.logger());
但它现在允许,因为我是新手,我没有得到确切的语法,请帮帮我
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)