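Problem description
I want to process a Kafka stream through the transform API (my reference guide).
When I run my application with mvn spring-boot:run
I get the error o.s.cloud.stream.binding.BindingService : Failed to create producer binding; retrying in 30 seconds
I believe the transform API acts as both a consumer and a producer. What should I do?
Error: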
2020-11-03 18:19:29.956 INFO 57290 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1604445569955
2020-11-03 18:21:29.969 INFO 57290 --- [| adminclient-1] o.a.k.c.a.i.AdminMetadataManager : [AdminClient clientId=adminclient-1] Metadata update failed
org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
2020-11-03 18:21:30.073 INFO 57290 --- [| adminclient-1] o.a.k.c.a.i.AdminMetadataManager : [AdminClient clientId=adminclient-1] Metadata update failed
org.apache.kafka.common.errors.TimeoutException: The AdminClient thread has exited.
2020-11-03 18:21:30.079 ERROR 57290 --- [ main] o.s.cloud.stream.binding.BindingService : Failed to create producer binding; retrying in 30 seconds
org.springframework.cloud.stream.provisioning.ProvisioningException: Provisioning exception; nested exception is java.util.concurrent.TimeoutException
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopic(KafkaTopicProvisioner.java:332) ~[spring-cloud-stream-binder-kafka-core-3.0.8.RELEASE.jar:3.0.8.RELEASE]
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.provisionProducerDestination(KafkaTopicProvisioner.java:148) ~[spring-cloud-stream-binder-kafka-core-3.0.8.RELEASE.jar:3.0.8.RELEASE]
at org.springframework.cloud.stream.binder.kafka.streams.KStreamBinder.doBindProducer(KStreamBinder.java:118) ~[spring-cloud-stream-binder-kafka-streams-3.0.8.RELEASE.jar:3.0.8.RELEASE]
at org.springframework.cloud.stream.binder.kafka.streams.KStreamBinder.doBindProducer(KStreamBinder.java:52) ~[spring-cloud-stream-binder-kafka-streams-3.0.8.RELEASE.jar:3.0.8.RELEASE]
at org.springframework.cloud.stream.binder.AbstractBinder.bindProducer(AbstractBinder.java:152) ~[spring-cloud-stream-3.0.8.RELEASE.jar:3.0.8.RELEASE]
at org.springframework.cloud.stream.binding.BindingService.doBindProducer(BindingService.java:313) [spring-cloud-stream-3.0.8.RELEASE.jar:3.0.8.RELEASE]
at org.springframework.cloud.stream.binding.BindingService.bindProducer(BindingService.java:282) [spring-cloud-stream-3.0.8.RELEASE.jar:3.0.8.RELEASE]
at org.springframework.cloud.stream.binding.BindingService.bindProducer(BindingService.java:291) [spring-cloud-stream-3.0.8.RELEASE.jar:3.0.8.RELEASE]
at org.springframework.cloud.stream.binding.AbstractBindableProxyFactory.createAndBindOutputs(AbstractBindableProxyFactory.java:136) [spring-cloud-stream-3.0.8.RELEASE.jar:3.0.8.RELEASE]
at org.springframework.cloud.stream.binding.OutputBindingLifecycle.doStartWithBindable(OutputBindingLifecycle.java:58) [spring-cloud-stream-3.0.8.RELEASE.jar:3.0.8.RELEASE]
at java.util.LinkedHashMap$LinkedValues.forEach(LinkedHashMap.java:608) ~[na:1.8.0_251]
at org.springframework.cloud.stream.binding.AbstractBindingLifecycle.start(AbstractBindingLifecycle.java:57) ~[spring-cloud-stream-3.0.8.RELEASE.jar:3.0.8.RELEASE]
at org.springframework.cloud.stream.binding.OutputBindingLifecycle.start(OutputBindingLifecycle.java:34) [spring-cloud-stream-3.0.8.RELEASE.jar:3.0.8.RELEASE]
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:182) ~[spring-context-5.2.3.RELEASE.jar:5.2.3.RELEASE]
at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:53) ~[spring-context-5.2.3.RELEASE.jar:5.2.3.RELEASE]
at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:360) ~[spring-context-5.2.3.RELEASE.jar:5.2.3.RELEASE]
at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:158) ~[spring-context-5.2.3.RELEASE.jar:5.2.3.RELEASE]
at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:122) ~[spring-context-5.2.3.RELEASE.jar:5.2.3.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:894) ~[spring-context-5.2.3.RELEASE.jar:5.2.3.RELEASE]
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.finishRefresh(ServletWebServerApplicationContext.java:162) ~[spring-boot-2.2.4.RELEASE.jar:2.2.4.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:553) ~[spring-context-5.2.3.RELEASE.jar:5.2.3.RELEASE]
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:141) ~[spring-boot-2.2.4.RELEASE.jar:2.2.4.RELEASE]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:747) ~[spring-boot-2.2.4.RELEASE.jar:2.2.4.RELEASE]
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397) ~[spring-boot-2.2.4.RELEASE.jar:2.2.4.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:315) ~[spring-boot-2.2.4.RELEASE.jar:2.2.4.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226) ~[spring-boot-2.2.4.RELEASE.jar:2.2.4.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1215) ~[spring-boot-2.2.4.RELEASE.jar:2.2.4.RELEASE]
at com.metromile.test.Application.main(Application.java:11) ~[classes/:na]
Caused by: java.util.concurrent.TimeoutException: null
at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:108) ~[kafka-clients-2.3.1.jar:na]
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:272) ~[kafka-clients-2.3.1.jar:na]
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopicAndPartitions(KafkaTopicProvisioner.java:368) ~[spring-cloud-stream-binder-kafka-core-3.0.8.RELEASE.jar:3.0.8.RELEASE]
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopicIfNecessary(KafkaTopicProvisioner.java:342) ~[spring-cloud-stream-binder-kafka-core-3.0.8.RELEASE.jar:3.0.8.RELEASE]
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopic(KafkaTopicProvisioner.java:319) ~[spring-cloud-stream-binder-kafka-core-3.0.8.RELEASE.jar:3.0.8.RELEASE]
... 27 common frames omitted
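The "Timed out waiting for a node assignment" line in the log above generally indicates that the AdminClient never managed to talk to a broker before its timeout expired. One way to narrow this down is to first check plain TCP reachability of the bootstrap address from the machine running the application. The following is a minimal diagnostic sketch using only the JDK; the class name BrokerReachability and the method canConnect are hypothetical names chosen for illustration, and a successful TCP connect says nothing about whether the SASL_SSL settings are being applied correctly.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of Kafka or Spring: checks TCP reachability only.
public class BrokerReachability {

    /** Returns true if a plain TCP connection to host:port succeeds within timeoutMs. */
    public static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Covers refused connections, unknown hosts and connect timeouts.
            return false;
        }
    }

    public static void main(String[] args) {
        if (args.length != 1 || !args[0].contains(":")) {
            System.err.println("usage: BrokerReachability <host:port>");
            return;
        }
        // Split on the last colon so that the port is always the final segment.
        int idx = args[0].lastIndexOf(':');
        String host = args[0].substring(0, idx);
        int port = Integer.parseInt(args[0].substring(idx + 1));
        System.out.println(host + ":" + port + " reachable = " + canConnect(host, port, 5000));
    }
}
```

It could be run with the first entry of CONFLUENT_BOOTSTRAP_SERVERS as its argument. If the address is reachable, the next suspects would be whether the security settings actually reach the binder's AdminClient, which this sketch does not test.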
My application:
@Bean
public Function<KStream<String, String>, KStream<String, String>> mytransformer() {
    // Pass-through transformer: logs each key and forwards the record unchanged.
    return input -> input.transform(() -> new Transformer<String, String, KeyValue<String, String>>() {
        @Override
        public void init(ProcessorContext context) {
        }

        @Override
        public KeyValue<String, String> transform(String key, String value) {
            log.info("key={}", key);
            return KeyValue.pair(key, value);
        }

        @Override
        public void close() {}
    });
}
Part of application.yaml
spring:
  application:
    id: mts
    name: transformer-service
  profiles:
    active: local
  kafka:
    bootstrap-servers: ${CONFLUENT_BOOTSTRAP_SERVERS}
    jaas:
      enabled: true
      control-flag: required
      login-module: "org.apache.kafka.common.security.plain.PlainLoginModule"
      options:
        username: ${CONFLUENT_CLUSTER_API_KEY}
        password: ${CONFLUENT_CLUSTER_API_SECRET}
    security:
      protocol: "SASL_SSL"
    properties:
      ssl.endpoint.identification.algorithm: "https"
      sasl.mechanism: "PLAIN"
      # Schema Registry
      basic.auth.credentials.source: "USER_INFO"
      basic.auth.user.info: ${CONFLUENT_SR_API_KEY}:${CONFLUENT_SR_API_SECRET}
      schema.registry.url: ${SCHEMA_REGISTRY_URL}
      # Schema SerDe
      derive.type: true
      delivery.timeout.ms: 360000
      compression.type: gzip
  cloud:
    stream:
      function:
        definition: ${mts.functions.mytransformer.name}}
      kafka:
        streams:
          binder:
            configuration:
              application:
                server: ${KUBERNETES_SERVICE_HOST:localhost}:${KUBERNETES_SERVICE_PORT:8080}
              replication:
                factor: 3
            functions:
              mytransformer:
                applicationId: ${spring.application.id}.${mts.functions.mytransformer.name}
            min-partition-count: 6
      bindings:
        # enrich bindings
        mytransformer-in-0:
          destination: ${mts.topic.smooth.ingress}
        mytransformer-out-0:
          destination: ${mts.topic.smooth.outgress}
mts:
  functions:
    mytransformer:
      name: mytransformer
  topic:
    smooth:
      ingress: <topic 1>
      outgress: <topic 2>
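One detail in the configuration above may be worth double-checking: the function definition value ends in two closing braces. Assuming this is in the real file and not a transcription slip, a ${...} placeholder resolver would typically substitute the placeholder and leave the extra brace as literal text. The sketch below is a simplified, JDK-only imitation of that behaviour for illustration; it is not Spring's resolver, and the class name PlaceholderSketch and method resolve are hypothetical. Whether this has any bearing on the reported binding error is not established here.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Simplified stand-in for ${key} / ${key:default} resolution; not Spring's implementation.
public class PlaceholderSketch {

    // Matches ${key} or ${key:default}; text after the first closing brace is left untouched.
    private static final Pattern PLACEHOLDER =
            Pattern.compile("\\$\\{([^}:]+)(?::([^}]*))?\\}");

    public static String resolve(String text, Map<String, String> props) {
        Matcher m = PLACEHOLDER.matcher(text);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String value = props.get(m.group(1));
            if (value == null) {
                value = m.group(2); // fall back to the inline default, if any
            }
            if (value == null) {
                throw new IllegalArgumentException("Unresolved placeholder: " + m.group(1));
            }
            m.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("mts.functions.mytransformer.name", "mytransformer");

        // Value as written in the configuration, with the extra closing brace.
        System.out.println(resolve("${mts.functions.mytransformer.name}}", props)); // prints mytransformer}
        // Same value with a single closing brace.
        System.out.println(resolve("${mts.functions.mytransformer.name}", props));  // prints mytransformer
    }
}
```

Under this imitation the definition would resolve to mytransformer} and would no longer match the bean name mytransformer, so comparing the resolved property value at runtime against the bean name seems a cheap check.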
Part of pom.xml
<properties>
    <project.artifact.name>${project.artifactId}</project.artifact.name>
    <java.version>1.8</java.version>
    <maven-compiler-plugin.version>3.8.1</maven-compiler-plugin.version>
    <spring.boot.version>2.2.4.RELEASE</spring.boot.version>
    <spring-cloud.version>Hoxton.SR8</spring-cloud.version>
    <springdoc-openapi.version>1.2.32</springdoc-openapi.version>
    <confluent.version>5.5.1</confluent.version>
    <swagger-annotations.version>2.1.5</swagger-annotations.version>
    <lombok-version>1.18.4</lombok-version>
</properties>
<distributionManagement>
    <repository>
        <id>metromile-repo</id>
        <url>https://mvn.internal.metromile.com/repository/metromile-repo</url>
    </repository>
</distributionManagement>
<repositories>
    <repository>
        <id>confluent</id>
        <url>https://packages.confluent.io/maven/</url>
    </repository>
</repositories>
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>${spring.boot.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>${spring-cloud.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-logging</artifactId>
    </dependency>
    <!-- API documentation -->
    <dependency>
        <groupId>org.springdoc</groupId>
        <artifactId>springdoc-openapi-ui</artifactId>
        <version>${springdoc-openapi.version}</version>
    </dependency>
    <dependency>
        <groupId>io.swagger.core.v3</groupId>
        <artifactId>swagger-annotations</artifactId>
        <version>${swagger-annotations.version}</version>
    </dependency>
    <!-- Kafka Streams -->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
    </dependency>
</dependencies>