集成DataStreamAPI和TableAPI

问题描述

除了这个question之外,我还创建了这个示例来集成DataStreamAPITableAPI,这次我没有错误我有两个工作,而不是一个一个是为运行完美的DataStreamAPI创建的,另一个工作也为运行完美的TableAPI创建的,但是唯一的问题是永远不会从{{1}获得任何值},例如:

DataStreamAPI

这样做,我可以在记录器中看到以下行:

        /*FILTERING NULL IDs*/
        final SingleOutputStreamOperator<Event> stream_filtered = eventsstream
                .filter(new NullidEventsFilterFunction())
                .uid("id_filter_operator")
                .name("Event Filter");

        final StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(TableEnvironmentConfiguration.getEnv(),fsSettings);
        SingleOutputStreamOperator<String> toTable = stream_filtered.map(x -> x.id).name("Map for table");
        Table source = fsTableEnv.fromDataStream(toTable);
        source.execute(); /*without this line the TableAPI job is not started,but nothing happens if is not there either*/
        DataStream<String> finalRes = fsTableEnv.toAppendStream(source,String.class);
        finalRes.map((MapFunction<String,String>) value -> value)
                .name("Mapping after table")
                .addSink(new SinkFunction<String>() {
            @Override
            public void invoke(String value) {
                LOG.info("Record from table: " + value);
            }
        }).name("Sink after map from table");
        
        /*STARTING TRANSFORMATIONS*/
        Init.init(stream_filtered);
        env.execute(job_name);

但没有记录被接收或发送。

查看INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source -> Event Mapper -> Watermarks Added -> Event Filter -> Map for table -> SourceConversion(table=[Unregistered_DataStream_5],fields=[f0]) -> SinkConversionToRow -> Sink: Select table sink (1/1) (0d3cd78d35480c44f09603786bf775e7) switched from deploying to RUNNING. 作业的图像

enter image description here

,然后查看DataStream作业的图像

enter image description here

有什么主意吗? 提前致谢。 亲切的问候!

解决方法

如果您要编写一个以DataStream API开头和结尾并在中间使用Table API的作业,那么这是您可以建立的一个简单示例。

请注意,所涉及的详细信息在各个发行版之间有所更改,并且此特定示例按Flink 1.11编写。 FLIP-136: Improve interoperability between DataStream and Table API正在努力使这一过程变得更加容易。

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;

public class BackAndForth {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        SingleOutputStreamOperator<Tuple2<String,Long>> rawInput = env.fromElements(
                new Tuple2<>("u2",0L),new Tuple2<>("u1",5L),new Tuple2<>("u2",1L),new Tuple2<>("u3",3L),2L));

        Table events = tableEnv.fromDataStream(rawInput,$("userId"),$("value"));

        Table results = events
                .select($("userId"),$("value"))
                .where($("value").isGreater(0));

        tableEnv
                .toAppendStream(results,Row.class)
                .print();

        env.execute();
    }
}

您可能会担心在Web UI中显示“已发送记录:0”和“已接收记录:0”。这是非常误导的。这些Flink度量标准仅测量Flink中的记录和字节流,并且不报告与外部系统的任何I / O。这些度量标准也不报告链接在一起的运算符之间的记录和字节。这两个作业中的所有内容都是链接在一起的,因此在这种情况下,发送/接收的记录/字节将始终为零。