如何确保Executor服务能够处理StateStore中的数据而不会永远被阻止

问题描述

我有一个基于Kafka流的应用程序(使用低级API),简而言之,它执行以下操作:

流处理(具有process()方法的流处理)

  • 从Kafka主题读取数据

  • 将数据放入StateStore(由RocksDB支持)

Stream处理器在初始化期间生成一个线程(在init()方法中,称为工作线程):

  • 工作线程是执行程序服务,每1分钟唤醒一次

  • 从同一StateStore读取数据,然后进行处理。

我有一个要求,需要每2小时(而不是几分钟)唤醒此Worker executor服务。

由于流线程正在同一StateStore上执行读写操作,Worker执行程序服务将尝试从该StateStore进行读/写操作,因此我如何确保Worker线程确实起作用而不会永远被阻塞;很有可能,流线程将在StateStore上保持锁定/监视。

任何人都可以提供有关如何解决此问题的信息吗?

解决方法

我相信RocksDB API是线程安全的,因此这不是问题。

顺便说一句,您可以考虑使用Punctuator而不是生成其他线程。标点符号由Stream线程调用,因此它将是单个线程。

相关问答

依赖报错 idea导入项目后依赖报错,解决方案:https://blog....
错误1:代码生成器依赖和mybatis依赖冲突 启动项目时报错如下...
错误1:gradle项目控制台输出为乱码 # 解决方案:https://bl...
错误还原:在查询的过程中,传入的workType为0时,该条件不起...
报错如下,gcc版本太低 ^ server.c:5346:31: 错误:‘struct...