Kafka 消费者的问题假设成员在当前一代中未知

问题描述

我使用带有 Kafka 消费者输入插件的 Telegraf 将消息转发到 InfluxDB。尝试从 Kafka 服务器消费时,Telegraf 日志显示以下内容

Apr 12 11:25:13 algtigtelegraf telegraf[848255]: 2021-04-12T08:25:13Z E! [inputs.kafka_consumer] Error in plugin: kafka: error while consuming telegraf/0: read tcp TELEGRAF_IP:38494->KAFKA_IP:9093: I/O timeout
Apr 12 11:25:28 algtigtelegraf telegraf[848255]: 2021-04-12T08:25:28Z E! [inputs.kafka_consumer] Error in plugin: kafka: error while consuming telegraf/0: kafka server: The provided member is not kNown in the current generation.
Apr 12 11:25:28 algtigtelegraf telegraf[848255]: 2021-04-12T08:25:28Z E! [inputs.kafka_consumer] Error in plugin: kafka: error while consuming telegraf/0: kafka server: The provided member is not kNown in the current generation.
Apr 12 11:25:28 algtigtelegraf telegraf[848255]: 2021-04-12T08:25:28Z E! [inputs.kafka_consumer] Error in plugin: kafka: error while consuming telegraf/0: kafka server: The provided member is not kNown in the current generation.
Apr 12 11:25:28 algtigtelegraf telegraf[848255]: 2021-04-12T08:25:28Z E! [inputs.kafka_consumer] Error in plugin: kafka: error while consuming telegraf/0: kafka server: The provided member is not kNown in the current generation.

通过查看 Kafka 日志,我可以看到与上述错误相关的以下内容

[2021-04-12 08:26:42,559] INFO [GroupCoordinator 0]: Preparing to rebalance group telegraf_metrics_consumers in state PreparingRebalance with old generation 238 (__consumer_offsets-6) (reason: removing member Telegraf-bd5ba2ac-e037-4d06-91d3-155d9cc63981 on heartbeat expiration) (kafka.coordinator.group.GroupCoordinator)
[2021-04-12 08:26:42,560] INFO [GroupCoordinator 0]: Group telegraf_metrics_consumers with generation 239 is Now empty (__consumer_offsets-6) (kafka.coordinator.group.GroupCoordinator)
[2021-04-12 08:27:01,424] INFO [GroupCoordinator 0]: Dynamic Member with unkNown member id joins group telegraf_metrics_consumers in Empty state. Created a new member id Telegraf-b1168722-a4f9-42cc-b0c3-10c78e2cf4d1 for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
[2021-04-12 08:27:01,424] INFO [GroupCoordinator 0]: Preparing to rebalance group telegraf_metrics_consumers in state PreparingRebalance with old generation 239 (__consumer_offsets-6) (reason: Adding new member Telegraf-b1168722-a4f9-42cc-b0c3-10c78e2cf4d1 with group instance id None) (kafka.coordinator.group.GroupCoordinator)
[2021-04-12 08:27:01,426] INFO [GroupCoordinator 0]: Stabilized group telegraf_metrics_consumers generation 240 (__consumer_offsets-6) (kafka.coordinator.group.GroupCoordinator)
[2021-04-12 08:27:01,441] INFO [GroupCoordinator 0]: Assignment received from leader for group telegraf_metrics_consumers for generation 240 (kafka.coordinator.group.GroupCoordinator)
[2021-04-12 08:27:11,459] INFO [GroupCoordinator 0]: Member Telegraf-b1168722-a4f9-42cc-b0c3-10c78e2cf4d1 in group telegraf_metrics_consumers has Failed,removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2021-04-12 08:27:11,459] INFO [GroupCoordinator 0]: Preparing to rebalance group telegraf_metrics_consumers in state PreparingRebalance with old generation 240 (__consumer_offsets-6) (reason: removing member Telegraf-b1168722-a4f9-42cc-b0c3-10c78e2cf4d1 on heartbeat expiration) (kafka.coordinator.group.GroupCoordinator)
[2021-04-12 08:27:11,459] INFO [GroupCoordinator 0]: Group telegraf_metrics_consumers with generation 241 is Now empty (__consumer_offsets-6) (kafka.coordinator.group.GroupCoordinator)
[2021-04-12 08:27:31,687] INFO [GroupCoordinator 0]: Dynamic Member with unkNown member id joins group telegraf_metrics_consumers in Empty state. Created a new member id Telegraf-96ac6751-4f2e-4a67-80ee-1175c30540e4 for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)

我对 kafka_consumer 输入插件使用以下电报配置:

[[inputs.kafka_consumer]]
  ## Kafka brokers.
  brokers = ["KAFKA_IP:9093"]

  ## Topics to consume.
  topics = ["telegraf"]

  ## SSL parameters.
  insecure_skip_verify = true

  ## Data format
  data_format = "influx"

  • Kafka 版本:2.13-2.6.1
  • 电报版本:1.18.0

当我在网络内运行 Telegraf kafka_consumer 并通过其内部 IP 访问 Kafka 服务器时,它按预期工作。在这种情况下,我试图通过其公共 IP 访问 Kafka 服务器。 知道在这种情况下问题出在哪里吗?

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...