Apache Storm Kafka spout crashes after X tuples

Problem description

I am currently facing an issue where most of our topologies crash after the Kafka spout has emitted roughly 100,000,000 tuples. I get the following log messages:

2021-06-09 05:58:26.288 o.a.k.c.c.i.Fetcher Thread-15-kafka_fennec-executor[5,5] [INFO] [Consumer clientId=consumer-scylla-real-time-values-uniques-topology-1,groupId=scylla-real-time-values-uniques-topology] Fetch offset 4647977280 is out of range for partition fennec-2, resetting offset
2021-06-09 05:58:26.296 o.a.k.c.c.i.SubscriptionState Thread-15-kafka_fennec-executor[5,5] [INFO] [Consumer clientId=consumer-scylla-real-time-values-uniques-topology-1,groupId=scylla-real-time-values-uniques-topology] Resetting offset for partition fennec-2 to offset 4550519581.
2021-06-09 05:58:39.959 o.a.s.u.Utils Thread-15-kafka_fennec-executor[5,5] [ERROR] Async loop died!
java.lang.IllegalStateException: The offset [4550519581] is below the current nextCommitOffset [4647975756] for [fennec-2]. This should not be possible, and likely indicates a bug in the spout's acking or emit logic.
    at org.apache.storm.kafka.spout.internal.OffsetManager.findNextCommitOffset(OffsetManager.java:141) ~[stormjar.jar:?]
    at org.apache.storm.kafka.spout.KafkaSpout.commitOffsetsForAckedTuples(KafkaSpout.java:508) ~[stormjar.jar:?]
    at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:282) ~[stormjar.jar:?]
    at org.apache.storm.executor.spout.SpoutExecutor$2.call(SpoutExecutor.java:192) ~[storm-client-2.1.0.jar:2.1.0]
    at org.apache.storm.executor.spout.SpoutExecutor$2.call(SpoutExecutor.java:159) ~[storm-client-2.1.0.jar:2.1.0]
    at org.apache.storm.utils.Utils$1.run(Utils.java:392) [storm-client-2.1.0.jar:2.1.0]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_275]
2021-06-09 05:58:39.968 o.a.s.e.e.ReportError Thread-15-kafka_fennec-executor[5,5] [ERROR] Error
java.lang.RuntimeException: java.lang.IllegalStateException: The offset [4550519581] is below the current nextCommitOffset [4647975756] for [fennec-2]. This should not be possible, and likely indicates a bug in the spout's acking or emit logic.
    at org.apache.storm.utils.Utils$1.run(Utils.java:407) ~[storm-client-2.1.0.jar:2.1.0]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_275]
Caused by: java.lang.IllegalStateException: The offset [4550519581] is below the current nextCommitOffset [4647975756] for [fennec-2]. This should not be possible, and likely indicates a bug in the spout's acking or emit logic.
    at org.apache.storm.kafka.spout.internal.OffsetManager.findNextCommitOffset(OffsetManager.java:141) ~[stormjar.jar:?]
    at org.apache.storm.kafka.spout.KafkaSpout.commitOffsetsForAckedTuples(KafkaSpout.java:508) ~[stormjar.jar:?]
    at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:282) ~[stormjar.jar:?]
    at org.apache.storm.executor.spout.SpoutExecutor$2.call(SpoutExecutor.java:192) ~[storm-client-2.1.0.jar:2.1.0]
    at org.apache.storm.executor.spout.SpoutExecutor$2.call(SpoutExecutor.java:159) ~[storm-client-2.1.0.jar:2.1.0]
    at org.apache.storm.utils.Utils$1.run(Utils.java:392) ~[storm-client-2.1.0.jar:2.1.0]
    ... 1 more
2021-06-09 05:58:39.993 o.a.s.u.Utils Thread-15-kafka_fennec-executor[5,5] [ERROR] Halting process: Worker died
java.lang.RuntimeException: Halting process: Worker died
    at org.apache.storm.utils.Utils.exitProcess(Utils.java:512) [storm-client-2.1.0.jar:2.1.0]
    at org.apache.storm.utils.Utils$3.run(Utils.java:835) [storm-client-2.1.0.jar:2.1.0]
    at org.apache.storm.executor.error.ReportErrorAndDie.uncaughtException(ReportErrorAndDie.java:41) [storm-client-2.1.0.jar:2.1.0]
    at java.lang.Thread.dispatchUncaughtException(Thread.java:1959) [?:1.8.0_275]

Kafka spout configuration:

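import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

// Consume keys and values as raw bytes; commit acked offsets every 10 seconds.
KafkaSpoutConfig kafkaConfiguration = KafkaSpoutConfig.builder(bootstrapServers, topic)
    .setProp(ConsumerConfig.GROUP_ID_CONFIG, groupId)
    .setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName())
    .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName())
    .setOffsetCommitPeriodMs(10_000)
    .setRetry(getRetryService())
    // Emit each record's value as a single field named "capnp".
    .setRecordTranslator(
        producerRecord -> new Values(producerRecord.value()), new Fields("capnp")
    )
    .build();

return new KafkaSpout(kafkaConfiguration);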

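Setting the processing guarantee to NONE makes the problem go away. But as the log says, this should not be possible in the first place. Reducing the commit period and the maximum number of uncommitted offsets does not seem to help. https://issues.apache.org/jira/browse/STORM-2666 looks like the same issue, but we are running a patched version in which it should no longer occur. Does anyone know how to solve this properly?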
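To make the numbers in the log above easier to follow, here is a minimal sketch in plain Java of how the three offsets relate. It only models the check described by the exception message. The names `OffsetResetModel`, `checkAckedOffset` and `rewindDistance` are hypothetical, and this is not the actual `OffsetManager` code from Storm.

```java
// Simplified model of the offsets reported in the log; NOT the real Storm source.
class OffsetResetModel {

    /**
     * Hypothetical stand-in for the sanity check behind the exception:
     * an offset below the next offset to commit is treated as illegal.
     */
    static void checkAckedOffset(long ackedOffset, long nextCommitOffset, String partition) {
        if (ackedOffset < nextCommitOffset) {
            throw new IllegalStateException("The offset [" + ackedOffset
                + "] is below the current nextCommitOffset [" + nextCommitOffset
                + "] for [" + partition + "].");
        }
    }

    /** How far the consumer position ended up behind the commit position. */
    static long rewindDistance(long resetOffset, long nextCommitOffset) {
        return nextCommitOffset - resetOffset;
    }

    public static void main(String[] args) {
        long fetchOffset = 4_647_977_280L;      // "Fetch offset ... is out of range"
        long resetOffset = 4_550_519_581L;      // "Resetting offset for partition fennec-2 to offset ..."
        long nextCommitOffset = 4_647_975_756L; // taken from the IllegalStateException message

        // Before the reset, the fetch position was slightly ahead of the commit position.
        System.out.println("fetch ahead of commit by: " + (fetchOffset - nextCommitOffset));

        // After the reset, the consumer position is far behind the commit position.
        System.out.println("rewind distance: " + rewindDistance(resetOffset, nextCommitOffset));

        try {
            checkAckedOffset(resetOffset, nextCommitOffset, "fennec-2");
        } catch (IllegalStateException e) {
            System.out.println("check fails: " + e.getMessage());
        }
    }
}
```

With the values from the log, the reset moves the consumer back by 97,456,175 offsets relative to `nextCommitOffset`, so any record re-read from the reset position would fail a check of this kind. Why the fetch offset was reported as out of range in the first place cannot be determined from the log alone.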
