kafka云流:无法创建生产者绑定; 30秒后重试

问题描述

我想通过转换API my reference guide处理kafka流。 运行我的应用mvn spring-boot:run时,出现o.s.cloud.stream.binding.BindingService : Failed to create producer binding; retrying in 30 seconds错误。我相信transform API既是消费者又是生产者。我该怎么办?

错误

2020-11-03 18:19:29.956  INFO 57290 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1604445569955
2020-11-03 18:21:29.969  INFO 57290 --- [| adminclient-1] o.a.k.c.a.i.AdminMetadataManager         : [AdminClient clientId=adminclient-1] Metadata update Failed

org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.

2020-11-03 18:21:30.073  INFO 57290 --- [| adminclient-1] o.a.k.c.a.i.AdminMetadataManager         : [AdminClient clientId=adminclient-1] Metadata update Failed

org.apache.kafka.common.errors.TimeoutException: The AdminClient thread has exited.

2020-11-03 18:21:30.079 ERROR 57290 --- [           main] o.s.cloud.stream.binding.BindingService  : Failed to create producer binding; retrying in 30 seconds

org.springframework.cloud.stream.provisioning.ProvisioningException: Provisioning exception; nested exception is java.util.concurrent.TimeoutException
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createtopic(KafkaTopicProvisioner.java:332) ~[spring-cloud-stream-binder-kafka-core-3.0.8.RELEASE.jar:3.0.8.RELEASE]
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.provisionProducerDestination(KafkaTopicProvisioner.java:148) ~[spring-cloud-stream-binder-kafka-core-3.0.8.RELEASE.jar:3.0.8.RELEASE]
    at org.springframework.cloud.stream.binder.kafka.streams.KStreamBinder.dobindProducer(KStreamBinder.java:118) ~[spring-cloud-stream-binder-kafka-streams-3.0.8.RELEASE.jar:3.0.8.RELEASE]
    at org.springframework.cloud.stream.binder.kafka.streams.KStreamBinder.dobindProducer(KStreamBinder.java:52) ~[spring-cloud-stream-binder-kafka-streams-3.0.8.RELEASE.jar:3.0.8.RELEASE]
    at org.springframework.cloud.stream.binder.AbstractBinder.bindProducer(AbstractBinder.java:152) ~[spring-cloud-stream-3.0.8.RELEASE.jar:3.0.8.RELEASE]
    at org.springframework.cloud.stream.binding.BindingService.dobindProducer(BindingService.java:313) [spring-cloud-stream-3.0.8.RELEASE.jar:3.0.8.RELEASE]
    at org.springframework.cloud.stream.binding.BindingService.bindProducer(BindingService.java:282) [spring-cloud-stream-3.0.8.RELEASE.jar:3.0.8.RELEASE]
    at org.springframework.cloud.stream.binding.BindingService.bindProducer(BindingService.java:291) [spring-cloud-stream-3.0.8.RELEASE.jar:3.0.8.RELEASE]
    at org.springframework.cloud.stream.binding.AbstractBindableProxyFactory.createAndBindOutputs(AbstractBindableProxyFactory.java:136) [spring-cloud-stream-3.0.8.RELEASE.jar:3.0.8.RELEASE]
    at org.springframework.cloud.stream.binding.OutputBindingLifecycle.doStartWithBindable(OutputBindingLifecycle.java:58) [spring-cloud-stream-3.0.8.RELEASE.jar:3.0.8.RELEASE]
    at java.util.LinkedHashMap$LinkedValues.forEach(LinkedHashMap.java:608) ~[na:1.8.0_251]
    at org.springframework.cloud.stream.binding.AbstractBindingLifecycle.start(AbstractBindingLifecycle.java:57) ~[spring-cloud-stream-3.0.8.RELEASE.jar:3.0.8.RELEASE]
    at org.springframework.cloud.stream.binding.OutputBindingLifecycle.start(OutputBindingLifecycle.java:34) [spring-cloud-stream-3.0.8.RELEASE.jar:3.0.8.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:182) ~[spring-context-5.2.3.RELEASE.jar:5.2.3.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:53) ~[spring-context-5.2.3.RELEASE.jar:5.2.3.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:360) ~[spring-context-5.2.3.RELEASE.jar:5.2.3.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:158) ~[spring-context-5.2.3.RELEASE.jar:5.2.3.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:122) ~[spring-context-5.2.3.RELEASE.jar:5.2.3.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:894) ~[spring-context-5.2.3.RELEASE.jar:5.2.3.RELEASE]
    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.finishRefresh(ServletWebServerApplicationContext.java:162) ~[spring-boot-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:553) ~[spring-context-5.2.3.RELEASE.jar:5.2.3.RELEASE]
    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:141) ~[spring-boot-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:747) ~[spring-boot-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397) ~[spring-boot-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:315) ~[spring-boot-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226) ~[spring-boot-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1215) ~[spring-boot-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at com.metromile.test.Application.main(Application.java:11) ~[classes/:na]
Caused by: java.util.concurrent.TimeoutException: null
    at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:108) ~[kafka-clients-2.3.1.jar:na]
    at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:272) ~[kafka-clients-2.3.1.jar:na]
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createtopicAndPartitions(KafkaTopicProvisioner.java:368) ~[spring-cloud-stream-binder-kafka-core-3.0.8.RELEASE.jar:3.0.8.RELEASE]
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createtopicIfNecessary(KafkaTopicProvisioner.java:342) ~[spring-cloud-stream-binder-kafka-core-3.0.8.RELEASE.jar:3.0.8.RELEASE]
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createtopic(KafkaTopicProvisioner.java:319) ~[spring-cloud-stream-binder-kafka-core-3.0.8.RELEASE.jar:3.0.8.RELEASE]
    ... 27 common frames omitted

