问题描述
我有一个基于Kafka流的应用程序(使用低级API),简而言之,它执行以下操作:
流处理(具有process()
方法的流处理)
-
从Kafka主题读取数据
-
将数据放入
StateStore
(由RocksDB
支持)
Stream处理器在初始化期间生成一个线程(在init()方法中,称为工作线程):
-
工作线程是执行程序服务,每1分钟唤醒一次
-
从同一StateStore读取数据,然后进行处理。
我有一个要求,需要每2小时(而不是几分钟)唤醒此Worker executor服务。
由于流线程正在同一StateStore上执行读写操作,Worker执行程序服务将尝试从该StateStore进行读/写操作,因此我如何确保Worker线程确实起作用而不会永远被阻塞;很有可能,流线程将在StateStore上保持锁定/监视。
任何人都可以提供有关如何解决此问题的信息吗?
解决方法
我相信RocksDB API是线程安全的,因此这不是问题。
顺便说一句,您可以考虑使用Punctuator而不是生成其他线程。标点符号由Stream线程调用,因此它将是单个线程。