Kafka consumer gets stuck

Problem description

We use Kafka to stream records into PostgreSQL, because the incoming rate is too high for direct inserts. The consumer seems to work fine most of the time, but it occasionally gets stuck, and we cannot find the root cause.

The consumer has been running for about six months and has consumed billions of records. I don't understand why it has recently started getting stuck, and I don't even know where to begin debugging.

Here is the code used to process records:

```java
private static void readFromTopic(DataSource datasource,ConsumerOptions options){

    KafkaConsumer<String,String> consumer = KafkaConsumerConfig.createConsumerGroup(options);
    Producer<Long,String> producer = KafkaProducerConfig.createKafkaProducer(options);

    if (options.isReadFromAnOffset()) {
        // if we want to start consuming from a specific offset;
        // this works only for a single partition per consumer
        List<TopicPartition> tpartition = new ArrayList<TopicPartition>();
        tpartition.add(new TopicPartition(options.getTopicName(),options.getPartition()));
        consumer.assign(tpartition);
        consumer.seek(tpartition.get(0),options.getOffset());
    } else {
        // use auto assign partition & offsets
        consumer.subscribe(Arrays.asList(options.getTopicName()));
        log.debug("subscribed to topic {}",options.getTopicName());
    }

    List<Payload> payloads = new ArrayList<>();

    while (true) {

        // the timeout is how long poll() blocks waiting for records from the broker
        ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(50));

        if (records.count() != 0)
            log.debug("poll size is {}",records.count());
        Set<TopicPartition> partitions = records.partitions();
        // records are consumed in order from the last committed offset
        // of each assigned partition
        for (ConsumerRecord<String,String> r : records) {

            log.debug(" Parition : {} Offset : {}",r.partition(),r.offset());

            try {

                JSONArray arr = new JSONArray(r.value());
                for (Object o : arr) {
                    Payload p = JsonIterator.deserialize(((JSONObject) o).toString(),Payload.class);
                    payloads.add(p);
                }

                List<Payload> steplist = new ArrayList<>();
                steplist.addAll(payloads);

                // Run a task specified by a Runnable Object asynchronously.
                CompletableFuture<Void> future = CompletableFuture.runAsync(new Runnable() {
                    @Override
                    public void run() {

                        try {
                            Connection conn = datasource.getConnection();
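                            // note: `consumer` is passed into insertIntoPg and used from
                            // this pool thread while the main thread keeps polling and
                            // committing on it; KafkaConsumer is not thread-safe, so such
                            // cross-thread use is unsafe and worth ruling out here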

                            PgInsert.insertIntoPg(steplist,conn,consumer,r,options.getTopicName(),options.getErrorTopic(),producer);

                        } catch (Exception e) {

                            log.error("error in processing future {}",e);
                        }

                    }
                },executorService);

                // collected so all futures of this poll can be awaited below
                allfutures.add(future);

                payloads.clear();

            } catch (Exception e) {

                // push records that failed to parse onto the error topic
                log.error("error in kafka consumer {}",e);
                ProducerRecord<Long,String> record = new ProducerRecord<Long,String>(options.getErrorTopic(),r.offset(),r.value());
                producer.send(record);

            }
        }

        // committing after every poll; note this runs before the insert
        // futures below are awaited, so offsets are committed while database
        // writes for this poll may still be in flight
        consumer.commitSync();

        if (records.count() != 0) {
            Map<TopicPartition,OffsetAndMetadata> Metadata = consumer.committed(partitions);

            // reading the committed offsets for each partition after polling
            for (TopicPartition tpartition : partitions) {
                OffsetAndMetadata offsetdata = metadata.get(tpartition);

                if (offsetdata != null)
                    log.debug("committed offset is {} for topic partition {}",
                            offsetdata.offset(), tpartition.partition());
            }
        }

        // waiting for all threads to complete after each poll
        try {
            waitForFuturesToEnd();
            allfutures.clear();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }

    }
}
```
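One structural detail worth flagging in the loop above: `consumer.commitSync()` runs before `waitForFuturesToEnd()`, so offsets are committed while the inserts for that poll are still in flight; if an insert then hangs or fails, its records are already marked as consumed. A minimal sketch of the opposite ordering, reusing `consumer` and `executorService` from the method above, with `insertBatch` as a hypothetical stand-in for the per-record JSON parsing plus `PgInsert.insertIntoPg` call:

```java
while (true) {
    ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(50));

    List<CompletableFuture<Void>> futures = new ArrayList<>();
    for (ConsumerRecord<String,String> r : records) {
        // insertBatch is a hypothetical helper covering the JSON parsing
        // and database insert done per record in the original loop
        futures.add(CompletableFuture.runAsync(() -> insertBatch(r), executorService));
    }

    // wait for every insert of this poll to finish, then commit, so the
    // committed offset never runs ahead of the completed database writes
    CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
    consumer.commitSync();
}
```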

I previously thought it was getting stuck because of the size of the consumed records, so I reduced MAX_POLL_RECORDS_CONFIG to 10. Since each record is at most 20 KB, this guarantees that a single poll fetches no more than 200 KB.
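For reference, here is a minimal sketch of the consumer settings involved (the values and names below are assumptions, not the original configuration). `max.poll.records` caps how many records a single `poll()` returns, while `max.poll.interval.ms` caps how long one loop iteration may take between polls; if the loop (including the PostgreSQL inserts) exceeds it, the consumer is removed from the group and a rebalance starts, which from the outside looks exactly like a stuck consumer:

```java
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
props.put(ConsumerConfig.GROUP_ID_CONFIG, "pg-ingest");               // placeholder
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

// at most 10 records (~200 KB at 20 KB/record) per poll, as described above
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10);

// upper bound on the time between two poll() calls; an insert that blocks
// longer than this gets the consumer evicted from the group (assumed value)
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300_000);

KafkaConsumer<String,String> consumer = new KafkaConsumer<>(props);
```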

I am considering moving to the Spring framework to address this, but before doing so I would like to understand why the consumer gets stuck. Any insight would be helpful.
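For what it's worth, the Spring route would typically mean Spring for Apache Kafka, where the framework owns the poll loop, offset commits, and rebalance handling. A minimal listener sketch (class name, topic property, and group id are placeholders):

```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class PayloadListener {

    // the listener container runs the poll loop and, by default, commits
    // offsets once the records from a poll have been processed, so a hang
    // is easier to localize than in a hand-written loop
    @KafkaListener(topics = "${app.topic}", groupId = "pg-ingest")
    public void onMessage(String value) {
        // parse the JSON array and insert into Postgres, as in readFromTopic
    }
}
```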
