根据 Consumer.committablePartitionedSource 中分配的分区数调整并行度

问题描述

我正在尝试使用 Consumer.committablePartitionedSource() 并为每个分区创建流,如下所示

    public void setup() {
        control = Consumer.committablePartitionedSource(consumerSettings,Subscriptions.topics("chat").withPartitionAssignmentHandler(new PartitionAssignmentListener()))
                .mapAsyncUnordered(Integer.MAX_VALUE,pair -> setupsource(pair,committerSettings))
                .toMat(Sink.ignore(),Consumer::createDrainingControl)
                .run(Materializer.matFromSystem(actorSystem));
    }

    private CompletionStage<Done> setupsource(Pair<TopicPartition,Source<ConsumerMessage.CommittableMessage<String,String>,NotUsed>> pair,CommitterSettings committerSettings) {
        LOGGER.info("SETTING UP PARTITION-{} SOURCE",pair.first().partition());
        return pair.second().mapAsync(16,msg -> CompletableFuture.supplyAsync(() -> consumeMessage(msg),actorSystem.dispatcher())
                .thenApply(param -> msg.committableOffset()))
                .withAttributes(ActorAttributes.supervisionStrategy(ex -> Supervision.restart()))
                .runWith(Committer.sink(committerSettings),Materializer.matFromSystem(actorSystem));
    }

在为每个分区设置源时,我使用了并行性,我想根据分配给节点的分区数进行更改。我可以在第一次将分区分配给节点时做到这一点。但是随着新节点加入集群,分配的分区将被撤销和分配。这次流不发出已经存在的分区(由于 kafka 协作重新平衡协议)以重新配置并行性。

在这里,我在所有源中共享同一个调度程序,如果我在重新平衡时保持相同的并行性,我觉得每个分区消息处理的公平机会是不可能的。我对么?请指正

解决方法

如果我理解正确,您希望在 Kafka 重新平衡主题分区时动态变化的 Source 数量具有固定的并行性。

查看 Alpakka Kafka 文档 here 中的第一个示例。可以像这样根据您的示例进行调整:

 Consumer.DrainingControl<Done> control =
      Consumer.committablePartitionedSource(consumerSettings,Subscriptions.topics("chat"))
              .wireTap(p -> LOGGER.info("SETTING UP PARTITION-{} SOURCE",p.first().partition()))
              .flatMapMerge(Integer.MAX_VALUE,Pair::second)
              .mapAsync(
                16,msg -> CompletableFuture
                         .supplyAsync(() -> consumeMessage(msg),actorSystem.dispatcher())
                         .thenApply(param -> msg.committableOffset()))
              .withAttributes(
                ActorAttributes.supervisionStrategy(
                  ex -> Supervision.restart()))
              .toMat(Committer.sink(committerSettings),Consumer::createDrainingControl)
              .run(Materializer.matFromSystem(actorSystem));

因此,基本上 Consumer.committablePartitionedSource() 将在 Kafka 将分区分配给此消费者时发出 Source,并在先前分配的分区重新平衡并从此消费者中移除时终止此类 Source。>

flatMapMerge 将获取这些 Source 并合并它们输出的消息。

所有这些消息都将在 mapAsync 阶段竞争以得到处理。这种竞争的公平性实际上取决于上面的 flatMapMerge,它应该为所有 Source 提供平等的机会来发出它们的消息。无论有多少 Source 输出消息,它们都将在这里共享固定的并行性,我相信这就是您所追求的。

所有这些消息最终都会到达处理偏移提交的 Commiter.sink

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...