我的应用程序:

 @Bean
    public Function<KStream<String,String>,KStream<String,String>> mytransformer() {
        return input -> input.transform(() -> new Transformer<String,String,keyvalue<String,String>>() {

            @Override
            public void init(ProcessorContext context) {
            }
        @Override
        public keyvalue<String,String> transform(String key,String value) {
            log.info("key={}",key);


            return keyvalue.pair(key,value);
        }


        @Override
        public void close() {}

    });
}

appilication.yaml的一部分

    spring:
  application:
    id: mts
    name: transformer-service
  profiles:
    active: local
  kafka:
    bootstrap-servers: ${CONFLUENT_BOOTSTRAP_SERVERS}
    jaas:
      enabled: true
      control-flag: required
      login-module: "org.apache.kafka.common.security.plain.PlainLoginModule"
      options:
        username: ${CONFLUENT_CLUSTER_API_KEY}
        password: ${CONFLUENT_CLUSTER_API_SECRET}
    security:
      protocol: "SASL_SSL"
    properties:
      ssl.endpoint.identification.algorithm: "https"
      sasl.mechanism: "PLAIN"
      # Schema Registry
      basic.auth.credentials.source: "USER_INFO"
      basic.auth.user.info: ${CONFLUENT_SR_API_KEY}:${CONFLUENT_SR_API_SECRET}
      schema.registry.url: ${SCHEMA_REGISTRY_URL}
      # Schema SerDe
      derive.type: true
      delivery.timeout.ms: 360000
      compression.type: gzip
  cloud:
    stream:
      function:
        deFinition: ${mts.functions.mytransformer.name}}
      kafka:
        streams:
          binder:
            configuration:
              application:
                server: ${KUBERNETES_SERVICE_HOST:localhost}:${KUBERNETES_SERVICE_PORT:8080}
              replication:
                factor: 3
            functions:
              mytransformer:
                applicationId: ${spring.application.id}.${mts.functions.mytransformer.name}
            min-partition-count: 6
      bindings:
        # enrich bindings
        mytransformer-in-0:
          destination: ${mts.topic.smooth.ingress}
        mytransformer-out-0:
          destination: ${mts.topic.smooth.outgress}
mts:
  functions:
    mytransformer:
      name: mytransformer

  topic:
    smooth:
      ingress: <topic 1>
      outgress: <topic 2>

pom.xml的一部分

    <properties>
    <project.artifact.name>${project.artifactId}</project.artifact.name>
    <java.version>1.8</java.version>
    <maven-compiler-plugin.version>3.8.1</maven-compiler-plugin.version>
    <spring.boot.version>2.2.4.RELEASE</spring.boot.version>
    <spring-cloud.version>Hoxton.SR8</spring-cloud.version>
    <springdoc-openapi.version>1.2.32</springdoc-openapi.version>
    <confluent.version>5.5.1</confluent.version>
    <swagger-annotations.version>2.1.5</swagger-annotations.version>
    <lombok-version>1.18.4</lombok-version>
</properties>

<distributionManagement>
    <repository>
        <id>metromile-repo</id>
        <url>https://mvn.internal.metromile.com/repository/metromile-repo</url>
    </repository>
</distributionManagement>

<repositories>
    <repository>
        <id>confluent</id>
        <url>https://packages.confluent.io/maven/</url>
    </repository>
</repositories>

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>${spring.boot.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>${spring-cloud.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-logging</artifactId>
    </dependency>

    <!-- API documentation -->
    <dependency>
        <groupId>org.springdoc</groupId>
        <artifactId>springdoc-openapi-ui</artifactId>
        <version>${springdoc-openapi.version}</version>
    </dependency>
    <dependency>
        <groupId>io.swagger.core.v3</groupId>
        <artifactId>swagger-annotations</artifactId>
        <version>${swagger-annotations.version}</version>
    </dependency>

    <!-- Kafka Streams -->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
    </dependency>
</dependencies>

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)