Flink Table-API 和 DataStream ProcessFunction

问题描述

我想加入一个大表,不可能包含在TM内存和一个流(kakfa)中。我成功地加入了我的测试,将 table-api 与 datastream api 混合。我做了以下事情:

val stream: DataStream[MyEvent] = env.addSource(...)
stream
   .timeWindowAll(...)
   .trigger(...)
   .process(new ProcessAllWindowFunction[MyEvent,MyEvent,TimeWindow] {
        
        var tableEnv: StreamTableEnvironment = _
        
        override def open(parameters: Configuration): Unit = {
          //init table env
        }

        override def process(context: Context,elements: Iterable[MyEvent],out: Collector[MyEvent]): Unit = {
          val table = tableEnv.sqlQuery(...)
          elements.map(e => {
            //do process
            out.collect(...)
          })
        }
      })

它正在工作,但我从未在任何地方看到过这种类型的实现。可以吗?有什么缺点?

解决方法

不应在 Flink 函数中使用 StreamExecutionEnvironmentTableEnvironment。一个环境用于构建一个提交给集群的管道。

您的示例向集群作业中的集群提交了一个作业。

这可能适用于某些用例,但通常不鼓励这样做。想象一下,您的外部流包含数千个事件,您的函数将为每个事件创建一个作业,它可能会对您的集群进行 DDoS。