问题描述
我需要从 Kafka 主题中读取 JSON 数据,但无法成功消费这些数据:程序会陷入无限循环。
到目前为止,我想我需要:
1. 实现自定义序列化器,将 JSON 转换为字节数组;2. 实现自定义反序列化器,将字节数组转换为 JSON 对象;3. 生产消息;4. 在 Consumer 类中读取消息。
package services
import java.util
import java.util.Properties
import constants.ConfigValues
import models.Device
import org.apache.kafka.clients.consumer.KafkaConsumer
import scala.collection.JavaConverters._
/** Reads [[models.Device]] records from a Kafka topic and prints them to stdout. */
class ConsumerService {

  /**
   * Subscribes to `topic` and prints every polled batch and record value.
   *
   * The poll loop is intentionally endless, so this method only returns by
   * throwing (for example when a record cannot be deserialized). The consumer
   * is closed in that case so its network resources and group membership are
   * released.
   *
   * @param topic name of the Kafka topic to subscribe to
   */
  def consumeDeviceData(topic: String): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", ConfigValues.BOOTSTARP_SERVER)
    props.put("key.deserializer", ConfigValues.KEY_DESERIALIZER)
    //props.put("value.deserializer",ConfigValues.VALUE_DESERIALIZER)
    props.put("value.deserializer", classOf[CustomDeserializer])
    // NOTE(review): with "latest", a group that has no committed offset starts
    // at the end of the log, so only messages produced after this consumer
    // joined are delivered. Until then every poll returns an empty batch and
    // the loop below just prints "inside". Use "earliest" to read existing data.
    props.put("auto.offset.reset", "latest")
    props.put("group.id", "device-consumer")

    val consumer = new KafkaConsumer[String, Device](props)
    println(consumer)
    try {
      consumer.subscribe(util.Arrays.asList(topic))
      while (true) {
        // NOTE(review): poll(long) is deprecated in newer kafka-clients in
        // favour of poll(java.time.Duration); kept as-is because the client
        // version used by this project is not visible here.
        val records = consumer.poll(1000).asScala
        println("inside")
        println(records)
        for (device <- records.iterator)
          println(device.value())
        // GetDeviceResponse.fromDeviceObject(value.value())
      }
    } finally {
      // Reached only when the loop is left by an exception.
      consumer.close()
    }
  }
}
CustomDeserializer 类:
package services
import java.io.{ByteArrayInputStream,ObjectInputStream}
import java.util
import models.Device
import org.apache.kafka.common.serialization.Deserializer
/**
 * Kafka value deserializer that turns a record payload into a [[models.Device]]
 * using Java object serialization.
 *
 * NOTE(review): `ObjectInputStream` only understands bytes written by
 * `ObjectOutputStream`. If the topic carries JSON text, constructing the stream
 * fails with a `java.io.StreamCorruptedException`; a JSON parser must be used
 * for such payloads instead.
 *
 * SECURITY: Java deserialization of data from an untrusted topic can execute
 * attacker-controlled code paths. Only use this against trusted producers.
 */
class CustomDeserializer extends Deserializer[Device] {

  /** No configuration is needed. */
  override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = {}

  /**
   * Deserializes one record payload.
   *
   * @param topic topic the record came from (unused)
   * @param bytes Java-serialized `Device`, or `null` for a record without a value
   * @return the decoded `Device`, or `null` when `bytes` is `null` (the Kafka
   *         `Deserializer` contract recommends returning null for null input)
   */
  override def deserialize(topic: String, bytes: Array[Byte]): Device =
    if (bytes == null) {
      // Records without a value (e.g. tombstones) must not blow up with an NPE.
      null
    } else {
      val objIn = new ObjectInputStream(new ByteArrayInputStream(bytes))
      try {
        val device = objIn.readObject().asInstanceOf[Device]
        println(device)
        device
      } finally {
        // Closing the object stream also closes the wrapped byte stream.
        objIn.close()
      }
    }

  /** Nothing to release. */
  override def close(): Unit = {}
}
Device 类:
package models
/**
 * Mutable record describing one device event.
 *
 * Field names are kept exactly as declared, since code outside this file may
 * refer to them (presumably they mirror the keys of the incoming payload;
 * verify against the producer). The class is `Serializable` so that instances
 * can be written and read with Java object streams.
 */
class Device() extends Serializable {
  var DATE: Int = 0
  var SOURCETYPE: String = ""
  var deviceid: String = ""
  var EVENTTIME: String = ""
  var TIME: Int = 0
  var COUNTRYCODE: String = ""
  var COUNTRYNAME: String = ""
  var COUNTY: String = ""
  var DEVICELAT: String = ""
  var DEVICELOGCREATEDAT: Double = 0
  var DEVICELOGSTATUS: String = ""
  var DEVICELONG: String = ""
  var DEVICEHPE: String = ""
  var DEVICERAWLociD: String = ""
  var DEVICERAWSTATE: String = ""

  /** Creates a device with every field set from the argument of the same name. */
  def this(
      DATE: Int,
      SOURCETYPE: String,
      deviceid: String,
      EVENTTIME: String,
      TIME: Int,
      COUNTRYCODE: String,
      COUNTRYNAME: String,
      COUNTY: String,
      DEVICELAT: String,
      DEVICELOGCREATEDAT: Double,
      DEVICELOGSTATUS: String,
      DEVICELONG: String,
      DEVICEHPE: String,
      DEVICERAWLociD: String,
      DEVICERAWSTATE: String
  ) = {
    this()
    // Parameters shadow the fields, hence the explicit `this.` on the left.
    this.DATE = DATE
    this.SOURCETYPE = SOURCETYPE
    this.deviceid = deviceid
    this.EVENTTIME = EVENTTIME
    this.TIME = TIME
    this.COUNTRYCODE = COUNTRYCODE
    this.COUNTRYNAME = COUNTRYNAME
    this.COUNTY = COUNTY
    this.DEVICELAT = DEVICELAT
    this.DEVICELOGCREATEDAT = DEVICELOGCREATEDAT
    this.DEVICELOGSTATUS = DEVICELOGSTATUS
    this.DEVICELONG = DEVICELONG
    this.DEVICEHPE = DEVICEHPE
    this.DEVICERAWLociD = DEVICERAWLociD
    this.DEVICERAWSTATE = DEVICERAWSTATE
  }
}
解决方法
暂未找到可以解决该问题的有效方法,小编正在努力寻找和整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)