kafka-streams警告kafka连接故障

问题描述

当kafka-streams应用程序正在运行并且Kafka突然关闭时,该应用程序进入“等待”模式,在其上发送警告日志的使用者和生产者线程无法连接,而当Kafka返回时,所有操作都应(理论上)恢复正常。 我正试图在这种情况下获得警报,但无法找到捕获该事件并发送日志/指标的地方。 我尝试了以下方法:

  1. --host 0.0.0.0,但仅在例外情况下会发生这种情况,
  2. 扩展streams.setUncaughtExceptionHandler并将ProductionExceptionHandler属性更改为扩展该接口的类。再次,与default.production.exception.handler一样,这里没有抛出异常,因此什么也没有发生。

我知道Kafka有自己的指标,我可以听一下并确定经纪人是否破产。但是在某些情况下,Kafka经纪人就可以了,而我的kafka-streams应用无法连接(即错误的身份验证配置或vpn / vpc问题)

我该怎么做才能发现这些问题并记录/报告?

更新

在没有kafka的情况下,查看消费者/生产者日志:

setUncaughtExceptionHandler

解决方法

这种情况很难通过程序检测。问题在于,客户端没有真正向Kafka Streams公开其状态,因此Kafka Streams并不真正了解断开连接。有一些KIP建议添加一个DISCONNECT状态,但是实现起来并不容易(参见https://cwiki.apache.org/confluence/display/KAFKA/KIP-457%3A+Add+DISCONNECTED+status+to+Kafka+Streams)。

您提到的异常处理程序在这种情况下无济于事,因为不会引发异常(至少不在Kafka Streams代码库中)。

您可以尝试监视消费者滞后或某些Kafka Streams指标(例如处理率)。他们可能提供足够好的代理。 cf https://docs.confluent.io/current/streams/monitoring.html

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...