使用Apache骆驼和Kafka时如何处理背压?

问题描述

我正在尝试编写一个使用Camel与Kafka集成的应用程序。 (版本-3.4.2)

我从这个question的答案中借鉴了一种方法

我有一条路由,可以监听来自Kafka主题的消息。通过使用简单的执行程序,该消息的处理与使用脱离了。每个处理都作为任务提交给此执行器。消息的顺序并不重要,唯一相关的因素是消息的处理速度和效率。一旦任务提交给执行者,我将禁用自动提交并手动提交消息。可以丢失当前正在处理的消息(由于崩溃/关闭),但是Kafka中从未提交进行处理的消息不应丢失(由于偏移的提交)。现在问问题,

  1. 如何有效处理负载?例如,有1000条消息,但一次只能并行处理100条消息。

现在,我的解决方案是阻止使用者轮询线程并尝试连续提交作业。但是暂停轮询将是更好的方法,但是我找不到在骆驼中实现此目的的任何方法

  1. 是否有更好的方法(骆驼方法)将处理与消耗分离并处理背压?

public static void main(String[] args) throws Exception {
        String consumerId = System.getProperty("consumerId","1");
        ExecutorService executor = new ThreadPoolExecutor(100,100,0L,TimeUnit.MILLISECONDS,new SynchronousQueue<>());
        LOGGER.info("Consumer {} starting....",consumerId);

        Main main = new Main();
        main.init();

        CamelContext context = main.getCamelContext();
        ComponentsBuilderFactory.kafka().brokers("localhost:9092").MetadataMaxAgeMs(120000).groupId("consumer")
                .autoOffsetReset("earliest").autoCommitEnable(false).allowManualCommit(true).maxPollRecords(100)
                .register(context,"kafka");

        ConsumerBean bean = new ConsumerBean();
        context.addRoutes(new RouteBuilder() {
            @Override
            public void configure() {
                from("kafka:test").process(exchange -> {
                    LOGGER.info("Consumer {} - Exhange is {}",consumerId,exchange.getIn().getHeaders());
                    processtask(exchange);
                    commitOffset(exchange);
                });
            }

            private void processtask(Exchange exchange) throws InterruptedException {
                try {
                    executor.submit(() -> bean.execute(exchange.getIn().getBody(String.class)));
                } catch (Exception e) {
                    LOGGER.error("Exception occured {}",e.getMessage());
                    Thread.sleep(1000);
                    processtask(exchange);
                }
            }

            private void commitOffset(Exchange exchange) {
                boolean lastOne = exchange.getIn().getHeader(KafkaConstants.LAST_RECORD_BEFORE_COMMIT,Boolean.class);
                if (lastOne) {
                    KafkaManualCommit manual = exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT,KafkaManualCommit.class);
                    if (manual != null) {
                        LOGGER.info("manually committing the offset for batch");
                        manual.commitSync();
                    }
                } else {
                    LOGGER.info("NOT time to commit the offset yet");
                }
            }
        });

        main.run();
    }

解决方法

您可以为此目的使用throttle EIP。

from("your uri here")
.throttle(maxRequestCount)
.timePeriodMillis(inTimePeriodMs)
.to(yourProcessorUri)
.end()

请查看原始文档here