问题描述
我想要做的是,我从不同的来源获取数据并进行分组并加入它们,然后我在 BatchStage 中得到结果,现在我必须在这个函数上执行 python 代码,但我不是能够做到。
这是我的以下代码:
BatchStage<List<Map<String,Object>>> bd = AggregateData.aggregate(data,Object);
bd1 = bd1.filter(k -> {
// some filters
});
// Now here after this I want to execute python code:
bd1.apply(mapUsingPython(new PythonServiceConfig()
.setBaseDir("D:/")
.setHandlerModule("take_sqrt")))
.setLocalParallelism(1)
.writeTo(Sinks.logger());
但它现在允许,因为我是新手,我没有得到确切的语法,请帮帮我
解决方法
::A_CONSTANT
将 mapUsingPython
转换为 BatchStage<String>
,因此您首先必须将 BatchStage<String>
显式转换为字符串表示。