需要有关Spring Boot Kafka Stream Binder应用程序错误的建议

问题描述

我试图从https://cloud.spring.io/spring-cloud-stream-binder-kafka/spring-cloud-stream-binder-kafka.html#_usage_2网站运行Spring boot Kafka Stream示例。

能够成功构建它。但是在运行它时出现如下所示的错误 java.lang.IllegalStateException:org.springframework.cloud.stream.function.FunctionConfiguration 上的错误处理条件)。 谁能帮助找出代码中的问题?

ReplicateKafkaStreamApplication.java

package com.example.kafka.replicateKafka;

import org.apache.kafka.streams.keyvalue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.binder.kstream.annotations.KStreamProcessor;
import org.springframework.messaging.handler.annotation.SendTo;

import java.util.Arrays;
import java.util.Date;

@SpringBootApplication
@EnableBinding(KStreamProcessor.class)
public class ReplicateKafkaStreamApplication {

    public static void main(String[] args) {
        SpringApplication.run(ReplicateKafkaStreamApplication.class,args);
    }

    @StreamListener("input")
    @SendTo("output")
    public KStream<?,WordCount> process(KStream<?,String> input) {
        return input
                .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
                .groupBy((key,value) -> value)
                .windowedBy(TimeWindows.of(5000))
                .count(Materialized.as("WordCounts-multi"))
                .toStream()
                .map((key,value) -> new keyvalue<>(null,new WordCount(key.key(),value,new Date(key.window().start()),new Date(key.window().end()))));
    }

}

class WordCount {

    private String word;

    private long count;

    private Date start;

    private Date end;

    public WordCount(String word,long count,Date start,Date end) {
        this.word = word;
        this.count = count;
        this.start = start;
        this.end = end;
    }

    public String getWord() {
        return word;
    }

    public void setWord(String word) {
        this.word = word;
    }

    public long getCount() {
        return count;
    }

    public void setCount(long count) {
        this.count = count;
    }

    public Date getStart() {
        return start;
    }

    public void setStart(Date start) {
        this.start = start;
    }

    public Date getEnd() {
        return end;
    }

    public void setEnd(Date end) {
        this.end = end;
    }
}

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.3.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example.kafka</groupId>
    <artifactId>replicateKafkaStream</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>replicateKafkaStream</name>
    <description>Demo project for replicating kafka</description>

    <properties>
        <java.version>11</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
            <version>3.0.7.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kstream</artifactId>
            <version>1.3.4.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
            <version>2.0.0.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

错误

.   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__,| / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v2.3.3.RELEASE)

2020-08-19 18:48:26.339  INFO 13724 --- [           main] c.e.k.r.ReplicateKafkaStreamApplication  : Starting ReplicateKafkaStreamApplication on DESKTOP-NS4292D with PID 13724 (C:\JavaReskillgit\replicateKafkaStream\target\classes started by BishwajeetNaik in C:\JavaReskillgit\replicateKafkaStream)
2020-08-19 18:48:26.344  INFO 13724 --- [           main] c.e.k.r.ReplicateKafkaStreamApplication  : No active profile set,falling back to default profiles: default
2020-08-19 18:48:27.786 ERROR 13724 --- [           main] o.s.boot.SpringApplication               : Application run Failed

