问题描述
我们有一个flink作业,其中包含一些IO资源,例如tcp连接等。我们始终在连接函数周围插入一个finally块来关闭连接,并在所有运算符的close()方法中释放该连接。但是我们发现,由于我们在服务器端发现了很多CLOSE_WAIT状态,因此当故障切换不时发生时,连接不会被释放。我们猜测flink可能会使用某种中断方法来重新启动作业,以使代码不会进入finally块。当flink执行故障转移时,释放资源的正确方法是什么?
解决方法
为了能够访问函数的flink生命周期,您应该在用户定义的函数内实现逻辑,并实现RichFunction
接口。此类定义了函数生命周期的方法,以及访问执行函数上下文的方法。
在其他接口中,此接口公开了close()
方法,该方法应用于清理工作:
/**
* Tear-down method for the user code. It is called after the last call to the main working methods
* (e.g. <i>map</i> or <i>join</i>). For functions that are part of an iteration,this method will
* be invoked after each iteration superstep.
*
* <p>This method can be used for clean up work.
*
* @throws Exception Implementations may forward exceptions,which are caught by the runtime. When the
* runtime catches an exception,it aborts the task and lets the fail-over logic
* decide whether to retry the task execution.
*/
void close() throws Exception;
因此,我相信close()
函数是适当释放资源的适当位置。