问题描述
我有一个参数p
,它是只读的,就像机器学习模型一样。并假设我可以使用分布式缓存在每个任务管理器上缓存文件,以便每个任务都可以在本地加载它。
如果map(new MyMapFunction(p))
,p
将被序列化并反序列化给每个运算符,并且如果被缓存和加载,则每个任务都将加载p
的实例。假设我有4个任务管理器,每个8个插槽,我们可以flink run -p 32
使用所有资源,而p
将有32个实例。
理论上,它可以由p
完成,我想只有4个实例,并且每个线程可以在同一任务管理器中使用该实例吗?它可以在Flink中使用吗?
解决方法
使用静态变量在不同实例之间共享繁重的只读数据结构是完全可行的。确保使用某种锁来避免重新初始化并确保可见性。
class MyMap extends RichMapFunction {
private static Model model;
public void open() {
if (model == null) {
synchronized (MyMap.class) {
if (model == null) {
model = // read model ...
}
}
}
}
}
,
我使用以下方法初始化每个TM的通用结构:
class EventProcess extends ProcessFunction[Event,Event] {
...
override def open(parameters: Configuration): Unit = {
super.open(parameters)
EventProcess.init()
}
...
}
object EventProcess {
val lock = "1"
var data: Any = _
def init(config: Config): Unit = {
lock.synchronized {
if (data == null) {
// do init
}
}
}
}
在您的情况下,如果您需要从RuntimeCOntext
内部的open()
获取信息并以此初始化您的对象var
,则可以在open()
内部使用同步:>
override def open(parameters: Configuration): Unit = {
super.open(parameters)
EventProcess.lock.synchronized {
if (EventProcess.YOUR_VAR == null) {
EventProcess.init(getRuntimeContext()...)
}
}
}