Apache Flink无需重新启动即可动态更新sql

问题描述

我对Flink的行为有疑问。以下是我的代码段。如您所见,某些服务正在提供Flink将要逐一执行的sql标准列表(例如大约10k sql)。 我的问题是,每当sql更新时,如何指示flink与新sql一起使用?我看到的一种方法是停止并启动flink服务,这是我想要避免的,因为其他sql条件需要一直运行,并且只有正在更新的条件才需要停止/启动/或动态更新。另外,我不想将10k sqls提交为10k不同的工作。因此,我要寻找的行为是否可以在Flink 1.11版中实现?

env is StreamExecutionEnvironment... 

Psudo-code:

List<String> allConditionssqls = get_sql_FROM_some_Service();
for(String sql : allConditionssqls)
{
    Table table = env.sqlQuery(sql);
    env.toRetractStream(table,Row.class)
     .process(new ProcessFunction <Tuple2<Boolean,Row>,Object>() {
         @Override
         public void processElement(Tuple2<Boolean,Row> value,Context ctx,Collector<Object> out) throws Exception {
             Row ev = value.f1;
             log.info(ev);
             // more code here
         }    
     });
}

解决方法

否,唯一的方法是将每个查询作为单独的作业运行。 (就其价值而言,有些人每天动态地生成10000个Flink作业-可以做到。)

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...