问题描述
让我们假设我有一个max.parallelism=4
的工作和一个RichFlatMapFunction
正在使用MapState
的工作。创建MapStateDescriptor
的最佳方法是什么?放入RichFlatMapFunction
中,这意味着对于该类的每个实例,我将拥有一个描述符,或创建该描述符的单个实例,例如:在单个类中的public static MapStateDescriptor descriptor
并从{{ 1}}?因为以这种方式进行操作,我只有一个RichFlatMapFunction
而不是4,还是我误解了某些东西?
亲切的问候!
解决方法
几点...
- 由于每个
RichFlatMapFunction
子任务都可以在不同服务器上的不同JVM中运行,因此它们将如何共享静态MapStateDescriptor
? - 请注意,Flink的“最大并行度”与默认环境并行度不同。通常,您只希望保留最大并行度值,并(如果需要)将环境并行度设置为等于群集中的插槽数。
-
MapStateDescriptor
不存储状态。它告诉Flink如何创建状态。在RichFlatMapFunction
运算符的open()
调用中,您将使用状态描述符 创建状态。
因此,net-net不会麻烦使用静态MapStateDescriptor
,这将无济于事。只需在open()
方法中创建状态(根据许多示例)即可。