spring-cloud-stream、kafka 和 avro 问题侦探

问题描述

当我将 Sleuth 依赖项置于函数的 OUTPUT 中时,我使用 kafka 云流和 avro 实现了微服务,但生产者无法序列化 avro。

gradle.build 中的依赖:

import org.jetbrains.kotlin.gradle.tasks.KotlinCompile

buildscript {
    dependencies {
        classpath("com.commercehub.gradle.plugin:gradle-avro-plugin:0.16.0")
    }
}

plugins {
    id("org.springframework.boot") version "2.4.2"
    id("io.spring.dependency-management") version "1.0.11.RELEASE"
    kotlin("jvm") version "1.4.21"
    kotlin("plugin.spring") version "1.4.21"
}

apply(plugin = "com.commercehub.gradle.plugin.avro")

group = "com.example"
version = "0.0.1-SNAPSHOT"
java.sourceCompatibility = JavaVersion.VERSION_15

repositories {
    mavenCentral()
    maven { url = uri("https://packages.confluent.io/maven/") }
}

extra["springCloudVersion"] = "2020.0.1"

dependencies {
    implementation("com.fasterxml.jackson.module:jackson-module-kotlin")
    implementation("org.jetbrains.kotlin:kotlin-reflect")
    implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
    implementation("org.springframework.boot:spring-boot-starter-web")
    implementation("org.springframework.cloud:spring-cloud-stream")
    implementation("org.springframework.cloud:spring-cloud-starter-stream-kafka")
    implementation("org.apache.kafka:kafka-streams")
    //if you take off sleuth dependency of the project the AVRO is converted the json
    implementation("org.springframework.cloud:spring-cloud-starter-sleuth")
    implementation("io.confluent:kafka-avro-serializer:5.5.0")
    implementation("org.apache.avro:avro:1.10.0")
    implementation("com.github.javafaker:javafaker:1.0.2")
    testImplementation("org.springframework.boot:spring-boot-starter-test")
}

dependencyManagement {
    imports {
        mavenBom("org.springframework.cloud:spring-cloud-dependencies:${property("springCloudVersion")}")
    }
}

tasks.withType<KotlinCompile> {
    kotlinoptions {
        freeCompilerArgs = listof("-Xjsr305=strict")
        jvmTarget = "15"
    }
}

tasks.withType<Test> {
    useJUnitPlatform()
}

遵循我使用的小 AVRO 模式:

{
  "namespace": "com.example.demo.events","type": "record","name": "Person","fields": [
    {
      "name": "name","type": "string","avro.java.string": "String"
    }
  ]
}

application.yml 中的配置:

server.port: 9091
spring:
  application.name: demo-processor
  cloud:
    stream:
      function:
        deFinition: transform
      bindings:
        transform-in-0:
          destination: topic-a
          contentType: application/*+avro
          group: 'process-uppercase'
        transform-out-0:
          destination: topic-b
          contentType: application/*+avro
          producer:
            useNativeEncoding: true
      kafka:
        binder:
          brokers: localhost:9092
        bindings:
          transform-in-0:
            consumer:
              configuration:
                schema.registry.url: http://localhost:8081
                value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
                specific.avro.reader: true
          transform-out-0:
            producer:
              configuration:
                schema.registry.url: http://localhost:8081
                value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer

这是代码

package com.example.demo

import com.example.demo.events.Person
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication
import org.springframework.context.annotation.Bean
import org.springframework.messaging.Message
import org.springframework.messaging.support.MessageBuilder
import java.util.function.Function

@SpringBootApplication
class DemoProcessorApplication {
    //this function work very well without Sleuth dependency
    @Bean
    fun transform() = Function<Message<Person>,Message<Person>> {
        var person = it.payload
        var name = person.getName()
        person.setName(name.toupperCase())
        println("Transform ${name} to ${name.toupperCase()}")
        MessageBuilder.withPayload(person).build()
    }

}

fun main(args: Array<String>) {
    runApplication<DemoProcessorApplication>(*args)
}

感谢您的帮助!

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)