RocksDB在Flink中遇到消费者问题

问题描述

我有一份工作要从RabbitMQ消费,我正在使用FS State Backend,但是状态的大小似乎变大了,然后我决定将状态移动到RocksDB。 问题是,在运行的最初几个小时内情况很好,如果流量变慢,则在更多时间后发生事件,但是当流量再次变高时,消费者开始遇到问题(事件被堆积为未解决的问题)反映在应用程序的其余部分。

我有: 4个cpu内核
本地磁盘
16GB RAM
Unix环境
Flink 1.11
Scala 2.11版
1个单一作业,几乎没有keyedStreams运行,并且进行了大约10个转换,并下沉到Postgres

一些配置

flink.buffer_timeout=50
flink.maxparallelism=4
flink.memory=16
flink.cpu.cores=4
#checkpoints
flink.checkpointing_compression=true
flink.checkpointing_min_pause=30000
flink.checkpointing_timeout=120000
flink.checkpointing_enabled=true
flink.checkpointing_time=60000
flink.max_current_checkpoint=1
#RocksDB configuration
state.backend.rocksdb.localdir=home/username/checkpoints (this is not working don't kNow why)
state.backend.rocksdb.thread.numfactory=4
state.backend.rocksdb.block.blocksize=16kb
state.backend.rocksdb.block.cache-size=512mb
#rocksdb or heap
state.backend.rocksdb.timer-service.factory=heap (I have test with rocksdb too and is the same)
state.backend.rocksdb.predefined-options=SPINNING_disK_OPTIMIZED

让我知道是否需要更多信息?

解决方法

state.backend.rocksdb.localdir应该是绝对路径,而不是相对路径。而且此设置不是用于指定检查点的位置(不应在本地磁盘上),而此设置用于指定将工作状态保留在何处(应在本地磁盘上)。

您的工作正在承受背压,这意味着管道的某些部分无法跟上。造成背压的最常见原因是:(1)接收器无法跟上,以及(2)资源不足(例如,并行度太低)。

您可以通过运行带有丢弃接收器的作业来测试postgres是否是问题。

通过查看各种指标,您可以了解哪些资源可能配置不足。

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...