Problem description
I have the following producer:
@Outgoing("platform-outcode-stats")
public Multi<OutcodeStats> produceOutCodeStats() {
    return outcodeStatsResource
            .getUkOutCodeStats()
            .onFailure().retry().atMost(1)
            .onOverflow().buffer(100);
}
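It publishes a stream of payloads to the "platform-outcode-stats" topic. As stated in the docs, this method is called once to retrieve the publisher. So far so good.
I want to write a unit test to make sure the topic is created and populated correctly. To do that, I am using Testcontainers to start a single Kafka server as follows: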
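import java.util.Collections;
import java.util.Map;
import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

public class KafkaServerResource implements QuarkusTestResourceLifecycleManager {

    KafkaContainer kafkaServer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.2.1"));

    @Override
    public Map<String, String> start() {
        kafkaServer.start();
        // Point the application at the broker started by Testcontainers.
        return Collections.singletonMap("kafka.bootstrap.servers", kafkaServer.getBootstrapServers());
    }

    @Override
    public void stop() {
        kafkaServer.stop();
    }
}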
I wrote a dumb test to see what happens:
@Inject
@Channel("platform-outcode-stats")
@Acknowledgment(Acknowledgment.Strategy.PRE_PROCESSING)
Publisher<OutcodeStats> outcodeStats;

@Test
void topicShouldHaveStats() {
    // Collects the whole stream into a list, then prints each outcode.
    Flowable.fromPublisher(outcodeStats)
            .toList()
            .blockingGet()
            .forEach(el -> System.out.println(el.getOutcode()));
}
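The test does not actually test anything, but I wanted to see whether anything gets printed to the console. Well, since no message is acknowledged, I am stuck with Kafka hanging forever, with the following logs: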
2020-10-24 18:00:21,173 INFO [org.apa.kaf.cli.con.KafkaConsumer] (vert.x-kafka-consumer-thread-0) [Consumer clientId=consumer-d34f1529-406f-4f84-b7e1-235df4ca94da-1,groupId=d34f1529-406f-4f84-b7e1-235df4ca94da] Subscribed to topic(s): platform-outcode-stats
2020-10-24 18:00:21,220 WARN [org.apa.kaf.cli.NetworkClient] (vert.x-kafka-consumer-thread-0) [Consumer clientId=consumer-d34f1529-406f-4f84-b7e1-235df4ca94da-1,groupId=d34f1529-406f-4f84-b7e1-235df4ca94da] Error while fetching Metadata with correlation id 3 : {platform-outcode-stats=LEADER_NOT_AVAILABLE}
2020-10-24 18:00:21,220 INFO [org.apa.kaf.cli.Metadata] (vert.x-kafka-consumer-thread-0) [Consumer clientId=consumer-d34f1529-406f-4f84-b7e1-235df4ca94da-1,groupId=d34f1529-406f-4f84-b7e1-235df4ca94da] Cluster ID: Yuo_aAylRgWyqJWKPV-zHQ
2020-10-24 18:00:21,311 INFO [org.apa.kaf.cli.con.int.AbstractCoordinator] (vert.x-kafka-consumer-thread-0) [Consumer clientId=consumer-d34f1529-406f-4f84-b7e1-235df4ca94da-1,groupId=d34f1529-406f-4f84-b7e1-235df4ca94da] Discovered group coordinator localhost:32993 (id: 2147483646 rack: null)
2020-10-24 18:00:21,313 INFO [org.apa.kaf.cli.con.int.AbstractCoordinator] (vert.x-kafka-consumer-thread-0) [Consumer clientId=consumer-d34f1529-406f-4f84-b7e1-235df4ca94da-1,groupId=d34f1529-406f-4f84-b7e1-235df4ca94da] (Re-)joining group
2020-10-24 18:00:21,348 INFO [org.apa.kaf.cli.con.int.AbstractCoordinator] (vert.x-kafka-consumer-thread-0) [Consumer clientId=consumer-d34f1529-406f-4f84-b7e1-235df4ca94da-1,groupId=d34f1529-406f-4f84-b7e1-235df4ca94da] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group
2020-10-24 18:00:21,376 INFO [org.apa.kaf.cli.con.int.ConsumerCoordinator] (vert.x-kafka-consumer-thread-0) [Consumer clientId=consumer-d34f1529-406f-4f84-b7e1-235df4ca94da-1,groupId=d34f1529-406f-4f84-b7e1-235df4ca94da] Finished assignment for group at generation 1: {consumer-d34f1529-406f-4f84-b7e1-235df4ca94da-1-970f67b0-7968-4995-b2c7-2bba95daf5d7=Assignment(partitions=[platform-outcode-stats-0])}
2020-10-24 18:00:21,416 INFO [org.apa.kaf.cli.con.int.AbstractCoordinator] (vert.x-kafka-consumer-thread-0) [Consumer clientId=consumer-d34f1529-406f-4f84-b7e1-235df4ca94da-1,groupId=d34f1529-406f-4f84-b7e1-235df4ca94da] Successfully joined group with generation 1
2020-10-24 18:00:21,420 INFO [org.apa.kaf.cli.con.int.ConsumerCoordinator] (vert.x-kafka-consumer-thread-0) [Consumer clientId=consumer-d34f1529-406f-4f84-b7e1-235df4ca94da-1,groupId=d34f1529-406f-4f84-b7e1-235df4ca94da] Adding newly assigned partitions: platform-outcode-stats-0
2020-10-24 18:00:21,433 INFO [org.apa.kaf.cli.con.int.ConsumerCoordinator] (vert.x-kafka-consumer-thread-0) [Consumer clientId=consumer-d34f1529-406f-4f84-b7e1-235df4ca94da-1,groupId=d34f1529-406f-4f84-b7e1-235df4ca94da] Found no committed offset for partition platform-outcode-stats-0
2020-10-24 18:00:21,447 INFO [org.apa.kaf.cli.con.int.SubscriptionState] (vert.x-kafka-consumer-thread-0) [Consumer clientId=consumer-d34f1529-406f-4f84-b7e1-235df4ca94da-1,groupId=d34f1529-406f-4f84-b7e1-235df4ca94da] Resetting offset for partition platform-outcode-stats-0 to offset 0.
2020-10-24 18:03:36,158 INFO [org.apa.kaf.cli.con.int.ConsumerCoordinator] (vert.x-kafka-consumer-thread-0) [Consumer clientId=consumer-d34f1529-406f-4f84-b7e1-235df4ca94da-1,groupId=d34f1529-406f-4f84-b7e1-235df4ca94da] Revoke previously assigned partitions platform-outcode-stats-0
2020-10-24 18:03:36,158 INFO [org.apa.kaf.cli.con.int.AbstractCoordinator] (vert.x-kafka-consumer-thread-0) [Consumer clientId=consumer-d34f1529-406f-4f84-b7e1-235df4ca94da-1,groupId=d34f1529-406f-4f84-b7e1-235df4ca94da] Member consumer-d34f1529-406f-4f84-b7e1-235df4ca94da-1-970f67b0-7968-4995-b2c7-2bba95daf5d7 sending LeaveGroup request to coordinator localhost:32993 (id: 2147483646 rack: null) due to the consumer is being closed
2020-10-24 18:03:36,200 INFO [org.apa.kaf.cli.pro.KafkaProducer] (vert.x-worker-thread-1) [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2020-10-24 18:03:36,227 INFO [io.sma.rea.mes.provider] (Quarkus Test Cleanup Shutdown task) SRMSG00207: Cancel subscriptions
2020-10-24 18:03:36,238 INFO [io.quarkus] (Quarkus Test Cleanup Shutdown task) Quarkus stopped in 0.100s
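A side note on the blocking call in the test, offered as a possibility rather than a diagnosis: `toList()` emits its list only once the upstream completes, so on a stream that never completes `blockingGet()` waits indefinitely whether or not messages are acknowledged. That would be consistent with the gap between 18:00:21 and 18:03:36 in the logs above. The sketch below illustrates the idea with `java.util.concurrent` only. It does not use the Quarkus, SmallRye or RxJava APIs, and the names `BoundedCollect` and `collectUpTo` are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class BoundedCollect {

    // Collects at most maxItems from a source that may never signal completion,
    // and gives up once the overall timeout has elapsed.
    public static <T> List<T> collectUpTo(BlockingQueue<T> source, int maxItems, long timeoutMillis) {
        List<T> result = new ArrayList<>();
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        try {
            while (result.size() < maxItems) {
                long remaining = deadline - System.nanoTime();
                if (remaining <= 0) {
                    break; // overall timeout reached
                }
                T item = source.poll(remaining, TimeUnit.NANOSECONDS);
                if (item == null) {
                    break; // nothing arrived before the deadline
                }
                result.add(item);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag and return what we have
        }
        return result;
    }

    public static void main(String[] args) {
        // Stand-in for a topic: items arrive, but no "end of stream" is ever signalled.
        BlockingQueue<String> topic = new LinkedBlockingQueue<>();
        topic.add("AB1");
        topic.add("AB2");
        topic.add("AB3");

        System.out.println(collectUpTo(topic, 2, 500)); // stops after two items
        System.out.println(collectUpTo(topic, 5, 200)); // returns the remaining item after the timeout
    }
}
```

In RxJava terms, the analogous approach would presumably be to bound the stream with operators such as `take(n)` and `timeout(...)` before calling `toList()`; whether that fits this test depends on how many records the producer is expected to emit.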
I am sure I am missing something here, or I am simply doing something I am not supposed to do. Any help would be appreciated.