Problem description
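I'm trying to set up a simple Kafka stack locally, and I now need to create a toy producer. This post: https://lombardo-chcg.github.io/tools/2017/09/29/kafka-avro-producer-in-scala.html (see below for the piece of code I'm interested in) is almost exactly what I want, except:
Here the producer sends a GenericData.Record object, so the whole schema is sent and the Schema Registry is not used. I would like to send an Array[Byte] whose first few bytes are the schema ID and whose remaining bytes are the data, without the schema (or at least I think that is the best approach).
The code I'm talking about: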
import java.util.Properties
import org.apache.avro.Schema.Parser
import org.apache.avro.generic.GenericData
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.slf4j.LoggerFactory

case class User(name: String, favoriteNumber: Int, favoriteColor: String)

class AvroProducer {
  val logger = LoggerFactory.getLogger(getClass)
  val kafkaBootstrapServer = sys.env("KAFKA_BOOTSTRAP_SERVER")
  val schemaRegistryUrl = sys.env("SCHEMA_REGISTRY_URL")

  val props = new Properties()
  props.put("bootstrap.servers", kafkaBootstrapServer)
  props.put("schema.registry.url", schemaRegistryUrl)
  props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")
  props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")
  props.put("acks", "1")

  val producer = new KafkaProducer[String, GenericData.Record](props)
  val schemaParser = new Parser
  val key = "key1"
  val valueSchemaJson =
    s"""
    {
      "namespace": "com.avro.junkie", "type": "record", "name": "User2", "fields": [
        {"name": "name", "type": "string"}, {"name": "favoriteNumber", "type": "int"}, {"name": "favoriteColor", "type": "string"}
      ]
    }
    """
  val valueSchemaAvro = schemaParser.parse(valueSchemaJson)
  val avroRecord = new GenericData.Record(valueSchemaAvro)

  val mary = new User("Mary", 840, "Green")
  avroRecord.put("name", mary.name)
  avroRecord.put("favoriteNumber", mary.favoriteNumber)
  avroRecord.put("favoriteColor", mary.favoriteColor)

  def start = {
    try {
      val record = new ProducerRecord("users", key, avroRecord)
      val ack = producer.send(record).get()
      // grabbing the ack and logging for visibility
      logger.info(s"${ack.toString} written to partition ${ack.partition.toString}")
    }
    catch {
      case e: Throwable => logger.error(e.getMessage, e)
    }
  }
}
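Questions:
- I don't know how to retrieve the schema's ID from the Schema Registry
- I don't know how to send only the data, without the schema, plus the ID as an Array[Byte]
I do know how to write a whole Avro record to an Array[Byte]: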
import java.io.ByteArrayOutputStream
import org.apache.avro.io.EncoderFactory
import org.apache.avro.specific.SpecificDatumWriter

val writer = new SpecificDatumWriter[GenericData.Record](valueSchemaAvro)
val out = new ByteArrayOutputStream
val encoder = EncoderFactory.get.binaryEncoder(out, null)
writer.write(avroRecord, encoder) // but here I am also writing the schema, right?
encoder.flush
out.close
out.toByteArray
Thanks a lot.
Solution
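The first snippet does use the Schema Registry: the work happens inside KafkaAvroSerializer, which registers or looks up the schema there and prefixes each message with the schema's ID instead of the full schema.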
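To make that concrete, here is a minimal sketch of the framing Confluent documents as its wire format: one magic byte (0), the schema ID as a 4-byte big-endian integer, then the Avro binary payload. The names WireFormat, frame and unframe are hypothetical and only illustrate the layout; this is not the serializer's actual source.

```scala
import java.nio.ByteBuffer

// Hypothetical helper illustrating the Confluent wire format layout.
object WireFormat {
  val MagicByte: Byte = 0x0

  // Prefix an Avro binary payload with the magic byte and the schema ID.
  def frame(schemaId: Int, avroPayload: Array[Byte]): Array[Byte] = {
    val buf = ByteBuffer.allocate(1 + 4 + avroPayload.length)
    buf.put(MagicByte)
    buf.putInt(schemaId) // ByteBuffer is big-endian by default
    buf.put(avroPayload)
    buf.array()
  }

  // Split a framed message back into (schemaId, avroPayload).
  def unframe(bytes: Array[Byte]): (Int, Array[Byte]) = {
    require(bytes.length >= 5 && bytes(0) == MagicByte, "not a framed message")
    val buf = ByteBuffer.wrap(bytes)
    buf.get() // skip the magic byte
    val schemaId = buf.getInt()
    val payload = new Array[Byte](buf.remaining())
    buf.get(payload)
    (schemaId, payload)
  }
}
```

With a known ID, a call such as WireFormat.frame(schemaId, out.toByteArray) would produce bytes in this layout from the encoder output in the question.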
If you want to bypass the Schema Registry, use ByteArraySerializer and send the result of out.toByteArray from the second code block to the producer.
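Note that the binary encoder in the second code block writes only the datum. Avro embeds the schema only in container files, such as those written by DataFileWriter. If you take the ByteArraySerializer route but still want each message to carry a schema ID, you would have to fetch that ID yourself. The sketch below assumes the registry's REST endpoint GET /subjects/{subject}/versions/latest and the default subject naming (topic name plus "-value"). SchemaIdLookup and its methods are hypothetical names, and the regex-based parsing is a shortcut that a real JSON parser should replace.

```scala
import java.net.URI
import java.net.http.{HttpClient, HttpRequest, HttpResponse}

// Hypothetical helper: looks up the latest schema ID for a topic's value subject.
object SchemaIdLookup {
  // Default TopicNameStrategy: value schemas for topic "users" live under "users-value".
  def valueSubject(topic: String): String = s"$topic-value"

  def latestVersionUri(registryUrl: String, subject: String): URI =
    URI.create(s"${registryUrl.stripSuffix("/")}/subjects/$subject/versions/latest")

  // Shortcut: pull the first numeric "id" field out of the response body.
  private val IdField = """"id"\s*:\s*(\d+)""".r

  def parseSchemaId(json: String): Option[Int] =
    IdField.findFirstMatchIn(json).map(_.group(1).toInt)

  // Performs the HTTP call (requires Java 11+ and a reachable registry).
  def fetchLatestSchemaId(registryUrl: String, topic: String): Option[Int] = {
    val request = HttpRequest
      .newBuilder(latestVersionUri(registryUrl, valueSubject(topic)))
      .header("Accept", "application/vnd.schemaregistry.v1+json")
      .GET()
      .build()
    val response =
      HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())
    if (response.statusCode() == 200) parseSchemaId(response.body()) else None
  }
}
```

For example, SchemaIdLookup.fetchLatestSchemaId(schemaRegistryUrl, "users") would return the ID to place in front of the encoded bytes, after a zero magic byte and as a 4-byte big-endian integer, before handing them to a producer configured with ByteArraySerializer.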