问题描述
我试图从https://cloud.spring.io/spring-cloud-stream-binder-kafka/spring-cloud-stream-binder-kafka.html#_usage_2网站运行Spring boot Kafka Stream示例。
能够成功构建它。但是在运行它时出现如下所示的错误( java.lang.IllegalStateException:org.springframework.cloud.stream.function.FunctionConfiguration 上的错误处理条件)。 谁能帮助找出代码中的问题?
ReplicateKafkaStreamApplication.java
package com.example.kafka.replicateKafka;
import org.apache.kafka.streams.keyvalue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.binder.kstream.annotations.KStreamProcessor;
import org.springframework.messaging.handler.annotation.SendTo;
import java.util.Arrays;
import java.util.Date;
@SpringBootApplication
@EnableBinding(KStreamProcessor.class)
public class ReplicateKafkaStreamApplication {
public static void main(String[] args) {
SpringApplication.run(ReplicateKafkaStreamApplication.class,args);
}
@StreamListener("input")
@SendTo("output")
public KStream<?,WordCount> process(KStream<?,String> input) {
return input
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.groupBy((key,value) -> value)
.windowedBy(TimeWindows.of(5000))
.count(Materialized.as("WordCounts-multi"))
.toStream()
.map((key,value) -> new keyvalue<>(null,new WordCount(key.key(),value,new Date(key.window().start()),new Date(key.window().end()))));
}
}
class WordCount {
private String word;
private long count;
private Date start;
private Date end;
public WordCount(String word,long count,Date start,Date end) {
this.word = word;
this.count = count;
this.start = start;
this.end = end;
}
public String getWord() {
return word;
}
public void setWord(String word) {
this.word = word;
}
public long getCount() {
return count;
}
public void setCount(long count) {
this.count = count;
}
public Date getStart() {
return start;
}
public void setStart(Date start) {
this.start = start;
}
public Date getEnd() {
return end;
}
public void setEnd(Date end) {
this.end = end;
}
}
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.3.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example.kafka</groupId>
<artifactId>replicateKafkaStream</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>replicateKafkaStream</name>
<description>Demo project for replicating kafka</description>
<properties>
<java.version>11</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
<version>3.0.7.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kstream</artifactId>
<version>1.3.4.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
<version>2.0.0.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
. ____ _ __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__,| / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v2.3.3.RELEASE)
2020-08-19 18:48:26.339 INFO 13724 --- [ main] c.e.k.r.ReplicateKafkaStreamApplication : Starting ReplicateKafkaStreamApplication on DESKTOP-NS4292D with PID 13724 (C:\JavaReskillgit\replicateKafkaStream\target\classes started by BishwajeetNaik in C:\JavaReskillgit\replicateKafkaStream)
2020-08-19 18:48:26.344 INFO 13724 --- [ main] c.e.k.r.ReplicateKafkaStreamApplication : No active profile set,falling back to default profiles: default
2020-08-19 18:48:27.786 ERROR 13724 --- [ main] o.s.boot.SpringApplication : Application run Failed
java.lang.IllegalStateException: Error processing condition on org.springframework.cloud.stream.function.FunctionConfiguration
at org.springframework.boot.autoconfigure.condition.SpringBootCondition.matches(SpringBootCondition.java:60) ~[spring-boot-autoconfigure-2.3.3.RELEASE.jar:2.3.3.RELEASE]
at org.springframework.context.annotation.ConditionEvaluator.shouldSkip(ConditionEvaluator.java:108) ~[spring-context-5.2.8.RELEASE.jar:5.2.8.RELEASE]
at org.springframework.context.annotation.ConfigurationClassBeanDeFinitionReader$TrackedConditionEvaluator.shouldSkip(ConfigurationClassBeanDeFinitionReader.java:469) ~[spring-context-5.2.8.RELEASE.jar:5.2.8.RELEASE]
at org.springframework.context.annotation.ConfigurationClassBeanDeFinitionReader.loadBeanDeFinitionsForConfigurationClass(ConfigurationClassBeanDeFinitionReader.java:131) ~[spring-context-5.2.8.RELEASE.jar:5.2.8.RELEASE]
at org.springframework.context.annotation.ConfigurationClassBeanDeFinitionReader.loadBeanDeFinitions(ConfigurationClassBeanDeFinitionReader.java:120) ~[spring-context-5.2.8.RELEASE.jar:5.2.8.RELEASE]
at org.springframework.context.annotation.ConfigurationClasspostProcessor.processConfigBeanDeFinitions(ConfigurationClasspostProcessor.java:331) ~[spring-context-5.2.8.RELEASE.jar:5.2.8.RELEASE]
at org.springframework.context.annotation.ConfigurationClasspostProcessor.postProcessBeanDeFinitionRegistry(ConfigurationClasspostProcessor.java:236) ~[spring-context-5.2.8.RELEASE.jar:5.2.8.RELEASE]
at org.springframework.context.support.PostProcessorRegistrationDelegate.invokeBeanDeFinitionRegistryPostProcessors(PostProcessorRegistrationDelegate.java:280) ~[spring-context-5.2.8.RELEASE.jar:5.2.8.RELEASE]
at org.springframework.context.support.PostProcessorRegistrationDelegate.invokebeanfactoryPostProcessors(PostProcessorRegistrationDelegate.java:96) ~[spring-context-5.2.8.RELEASE.jar:5.2.8.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.invokebeanfactoryPostProcessors(AbstractApplicationContext.java:707) ~[spring-context-5.2.8.RELEASE.jar:5.2.8.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:533) ~[spring-context-5.2.8.RELEASE.jar:5.2.8.RELEASE]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:758) ~[spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:750) ~[spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397) ~[spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:315) ~[spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1237) ~[spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226) ~[spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
at com.example.kafka.replicateKafka.ReplicateKafkaStreamApplication.main(ReplicateKafkaStreamApplication.java:22) ~[classes/:na]
Caused by: java.lang.IllegalStateException: Failed to introspect Class [org.springframework.cloud.stream.binder.kstream.config.KStreamBinderSupportAutoConfiguration] from ClassLoader [jdk.internal.loader.ClassLoaders$AppClassLoader@368239c8]
at org.springframework.util.ReflectionUtils.getDeclaredMethods(ReflectionUtils.java:481) ~[spring-core-5.2.8.RELEASE.jar:5.2.8.RELEASE]
at org.springframework.util.ReflectionUtils.doWithMethods(ReflectionUtils.java:358) ~[spring-core-5.2.8.RELEASE.jar:5.2.8.RELEASE]
at org.springframework.util.ReflectionUtils.getUniqueDeclaredMethods(ReflectionUtils.java:414) ~[spring-core-5.2.8.RELEASE.jar:5.2.8.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapablebeanfactory.lambda$getTypeForFactoryMethod$2(AbstractAutowireCapablebeanfactory.java:742) ~[spring-beans-5.2.8.RELEASE.jar:5.2.8.RELEASE]
at java.base/java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1705) ~[na:na]
at org.springframework.beans.factory.support.AbstractAutowireCapablebeanfactory.getTypeForFactoryMethod(AbstractAutowireCapablebeanfactory.java:741) ~[spring-beans-5.2.8.RELEASE.jar:5.2.8.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapablebeanfactory.determinetargettype(AbstractAutowireCapablebeanfactory.java:680) ~[spring-beans-5.2.8.RELEASE.jar:5.2.8.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapablebeanfactory.predictBeanType(AbstractAutowireCapablebeanfactory.java:648) ~[spring-beans-5.2.8.RELEASE.jar:5.2.8.RELEASE]
at org.springframework.beans.factory.support.Abstractbeanfactory.isfactorybean(Abstractbeanfactory.java:1614) ~[spring-beans-5.2.8.RELEASE.jar:5.2.8.RELEASE]
at org.springframework.beans.factory.support.DefaultListablebeanfactory.doGetBeanNamesForType(DefaultListablebeanfactory.java:523) ~[spring-beans-5.2.8.RELEASE.jar:5.2.8.RELEASE]
at org.springframework.beans.factory.support.DefaultListablebeanfactory.getBeanNamesForType(DefaultListablebeanfactory.java:495) ~[spring-beans-5.2.8.RELEASE.jar:5.2.8.RELEASE]
at org.springframework.boot.autoconfigure.condition.OnBeanCondition.collectBeanNamesForType(OnBeanCondition.java:238) ~[spring-boot-autoconfigure-2.3.3.RELEASE.jar:2.3.3.RELEASE]
at org.springframework.boot.autoconfigure.condition.OnBeanCondition.getBeanNamesForType(OnBeanCondition.java:231) ~[spring-boot-autoconfigure-2.3.3.RELEASE.jar:2.3.3.RELEASE]
at org.springframework.boot.autoconfigure.condition.OnBeanCondition.getBeanNamesForType(OnBeanCondition.java:221) ~[spring-boot-autoconfigure-2.3.3.RELEASE.jar:2.3.3.RELEASE]
at org.springframework.boot.autoconfigure.condition.OnBeanCondition.getMatchingBeans(OnBeanCondition.java:169) ~[spring-boot-autoconfigure-2.3.3.RELEASE.jar:2.3.3.RELEASE]
at org.springframework.boot.autoconfigure.condition.OnBeanCondition.getMatchOutcome(OnBeanCondition.java:119) ~[spring-boot-autoconfigure-2.3.3.RELEASE.jar:2.3.3.RELEASE]
at org.springframework.boot.autoconfigure.condition.SpringBootCondition.matches(SpringBootCondition.java:47) ~[spring-boot-autoconfigure-2.3.3.RELEASE.jar:2.3.3.RELEASE]
... 17 common frames omitted
Caused by: java.lang.NoClassDefFoundError: org/springframework/kafka/core/KStreamBuilderfactorybean
at java.base/java.lang.class.getDeclaredMethods0(Native Method) ~[na:na]
at java.base/java.lang.class.privateGetDeclaredMethods(Class.java:3166) ~[na:na]
at java.base/java.lang.class.getDeclaredMethods(Class.java:2309) ~[na:na]
at org.springframework.util.ReflectionUtils.getDeclaredMethods(ReflectionUtils.java:463) ~[spring-core-5.2.8.RELEASE.jar:5.2.8.RELEASE]
... 33 common frames omitted
Caused by: java.lang.classNotFoundException: org.springframework.kafka.core.KStreamBuilderfactorybean
at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581) ~[na:na]
at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178) ~[na:na]
at java.base/java.lang.classLoader.loadClass(ClassLoader.java:521) ~[na:na]
... 37 common frames omitted
2020-08-19 18:48:27.801 WARN 13724 --- [ main] o.s.boot.SpringApplication : Unable to close ApplicationContext
java.lang.IllegalStateException: Failed to introspect Class [org.springframework.cloud.stream.binder.kstream.config.KStreamBinderSupportAutoConfiguration] from ClassLoader [jdk.internal.loader.ClassLoaders$AppClassLoader@368239c8]
at org.springframework.util.ReflectionUtils.getDeclaredMethods(ReflectionUtils.java:481) ~[spring-core-5.2.8.RELEASE.jar:5.2.8.RELEASE]
at org.springframework.util.ReflectionUtils.doWithMethods(ReflectionUtils.java:358) ~[spring-core-5.2.8.RELEASE.jar:5.2.8.RELEASE]
at org.springframework.util.ReflectionUtils.getUniqueDeclaredMethods(ReflectionUtils.java:414) ~[spring-core-5.2.8.RELEASE.jar:5.2.8.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapablebeanfactory.lambda$getTypeForFactoryMethod$2(AbstractAutowireCapablebeanfactory.java:742) ~[spring-beans-5.2.8.RELEASE.jar:5.2.8.RELEASE]
at java.base/java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1705) ~[na:na]
at org.springframework.beans.factory.support.AbstractAutowireCapablebeanfactory.getTypeForFactoryMethod(AbstractAutowireCapablebeanfactory.java:741) ~[spring-beans-5.2.8.RELEASE.jar:5.2.8.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapablebeanfactory.determinetargettype(AbstractAutowireCapablebeanfactory.java:680) ~[spring-beans-5.2.8.RELEASE.jar:5.2.8.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapablebeanfactory.predictBeanType(AbstractAutowireCapablebeanfactory.java:648) ~[spring-beans-5.2.8.RELEASE.jar:5.2.8.RELEASE]
at org.springframework.beans.factory.support.Abstractbeanfactory.isfactorybean(Abstractbeanfactory.java:1614) ~[spring-beans-5.2.8.RELEASE.jar:5.2.8.RELEASE]
at org.springframework.beans.factory.support.DefaultListablebeanfactory.doGetBeanNamesForType(DefaultListablebeanfactory.java:523) ~[spring-beans-5.2.8.RELEASE.jar:5.2.8.RELEASE]
at org.springframework.beans.factory.support.DefaultListablebeanfactory.getBeanNamesForType(DefaultListablebeanfactory.java:495) ~[spring-beans-5.2.8.RELEASE.jar:5.2.8.RELEASE]
at org.springframework.beans.factory.support.DefaultListablebeanfactory.getBeansOfType(DefaultListablebeanfactory.java:620) ~[spring-beans-5.2.8.RELEASE.jar:5.2.8.RELEASE]
at org.springframework.beans.factory.support.DefaultListablebeanfactory.getBeansOfType(DefaultListablebeanfactory.java:612) ~[spring-beans-5.2.8.RELEASE.jar:5.2.8.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.getBeansOfType(AbstractApplicationContext.java:1243) ~[spring-context-5.2.8.RELEASE.jar:5.2.8.RELEASE]
at org.springframework.boot.SpringApplication.getExitCodeFromMappedException(SpringApplication.java:880) ~[spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
at org.springframework.boot.SpringApplication.getExitCodeFromException(SpringApplication.java:868) ~[spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
at org.springframework.boot.SpringApplication.handleExitCode(SpringApplication.java:855) ~[spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
at org.springframework.boot.SpringApplication.handleRunFailure(SpringApplication.java:806) ~[spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:325) ~[spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1237) ~[spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226) ~[spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
at com.example.kafka.replicateKafka.ReplicateKafkaStreamApplication.main(ReplicateKafkaStreamApplication.java:22) ~[classes/:na]
Caused by: java.lang.NoClassDefFoundError: org/springframework/kafka/core/KStreamBuilderfactorybean
at java.base/java.lang.class.getDeclaredMethods0(Native Method) ~[na:na]
at java.base/java.lang.class.privateGetDeclaredMethods(Class.java:3166) ~[na:na]
at java.base/java.lang.class.getDeclaredMethods(Class.java:2309) ~[na:na]
at org.springframework.util.ReflectionUtils.getDeclaredMethods(ReflectionUtils.java:463) ~[spring-core-5.2.8.RELEASE.jar:5.2.8.RELEASE]
... 21 common frames omitted
Caused by: java.lang.classNotFoundException: org.springframework.kafka.core.KStreamBuilderfactorybean
at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581) ~[na:na]
at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178) ~[na:na]
at java.base/java.lang.classLoader.loadClass(ClassLoader.java:521) ~[na:na]
... 25 common frames omitted
Process finished with exit code 1
解决方法
您需要使用匹配版本的spring才能正确加载依赖项
真正的错误是NoClassDefFoundError
删除依赖项部分中的所有版本,并更新您的父级以使用
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
并从依赖项中删除