重新启动flink作业/作业正在执行故障转移时,应如何优雅地释放所有资源?

问题描述

我们有一个flink作业,其中包含一些IO资源,例如tcp连接等。我们始终在连接函数周围插入一个finally块来关闭连接,并在所有运算符的close()方法中释放该连接。但是我们发现,由于我们在服务器端发现了很多CLOSE_WAIT状态,因此当故障切换​​不时发生时,连接不会被释放。我们猜测flink可能会使用某种中断方法来重新启动作业,以使代码不会进入finally块。当flink执行故障转移时,释放资源的正确方法是什么?

解决方法

为了能够访问函数的flink生命周期,您应该在用户定义的函数内实现逻辑,并实现RichFunction接口。此类定义了函数生命周期的方法,以及访问执行函数上下文的方法。

在其他接口中,此接口公开了close()方法,该方法应用于清理工作:


    /**
     * Tear-down method for the user code. It is called after the last call to the main working methods
     * (e.g. <i>map</i> or <i>join</i>). For functions that  are part of an iteration,this method will
     * be invoked after each iteration superstep.
     *
     * <p>This method can be used for clean up work.
     *
     * @throws Exception Implementations may forward exceptions,which are caught by the runtime. When the
     *                   runtime catches an exception,it aborts the task and lets the fail-over logic
     *                   decide whether to retry the task execution.
     */
    void close() throws Exception;

因此,我相信close()函数是适当释放资源的适当位置。