无法在 Scala 中使用来自 kafka 主题的 json 数据

问题描述

我需要从 kafka 主题中读取 json 数据。我无法从 kafka 主题使用 json 数据。它会无限循环。

到目前为止,我想我需要:

实现自定义序列化器将 JSON 转换为字节数组 实现自定义反序列化器将字节数组转换为 JSON 对象 产生消息 读取Consumer类中的消息

package services

import java.util
import java.util.Properties

import constants.ConfigValues
import models.Device
import org.apache.kafka.clients.consumer.KafkaConsumer
import scala.collection.JavaConverters._

class ConsumerService {


  def consumeDeviceData(topic: String) = {
    val props = new Properties()
    props.put("bootstrap.servers",ConfigValues.BOOTSTARP_SERVER)
    props.put("key.deserializer",ConfigValues.KEY_DESERIALIZER)
    //props.put("value.deserializer",ConfigValues.VALUE_DESERIALIZER)
    props.put("value.deserializer",classOf[CustomDeserializer] )
    props.put("auto.offset.reset","latest")
    props.put("group.id","device-consumer")
    val consumer = new KafkaConsumer[String,Device](props)
    println(consumer)
    consumer.subscribe(util.Arrays.asList(topic))
    while (true) {
      val record = consumer.poll(1000).asScala
      println("inside")
      println(record)
      for (device <- record.iterator)
        println(device.value())
       // GetDeviceResponse.fromDeviceObject(value.value())
    }
  }
}

CustomDeserializer 类:

package services

import java.io.{ByteArrayInputStream,ObjectInputStream}
import java.util

import models.Device
import org.apache.kafka.common.serialization.Deserializer

class CustomDeserializer extends Deserializer[Device] {

  override def configure(configs: util.Map[String,_],isKey: Boolean): Unit = {
  }

  override def deserialize(topic: String,bytes: Array[Byte]): Device = {

    val byteIn = new ByteArrayInputStream(bytes)
    val objIn = new ObjectInputStream(byteIn)
    val obj = objIn.readobject().asInstanceOf[Device]
    println(obj)
    byteIn.close()
    objIn.close()
    obj
  }

  override def close(): Unit = {

  }
}

设备类:

package models


class Device() {
  var DATE: Int=0
  var SOURCETYPE: String=""
  var deviceid: String=""
  var EVENTTIME: String=""
  var TIME: Int=0
  var COUNTRYCODE: String=""
  var COUNTRYNAME: String=""
  var COUNTY: String=""
  var DEVICELAT: String=""

  var DEVICELOGCREATEDAT: Double=0

  var DEVICELOGSTATUS: String=""

  var DEVICELONG: String=""

  var DEVICEHPE: String=""

  var DEVICERAWLociD: String=""

  var DEVICERAWSTATE: String=""

  def this(DATE: Int,SOURCETYPE: String,deviceid: String,EVENTTIME: String,TIME: Int,COUNTRYCODE: String,COUNTRYNAME: String,COUNTY: String,DEVICELAT: String,DEVICELOGCREATEDAT: Double,DEVICELOGSTATUS: String,DEVICELONG: String,DEVICEHPE: String,DEVICERAWLociD: String,DEVICERAWSTATE: String)
{
  this()
  this.DATE=DATE
  this.COUNTRYCODE = COUNTRYCODE
  this.COUNTRYNAME =COUNTRYNAME
  this.DATE=DATE
  this.COUNTY=COUNTY
  this.DEVICEHPE=DEVICEHPE
  this.deviceid=deviceid
  this.DEVICELAT=DEVICELAT
  this.DEVICELOGCREATEDAT=DEVICELOGCREATEDAT
  this.DEVICELOGSTATUS=DEVICELOGSTATUS
  this.DEVICELONG=DEVICELONG
  this.DEVICERAWLociD=DEVICERAWLociD
  this.DEVICERAWSTATE=DEVICERAWSTATE
  this.EVENTTIME=EVENTTIME
  this.soURCETYPE=SOURCETYPE
  this.TIME=TIME

}
}

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)