问题描述
-
我们正在调查与FLink群集中的GC问题有关的一些内存泄漏,在此过程中,我们发现了以下发现
-
我们正在Flink集群上运行Beam作业,我们正在利用侧面输入功能将一些其他数据馈送到用于某些转换的事件主流中
-
由于某种原因,我们的Flink作业已重新启动,并且在重新启动后进行堆转储后,我们可以看到侧面输入数据的多个副本处于Flink作业的状态(我们使用RicksDB作为状态后端)
-
主流有一个小时的窗口
// sudo code example
const ws = Ws(url,options)
ws('open',() => {
const chat = ws.subscribe('chat')
chat.on('ready',() => {
console.log('READY')
})
chat.on('message',() => {
console.log('Got a message')
}
chat.on('error',(error) => {
// eventually this will throw a jwt expired error
// at that point all the subscriptions close but I'm not sure what happens to
// the ws connection itself (since it's the object with the withJwtToken() below function)
// Not a wonderful method of kNowing the token has expired but all you get is a long human readable string in the error message
if (error.message.includes('E_JWT_TOKEN_EXPIRED')) {
const newToken = doTokenRefresh();
// Now what?
// close the ws connection and reconnect withJwtToken(newToken)?
// just re-open the channels
// is there a ws.reconnect(newToken) function or something?
}
})
chat.on('close',() => {
console.log('closed....probably due to expired jwt')
})
};
// Actually connect - but that's the only place to send a token
ws.withJwtToken(token).connect()
- 侧面输入流也有窗口
Window<KV<String,EventDetails>> window = Window.<KV<String,EventDetails>>into(FixedWindows.of(Duration.standardHours(hrs)))
.triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().alignedTo(Duration.standardSeconds(500))))
.withAllowedLateness(Duration.ZERO,Window.ClosingBehavior.FIRE_ALWAYS)
.discardingFiredPanes();
- 请提出建议,代码逻辑是否存在问题?还是Beam的侧面输入有问题?
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)