java.lang.IllegalStateException: Error processing condition on org.springframework.cloud.stream.function.FunctionConfiguration
    at org.springframework.boot.autoconfigure.condition.SpringBootCondition.matches(SpringBootCondition.java:60) ~[spring-boot-autoconfigure-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at org.springframework.context.annotation.ConditionEvaluator.shouldSkip(ConditionEvaluator.java:108) ~[spring-context-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.context.annotation.ConfigurationClassBeanDeFinitionReader$TrackedConditionEvaluator.shouldSkip(ConfigurationClassBeanDeFinitionReader.java:469) ~[spring-context-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.context.annotation.ConfigurationClassBeanDeFinitionReader.loadBeanDeFinitionsForConfigurationClass(ConfigurationClassBeanDeFinitionReader.java:131) ~[spring-context-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.context.annotation.ConfigurationClassBeanDeFinitionReader.loadBeanDeFinitions(ConfigurationClassBeanDeFinitionReader.java:120) ~[spring-context-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.context.annotation.ConfigurationClasspostProcessor.processConfigBeanDeFinitions(ConfigurationClasspostProcessor.java:331) ~[spring-context-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.context.annotation.ConfigurationClasspostProcessor.postProcessBeanDeFinitionRegistry(ConfigurationClasspostProcessor.java:236) ~[spring-context-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.context.support.PostProcessorRegistrationDelegate.invokeBeanDeFinitionRegistryPostProcessors(PostProcessorRegistrationDelegate.java:280) ~[spring-context-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.context.support.PostProcessorRegistrationDelegate.invokebeanfactoryPostProcessors(PostProcessorRegistrationDelegate.java:96) ~[spring-context-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.invokebeanfactoryPostProcessors(AbstractApplicationContext.java:707) ~[spring-context-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:533) ~[spring-context-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:758) ~[spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:750) ~[spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397) ~[spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:315) ~[spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1237) ~[spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226) ~[spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at com.example.kafka.replicateKafka.ReplicateKafkaStreamApplication.main(ReplicateKafkaStreamApplication.java:22) ~[classes/:na]
Caused by: java.lang.IllegalStateException: Failed to introspect Class [org.springframework.cloud.stream.binder.kstream.config.KStreamBinderSupportAutoConfiguration] from ClassLoader [jdk.internal.loader.ClassLoaders$AppClassLoader@368239c8]
    at org.springframework.util.ReflectionUtils.getDeclaredMethods(ReflectionUtils.java:481) ~[spring-core-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.util.ReflectionUtils.doWithMethods(ReflectionUtils.java:358) ~[spring-core-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.util.ReflectionUtils.getUniqueDeclaredMethods(ReflectionUtils.java:414) ~[spring-core-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.beans.factory.support.AbstractAutowireCapablebeanfactory.lambda$getTypeForFactoryMethod$2(AbstractAutowireCapablebeanfactory.java:742) ~[spring-beans-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at java.base/java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1705) ~[na:na]
    at org.springframework.beans.factory.support.AbstractAutowireCapablebeanfactory.getTypeForFactoryMethod(AbstractAutowireCapablebeanfactory.java:741) ~[spring-beans-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.beans.factory.support.AbstractAutowireCapablebeanfactory.determinetargettype(AbstractAutowireCapablebeanfactory.java:680) ~[spring-beans-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.beans.factory.support.AbstractAutowireCapablebeanfactory.predictBeanType(AbstractAutowireCapablebeanfactory.java:648) ~[spring-beans-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.beans.factory.support.Abstractbeanfactory.isfactorybean(Abstractbeanfactory.java:1614) ~[spring-beans-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.beans.factory.support.DefaultListablebeanfactory.doGetBeanNamesForType(DefaultListablebeanfactory.java:523) ~[spring-beans-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.beans.factory.support.DefaultListablebeanfactory.getBeanNamesForType(DefaultListablebeanfactory.java:495) ~[spring-beans-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.boot.autoconfigure.condition.OnBeanCondition.collectBeanNamesForType(OnBeanCondition.java:238) ~[spring-boot-autoconfigure-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at org.springframework.boot.autoconfigure.condition.OnBeanCondition.getBeanNamesForType(OnBeanCondition.java:231) ~[spring-boot-autoconfigure-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at org.springframework.boot.autoconfigure.condition.OnBeanCondition.getBeanNamesForType(OnBeanCondition.java:221) ~[spring-boot-autoconfigure-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at org.springframework.boot.autoconfigure.condition.OnBeanCondition.getMatchingBeans(OnBeanCondition.java:169) ~[spring-boot-autoconfigure-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at org.springframework.boot.autoconfigure.condition.OnBeanCondition.getMatchOutcome(OnBeanCondition.java:119) ~[spring-boot-autoconfigure-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at org.springframework.boot.autoconfigure.condition.SpringBootCondition.matches(SpringBootCondition.java:47) ~[spring-boot-autoconfigure-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    ... 17 common frames omitted
Caused by: java.lang.NoClassDefFoundError: org/springframework/kafka/core/KStreamBuilderfactorybean
    at java.base/java.lang.class.getDeclaredMethods0(Native Method) ~[na:na]
    at java.base/java.lang.class.privateGetDeclaredMethods(Class.java:3166) ~[na:na]
    at java.base/java.lang.class.getDeclaredMethods(Class.java:2309) ~[na:na]
    at org.springframework.util.ReflectionUtils.getDeclaredMethods(ReflectionUtils.java:463) ~[spring-core-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    ... 33 common frames omitted
Caused by: java.lang.classNotFoundException: org.springframework.kafka.core.KStreamBuilderfactorybean
    at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581) ~[na:na]
    at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178) ~[na:na]
    at java.base/java.lang.classLoader.loadClass(ClassLoader.java:521) ~[na:na]
    ... 37 common frames omitted

2020-08-19 18:48:27.801  WARN 13724 --- [           main] o.s.boot.SpringApplication               : Unable to close ApplicationContext

java.lang.IllegalStateException: Failed to introspect Class [org.springframework.cloud.stream.binder.kstream.config.KStreamBinderSupportAutoConfiguration] from ClassLoader [jdk.internal.loader.ClassLoaders$AppClassLoader@368239c8]
    at org.springframework.util.ReflectionUtils.getDeclaredMethods(ReflectionUtils.java:481) ~[spring-core-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.util.ReflectionUtils.doWithMethods(ReflectionUtils.java:358) ~[spring-core-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.util.ReflectionUtils.getUniqueDeclaredMethods(ReflectionUtils.java:414) ~[spring-core-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.beans.factory.support.AbstractAutowireCapablebeanfactory.lambda$getTypeForFactoryMethod$2(AbstractAutowireCapablebeanfactory.java:742) ~[spring-beans-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at java.base/java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1705) ~[na:na]
    at org.springframework.beans.factory.support.AbstractAutowireCapablebeanfactory.getTypeForFactoryMethod(AbstractAutowireCapablebeanfactory.java:741) ~[spring-beans-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.beans.factory.support.AbstractAutowireCapablebeanfactory.determinetargettype(AbstractAutowireCapablebeanfactory.java:680) ~[spring-beans-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.beans.factory.support.AbstractAutowireCapablebeanfactory.predictBeanType(AbstractAutowireCapablebeanfactory.java:648) ~[spring-beans-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.beans.factory.support.Abstractbeanfactory.isfactorybean(Abstractbeanfactory.java:1614) ~[spring-beans-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.beans.factory.support.DefaultListablebeanfactory.doGetBeanNamesForType(DefaultListablebeanfactory.java:523) ~[spring-beans-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.beans.factory.support.DefaultListablebeanfactory.getBeanNamesForType(DefaultListablebeanfactory.java:495) ~[spring-beans-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.beans.factory.support.DefaultListablebeanfactory.getBeansOfType(DefaultListablebeanfactory.java:620) ~[spring-beans-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.beans.factory.support.DefaultListablebeanfactory.getBeansOfType(DefaultListablebeanfactory.java:612) ~[spring-beans-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.getBeansOfType(AbstractApplicationContext.java:1243) ~[spring-context-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.boot.SpringApplication.getExitCodeFromMappedException(SpringApplication.java:880) ~[spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at org.springframework.boot.SpringApplication.getExitCodeFromException(SpringApplication.java:868) ~[spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at org.springframework.boot.SpringApplication.handleExitCode(SpringApplication.java:855) ~[spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at org.springframework.boot.SpringApplication.handleRunFailure(SpringApplication.java:806) ~[spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:325) ~[spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1237) ~[spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226) ~[spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at com.example.kafka.replicateKafka.ReplicateKafkaStreamApplication.main(ReplicateKafkaStreamApplication.java:22) ~[classes/:na]
Caused by: java.lang.NoClassDefFoundError: org/springframework/kafka/core/KStreamBuilderfactorybean
    at java.base/java.lang.class.getDeclaredMethods0(Native Method) ~[na:na]
    at java.base/java.lang.class.privateGetDeclaredMethods(Class.java:3166) ~[na:na]
    at java.base/java.lang.class.getDeclaredMethods(Class.java:2309) ~[na:na]
    at org.springframework.util.ReflectionUtils.getDeclaredMethods(ReflectionUtils.java:463) ~[spring-core-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    ... 21 common frames omitted
Caused by: java.lang.classNotFoundException: org.springframework.kafka.core.KStreamBuilderfactorybean
    at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581) ~[na:na]
    at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178) ~[na:na]
    at java.base/java.lang.classLoader.loadClass(ClassLoader.java:521) ~[na:na]
    ... 25 common frames omitted


Process finished with exit code 1

解决方法

您需要使用匹配版本的spring才能正确加载依赖项

真正的错误是NoClassDefFoundError

删除依赖项部分中的所有版本,并更新您的父级以使用

<artifactId>spring-cloud-starter-stream-kafka</artifactId>

并从依赖项中删除

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...