java – Kafka KStreams – 处理超时

我试图使用< KStream> .process()与Time Windows.of(“name”,30000)批量处理一些KTable值并发送它们.似乎30秒超过了消费者超时间隔,之后Kafka认为该消费者已经解散并释放分区.

我已经尝试提高轮询频率和提交间隔以避免这种情况:

config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONfig,"5000");
config.put(StreamsConfig.POLL_MS_CONfig,"5000");

不幸的是,这些错误仍在发生:

(很多这些)

ERROR  o.a.k.s.p.internals.RecordCollector - Error sending record to topic kafka_test1-write_aggregate2-changelog 
org.apache.kafka.common.errors.TimeoutException: Batch containing 1 record(s) expired due to timeout while requesting Metadata from brokers for kafka_test1-write_aggregate2-changelog-0

其次是:

INFO   o.a.k.c.c.i.AbstractCoordinator - Marking the coordinator 12.34.56.7:9092 (id: 2147483547 rack: null) dead for group kafka_test1
WARN   o.a.k.s.p.internals.StreamThread - Failed to commit StreamTask #0_0 in thread [StreamThread-1]: 
  org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms,which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:578)

显然,我需要更频繁地将心跳发送回服务器.怎么样?

我的拓扑结构是:

KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String,String> lines = kStreamBuilder.stream(TOPIC);
KTable<Windowed<String>,String>  kt = lines.aggregateByKey(
            new DBAggregateInit(),new DBAggregate(),TimeWindows.of("write_aggregate2",30000));

DBProcessorsupplier dbProcessorsupplier = new DBProcessorsupplier();

kt.toStream().process(dbProcessorsupplier);
KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder,streamsConfig);

kafkaStreams.start();

KTable每隔30秒按键对值进行分组.在Processor.init()中,我调用context.schedule(30000).

DBProcessorsupplier提供DBProcessor的实例.这是AbstractProcessor的一个实现,其中提供了所有覆盖.他们只做LOG,所以我知道每个人都被击中.

这是一个非常简单的拓扑结构,但很明显我在某个地方错过了一个步骤.

编辑:

我知道我可以在服务器端进行调整,但我希望有一个客户端解决方案.我喜欢在客户端退出/死亡时很快就可以使用分区的概念.

编辑:

为了简化问题,我从图中删除了聚合步骤.它现在只是消费者 – >处理器(). (如果我将消费者直接发送到.print(),它会很快工作,所以我知道没关系). (类似地,如果我通过.print()输出聚合(KTable),它似乎也可以.

我发现.process() – 应该每隔30秒调用一次.punctuate()实际上阻塞了可变长度的时间并且随机输出(如果有的话).

> Main program
> Debug output
> Processor Supplier
> Processor

进一步:

我将调试级别设置为’debug’并重新启动.我看到很多消息:

DEBUG  o.a.k.s.p.internals.StreamTask - Start processing one record [ConsumerRecord <info>

但是.punctuate()函数中的断点没有被击中.所以它做了很多工作,但没有让我有机会使用它.

解决方法

一些澄清:

> StreamsConfig.COMMIT_INTERVAL_MS_CONfig是提交间隔的下限,即在提交之后,下一次提交不会在此时间之前发生.基本上,Kafka Stream试图在这段时间过后尽快提交,但无法保证下次提交实际需要多长时间.
> StreamsConfig.POLL_MS_CONfig用于内部KafkaConsumer#poll()调用,以指定poll()调用的最大阻塞时间.

因此,这两个值对心跳更有帮助.

Kafka Streams在处理记录时遵循“深度优先”策略.这意味着,在每个记录的poll()之后,将执行拓扑的所有运算符.假设你有三个连续的地图,那么在下一个/第二个记录被处理之前,将为第一个记录调用所有三个地图.

因此,在第一次poll()的所有记录被完全处理之后,将进行下一次poll()调用.如果你想更频繁地心跳,你需要确保一个poll()调用获取更少的记录,这样处理所有记录所需的时间更少,下一个poll()将更早被触发.

您可以使用可通过StreamsConfig指定的KafkaConsumer配置参数来完成此操作(请参阅https://kafka.apache.org/documentation.html#consumerconfigs):

streamConfig.put(ConsumerConfig.XXX,VALUE);

> max.poll.records:如果减小此值,将轮询更少的记录
> session.timeout.ms:如果增加此值,则有更多时间处理数据(为了完整性而添加此项,因为它实际上是客户端设置而不是服务器/代理端配置 – 即使您知道此解决方案并且不喜欢它:))

EDIT

As of Kafka 0.10.1 it is possible (and recommended) to prefix consumer and procuder configs within streams config. This avoids parameter conflicts as some parameter names are used for consumer and producer and cannot be distinguiesh otherwise (and would be applied to consumer and producer at the same time).
To prefix a parameter you can use StreamsConfig#consumerPrefix() or StreamsConfig#producerPrefix(),respectively. For example:
streamsConfig.put(StreamsConfig.consumerPrefix(ConsumerConfig.ParaMETER),VALUE);

还有一件事要提到:这个问题中描述的场景是一个已知问题,已经有KIP-62为KafkaConsumer引入了一个发送心跳的后台线程,从而将heartbeats与poll()调用分离. Kafka Streams将在即将发布的版本中利用这一新功能.

相关文章

# 前言 现有主流消息中间件都是生产者-消费者模型,主要角色...
错误的根源是:kafka版本过高所致,2.2+=的版本,已经不需要...
DWS层主要是存放大宽表数据,此业务中主要是针对Kafka topic...
不多BB讲原理,只教你怎么用,看了全网没有比我更详细的了,...
终于写完了,其实最开始学kafka的时候是今年2月份,那时候还...
使用GPKafka实现Kafka数据导入Greenplum数据库踩坑问题记录(...