Kotlin应用程序配置avro的kafka使用者和produser失败

问题描述

当我尝试通过application.yaml配置kafka时,在配置使用者和生产者以及avro序列化程序和序列化程序时遇到错误。该应用程序是用kotlin编写的。 application.yaml:

spring:
  application:
    name: test
  cloud:
    consul:
      enabled: false
    stream:
      bindings:
        input:
          destination: dest
          group: group
          consumer:
            concurrency: 12
            instanceCount: 2
            partitioned: true
            useNativeDecoding: true
        output:
          destination: dest
          group: grout
          producer:
            partitionKeyExpression: headers['header']
            partitionCount: 24
            headerMode: headers
            useNativeEncoding: true
      kafka:
        bindings:
          input:
            consumer:
              startOffset: latest
              resetoffsets: true
              configuration:
                client.id: test
                key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
                value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
                schema.registry.url: urls..
          output:
            producer:
              configuration:
                client.id: test
                key.serializer: org.apache.kafka.common.serialization.StringSerializer
                value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
                schema.registry.url: urls..
        binder:
          brokers: brokers..
          configuration:
            sasl.mechanism: PLAIN
            security.protocol: SASL_PLAINTEXT
            sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin";
          required-acks: -1
          replication-factor: 3

错误

Producer

    org.apache.

kafka.common.KafkaException: Failed to construct kafka producer
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:457)
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:289)
        at org.springframework.kafka.core.DefaultKafkaProducerFactory.createKafkaProducer(DefaultKafkaProducerFactory.java:318)
        at org.springframework.kafka.core.DefaultKafkaProducerFactory.createProducer(DefaultKafkaProducerFactory.java:305)
        at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.lambda$createProducerMessageHandler$0(KafkaMessageChannelBinder.java:287)
        at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.lambda$getPartitionsForTopic$2(KafkaTopicProvisioner.java:410)
        at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287)
        at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:164)
        at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.getPartitionsForTopic(KafkaTopicProvisioner.java:405)
        at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.createProducerMessageHandler(KafkaMessageChannelBinder.java:284)
        at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.createProducerMessageHandler(KafkaMessageChannelBinder.java:132)
        at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.dobindProducer(AbstractMessageChannelBinder.java:184)
        at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.dobindProducer(AbstractMessageChannelBinder.java:90)
        at org.springframework.cloud.stream.binder.AbstractBinder.bindProducer(AbstractBinder.java:151)
        at org.springframework.cloud.stream.binding.BindingService.lambda$rescheduleProducerBinding$2(BindingService.java:290)
        at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
        at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)
    Caused by: org.apache.kafka.common.KafkaException: Could not instantiate class io.confluent.kafka.serializers.KafkaAvroSerializer
        at org.apache.kafka.common.utils.Utils.newInstance(Utils.java:310)
        at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:302)
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:370)
        ... 22 common frames omitted
    Caused by: java.lang.reflect.InvocationTargetException: null
        at jdk.internal.reflect.GeneratedConstructorAccessor58.newInstance(UnkNown Source)
        at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
        at org.apache.kafka.common.utils.Utils.newInstance(Utils.java:306)
        ... 24 common frames omitted
    Caused by: java.lang.NoClassDefFoundError: io/confluent/common/Configurable
        at io.confluent.kafka.serializers.AbstractKafkaAvroSerDe.<init>(AbstractKafkaAvroSerDe.java:46)
        at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.<init>(AbstractKafkaAvroSerializer.java:37)
        at io.confluent.kafka.serializers.KafkaAvroSerializer.<init>(KafkaAvroSerializer.java:32)
        ... 28 common frames omitted

对于消费者类似的用户,只能使用反序列化器。

Gradle中的

依赖项

implementation ("org.springframework.cloud:spring-cloud-stream-binder-kafka:2.1.1.RELEASE")
implementation ("io.confluent:kafka-schema-registry-client:4.1.1")
implementation ("io.confluent:kafka-avro-serializer:4.1.1")
implementation ("org.apache.avro:avro-compiler:1.8.2")

可能与什么有关?

解决方法

首先,要在build.gradle.kts中生成avro,请添加:

sourceSets {
    main {
        java {
            srcDirs("src/main/kotlin","scripts","build/avro")
        }
    }
}

repositories {
    mavenCentral()
    maven { url = uri("http://packages.confluent.io/maven/") }
}

dependencies {
//.......another dependencies 
    implementation ("org.springframework.cloud:spring-cloud-stream-binder-kafka:2.1.1.RELEASE")
    implementation ("io.confluent:kafka-schema-registry-client:4.1.1") {
        exclude(group = "org.apache.kafka",module = "kafka-clients")
    }
    implementation ("io.confluent:kafka-avro-serializer:4.1.1")
    implementation ("org.apache.avro:avro:1.8.2")
    implementation ("io.confluent:common:4.1.1")
}

    buildscript {
        repositories {
            maven {
                url = uri("https://plugins.gradle.org/m2/")
            }
        }
        dependencies {
            classpath("com.commercehub.gradle.plugin:gradle-avro-plugin:0.16.0")
        }
    }
    
    apply(plugin = "com.commercehub.gradle.plugin.avro")
    apply(plugin = "com.commercehub.gradle.plugin.avro-base")
    
    tasks.withType<com.commercehub.gradle.plugin.avro.GenerateAvroJavaTask> {
        source("${rootDir}/src/main/resources/avro")
        setOutputDir(file("${rootDir}/src/main/java"))
    }

此步骤将使我们能够生成avro类。

第二,配置生产者/消费者(如果我们不通过yaml文件使用配置):

private fun createProducer(): Producer<String,TrxnAvroSrc> {
        val props = Properties()
        props[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = ""//servers
        props[ProducerConfig.CLIENT_ID_CONFIG] = "client-id-test"
        props["schema.registry.url"] = ""//urls for schema registry
        props[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.canonicalName
        props[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = KafkaAvroSerializer::class.java.canonicalName
        props[SaslConfigs.SASL_MECHANISM] = "PLAIN"
        props["security.protocol"] = "SASL_PLAINTEXT"
        props[SaslConfigs.SASL_JAAS_CONFIG] = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin\";"
        return KafkaProducer<String,TrxnAvroSrc>(props)
    }

错误的主要原因是依赖关系

implementation ("org.apache.avro:avro-compiler:1.8.2")

由于我未知的原因,这种依赖性不允许您创建带有空构造函数和错误的KafkaProducer

java.lang.NoClassDefFoundError: io / confluent / common / Configurable

如果有人遇到过这种情况,或者知道这种行为的原因是什么,请赐教。