Kafka 生产者:将 avro 作为数组 [byte] 发送而没有模式

问题描述

我正在尝试在本地设置一个简单的 kafka 堆栈,现在我需要创建一个玩具生产商。这:https://lombardo-chcg.github.io/tools/2017/09/29/kafka-avro-producer-in-scala.html(请参阅下面我感兴趣的一段代码)几乎正是我想要的,除了:

这里生产者发送一个 GenericData.Record 对象,所以整个模式被发送,它不利用模式注册表。我想发送一个 Array[Byte] ,前几个字节是架构的 id,后面的字节是数据,没有架构(或者我认为这是最好的方法

我说的那段代码

import java.util.Properties

import org.apache.avro.Schema.Parser
import org.apache.avro.generic.GenericData
import org.apache.kafka.clients.producer.{KafkaProducer,ProducerRecord}
import org.slf4j.LoggerFactory

case class User(name: String,favoriteNumber: Int,favoriteColor: String)

class AvroProducer {
  val logger = LoggerFactory.getLogger(getClass)

  val kafkaBootstrapServer = sys.env("KAFKA_BOOTSTRAP_SERVER")
  val schemaRegistryUrl = sys.env("SCHEMA_REGISTRY_URL")

  val props = new Properties()
  props.put("bootstrap.servers",kafkaBootstrapServer)
  props.put("schema.registry.url",schemaRegistryUrl)
  props.put("key.serializer","io.confluent.kafka.serializers.KafkaAvroSerializer")
  props.put("value.serializer","io.confluent.kafka.serializers.KafkaAvroSerializer")
  props.put("acks","1")

  val producer = new KafkaProducer[String,GenericData.Record](props)
  val schemaParser = new Parser

  val key = "key1"
  val valueSchemaJson =
  s"""
    {
      "namespace": "com.avro.junkie","type": "record","name": "User2","fields": [
        {"name": "name","type": "string"},{"name": "favoriteNumber","type": "int"},{"name": "favoriteColor","type": "string"}
      ]
    }
  """
  val valueSchemaAvro = schemaParser.parse(valueSchemaJson)
  val avroRecord = new GenericData.Record(valueSchemaAvro)

  val mary = new User("Mary",840,"Green")
  avroRecord.put("name",mary.name)
  avroRecord.put("favoriteNumber",mary.favoriteNumber)
  avroRecord.put("favoriteColor",mary.favoriteColor)

  def start = {
    try {
      val record = new ProducerRecord("users",key,avroRecord)
      val ack = producer.send(record).get()
      // grabbing the ack and logging for visibility
      logger.info(s"${ack.toString} written to partition ${ack.partition.toString}")
    }
    catch {
      case e: Throwable => logger.error(e.getMessage,e)
    }
  }
}

问题:

  • 我不知道如何从 schema-registry 中检索 schema 的 id
  • 我不知道如何只发送没有架构的数据 + id 作为 Array[Byte]

我知道如何将整个 avro 写入 Array[Byte]:

    val writer = new SpecificDatumWriter[GenericData.Record](valueSchemaAvro)
    val out = new ByteArrayOutputStream
    val encoder = EncoderFactory.get.binaryEncoder(out,null)
    writer.write(avroRecord,encoder) // but here I am also writing the schema,right?
    encoder.flush
    out.close
    out.toByteArray

非常感谢

解决方法

第一个代码确实使用架构注册表,并在 KafkaAvroSerializer

内为您计算 ID + 替换字节数组中的架构

如果您想绕过架构注册表,请使用 ByteArraySerializer 并将第二个代码块中 out.toByteArray 的结果发送给生产者。