问题描述
当kafka-streams应用程序正在运行并且Kafka突然关闭时,该应用程序进入“等待”模式,在其上发送警告日志的使用者和生产者线程无法连接,而当Kafka返回时,所有操作都应(理论上)恢复正常。 我正试图在这种情况下获得警报,但无法找到捕获该事件并发送日志/指标的地方。 我尝试了以下方法:
-
--host 0.0.0.0
,但仅在例外情况下会发生这种情况, - 扩展
streams.setUncaughtExceptionHandler
并将ProductionExceptionHandler
属性更改为扩展该接口的类。再次,与default.production.exception.handler
一样,这里没有抛出异常,因此什么也没有发生。
我知道Kafka有自己的指标,我可以听一下并确定经纪人是否破产。但是在某些情况下,Kafka经纪人就可以了,而我的kafka-streams应用无法连接(即错误的身份验证配置或vpn / vpc问题)
我该怎么做才能发现这些问题并记录/报告?
更新
在没有kafka的情况下,查看消费者/生产者日志:
setUncaughtExceptionHandler
解决方法
这种情况很难通过程序检测。问题在于,客户端没有真正向Kafka Streams公开其状态,因此Kafka Streams并不真正了解断开连接。有一些KIP建议添加一个DISCONNECT
状态,但是实现起来并不容易(参见https://cwiki.apache.org/confluence/display/KAFKA/KIP-457%3A+Add+DISCONNECTED+status+to+Kafka+Streams)。
您提到的异常处理程序在这种情况下无济于事,因为不会引发异常(至少不在Kafka Streams代码库中)。
您可以尝试监视消费者滞后或某些Kafka Streams指标(例如处理率)。他们可能提供足够好的代理。 cf https://docs.confluent.io/current/streams/monitoring.html