Quarkus - SmallRye Kafka connector: acknowledging a stream of payloads

Problem description

I have the following producer:

    @Outgoing("platform-outcode-stats")
    public Multi<OutcodeStats> produceOutCodeStats() {
        return outcodeStatsResource
                .getUkOutCodeStats()
                .onFailure().retry().atMost(1)
                .onOverflow().buffer(100);
    }

It publishes a stream of payloads to the "platform-outcode-stats" topic. As stated in the docs, this method is called once to retrieve the publisher. So far so good.

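I would like to create a unit test to make sure the topic is created and populated correctly. To do that, I am using Testcontainers to start a single Kafka server as follows: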

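import java.util.Collections;
import java.util.Map;

import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

public class KafkaServerResource implements QuarkusTestResourceLifecycleManager {

    KafkaContainer kafkaServer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.2.1"));

    @Override
    public Map<String, String> start() {
        kafkaServer.start();
        // Point the application at the broker started by Testcontainers
        return Collections.singletonMap("kafka.bootstrap.servers", kafkaServer.getBootstrapServers());
    }

    @Override
    public void stop() {
        kafkaServer.stop();
    }
}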

I created a dummy test to see what happens:

@Inject
@Channel("platform-outcode-stats")
@Acknowledgment(Acknowledgment.Strategy.PRE_PROCESSING)
Publisher<OutcodeStats> outcodeStats;

@Test
void topicShouldHaveStats() {
    // Collect everything the channel emits and print each outcode
    Flowable.fromPublisher(outcodeStats)
            .toList()
            .blockingGet()
            .forEach(el -> System.out.println(el.getOutcode()));
}

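This test does not actually test anything, but I wanted to see whether anything gets printed to the console. Well, since no message is acknowledged, Kafka gets stuck forever at the following logs: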

2020-10-24 18:00:21,173 INFO  [org.apa.kaf.cli.con.KafkaConsumer] (vert.x-kafka-consumer-thread-0) [Consumer clientId=consumer-d34f1529-406f-4f84-b7e1-235df4ca94da-1,groupId=d34f1529-406f-4f84-b7e1-235df4ca94da] Subscribed to topic(s): platform-outcode-stats
2020-10-24 18:00:21,220 WARN  [org.apa.kaf.cli.NetworkClient] (vert.x-kafka-consumer-thread-0) [Consumer clientId=consumer-d34f1529-406f-4f84-b7e1-235df4ca94da-1,groupId=d34f1529-406f-4f84-b7e1-235df4ca94da] Error while fetching metadata with correlation id 3 : {platform-outcode-stats=LEADER_NOT_AVAILABLE}
2020-10-24 18:00:21,220 INFO  [org.apa.kaf.cli.Metadata] (vert.x-kafka-consumer-thread-0) [Consumer clientId=consumer-d34f1529-406f-4f84-b7e1-235df4ca94da-1,groupId=d34f1529-406f-4f84-b7e1-235df4ca94da] Cluster ID: Yuo_aAylRgWyqJWKPV-zHQ
2020-10-24 18:00:21,311 INFO  [org.apa.kaf.cli.con.int.AbstractCoordinator] (vert.x-kafka-consumer-thread-0) [Consumer clientId=consumer-d34f1529-406f-4f84-b7e1-235df4ca94da-1,groupId=d34f1529-406f-4f84-b7e1-235df4ca94da] Discovered group coordinator localhost:32993 (id: 2147483646 rack: null)
2020-10-24 18:00:21,313 INFO  [org.apa.kaf.cli.con.int.AbstractCoordinator] (vert.x-kafka-consumer-thread-0) [Consumer clientId=consumer-d34f1529-406f-4f84-b7e1-235df4ca94da-1,groupId=d34f1529-406f-4f84-b7e1-235df4ca94da] (Re-)joining group
2020-10-24 18:00:21,348 INFO  [org.apa.kaf.cli.con.int.AbstractCoordinator] (vert.x-kafka-consumer-thread-0) [Consumer clientId=consumer-d34f1529-406f-4f84-b7e1-235df4ca94da-1,groupId=d34f1529-406f-4f84-b7e1-235df4ca94da] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group
2020-10-24 18:00:21,376 INFO  [org.apa.kaf.cli.con.int.ConsumerCoordinator] (vert.x-kafka-consumer-thread-0) [Consumer clientId=consumer-d34f1529-406f-4f84-b7e1-235df4ca94da-1,groupId=d34f1529-406f-4f84-b7e1-235df4ca94da] Finished assignment for group at generation 1: {consumer-d34f1529-406f-4f84-b7e1-235df4ca94da-1-970f67b0-7968-4995-b2c7-2bba95daf5d7=Assignment(partitions=[platform-outcode-stats-0])}
2020-10-24 18:00:21,416 INFO  [org.apa.kaf.cli.con.int.AbstractCoordinator] (vert.x-kafka-consumer-thread-0) [Consumer clientId=consumer-d34f1529-406f-4f84-b7e1-235df4ca94da-1,groupId=d34f1529-406f-4f84-b7e1-235df4ca94da] Successfully joined group with generation 1
2020-10-24 18:00:21,420 INFO  [org.apa.kaf.cli.con.int.ConsumerCoordinator] (vert.x-kafka-consumer-thread-0) [Consumer clientId=consumer-d34f1529-406f-4f84-b7e1-235df4ca94da-1,groupId=d34f1529-406f-4f84-b7e1-235df4ca94da] Adding newly assigned partitions: platform-outcode-stats-0
2020-10-24 18:00:21,433 INFO  [org.apa.kaf.cli.con.int.ConsumerCoordinator] (vert.x-kafka-consumer-thread-0) [Consumer clientId=consumer-d34f1529-406f-4f84-b7e1-235df4ca94da-1,groupId=d34f1529-406f-4f84-b7e1-235df4ca94da] Found no committed offset for partition platform-outcode-stats-0
2020-10-24 18:00:21,447 INFO  [org.apa.kaf.cli.con.int.SubscriptionState] (vert.x-kafka-consumer-thread-0) [Consumer clientId=consumer-d34f1529-406f-4f84-b7e1-235df4ca94da-1,groupId=d34f1529-406f-4f84-b7e1-235df4ca94da] Resetting offset for partition platform-outcode-stats-0 to offset 0.
2020-10-24 18:03:36,158 INFO  [org.apa.kaf.cli.con.int.ConsumerCoordinator] (vert.x-kafka-consumer-thread-0) [Consumer clientId=consumer-d34f1529-406f-4f84-b7e1-235df4ca94da-1,groupId=d34f1529-406f-4f84-b7e1-235df4ca94da] Revoke previously assigned partitions platform-outcode-stats-0
2020-10-24 18:03:36,158 INFO  [org.apa.kaf.cli.con.int.AbstractCoordinator] (vert.x-kafka-consumer-thread-0) [Consumer clientId=consumer-d34f1529-406f-4f84-b7e1-235df4ca94da-1,groupId=d34f1529-406f-4f84-b7e1-235df4ca94da] Member consumer-d34f1529-406f-4f84-b7e1-235df4ca94da-1-970f67b0-7968-4995-b2c7-2bba95daf5d7 sending LeaveGroup request to coordinator localhost:32993 (id: 2147483646 rack: null) due to the consumer is being closed
2020-10-24 18:03:36,200 INFO  [org.apa.kaf.cli.pro.KafkaProducer] (vert.x-worker-thread-1) [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2020-10-24 18:03:36,227 INFO  [io.sma.rea.mes.provider] (Quarkus Test Cleanup Shutdown task) SRMSG00207: Cancel subscriptions
2020-10-24 18:03:36,238 INFO  [io.quarkus] (Quarkus Test Cleanup Shutdown task) Quarkus stopped in 0.100s

I am pretty sure I am missing something here, or just doing something I should not be doing. Any help would be greatly appreciated.
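One possible reading of the hang, offered as a guess rather than a confirmed diagnosis: `toList()` only emits once the upstream signals completion, and a channel backed by a Kafka topic normally never completes, so `blockingGet()` would wait forever whatever the acknowledgment strategy is. The sketch below illustrates the idea of a bounded, time-limited read using only the JDK `Flow` API as a stand-in for the Kafka channel. The class `BoundedCollect` and the method `firstN` are hypothetical names introduced for this illustration and are not part of SmallRye, Quarkus, or RxJava.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class BoundedCollect {

    /**
     * Hypothetical helper: subscribes to the source, collects the first n items
     * (n must be positive), then cancels instead of waiting for onComplete.
     */
    static <T> CompletableFuture<List<T>> firstN(Flow.Publisher<T> source, int n) {
        CompletableFuture<List<T>> result = new CompletableFuture<>();
        source.subscribe(new Flow.Subscriber<T>() {
            private final List<T> items = new ArrayList<>();
            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                s.request(n); // demand exactly n items, never more
            }

            @Override
            public void onNext(T item) {
                items.add(item);
                if (items.size() == n) {
                    subscription.cancel();  // stop consuming from the endless source
                    result.complete(items); // unblock the caller without an onComplete signal
                }
            }

            @Override
            public void onError(Throwable t) {
                result.completeExceptionally(t);
            }

            @Override
            public void onComplete() {
                result.complete(items); // source ended early: return what was seen
            }
        });
        return result;
    }

    public static void main(String[] args) throws Exception {
        // Assumption: this publisher stands in for the topic; it is never closed,
        // so it never completes, much like a Kafka-backed channel.
        SubmissionPublisher<String> topic = new SubmissionPublisher<>();
        CompletableFuture<List<String>> firstTwo = firstN(topic, 2);

        for (String outcode : List.of("AB1", "AB2", "AB3")) {
            topic.submit(outcode);
        }

        // The timeout turns a potential hang into a TimeoutException
        System.out.println(firstTwo.get(5, TimeUnit.SECONDS));
    }
}
```

In the RxJava test above, the analogous change would presumably be to bound the stream before collecting it, for example with `take(n)` and `timeout(...)` ahead of `toList()`, so that the test either sees n records or fails quickly instead of blocking until shutdown.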
