问题描述
我应该从Kafka使用者那里得到一个 Map [String,String] ,但我真的不知道如何。我设法配置了使用者,它工作正常,但我不知道如何获取地图。
implicit val system: ActorSystem = ActorSystem()
val consumerConfig = system.settings.config.getConfig("akka.kafka.consumer")
val = kafkaConsumerSettings =
ConsumerSettings(consumerConfig,new StringDeserializer,new StringDeserializer)
.withBootstrapServers(localhost:9094)
.withGroupId(group1)
Consumer
.plainSource(kafkaConsumerSettings,Subscriptions.topics(entity.entity_name))
.toMat(Sink.foreach(println))(DrainingControl.apply)
.run()
解决方法
Lightbend's recommendation用于处理字节数组,同时反序列化来自Kafka的传入数据
消息反序列化的一般建议是使用字节数组(或字符串)作为值,并在Akka Stream中的映射操作中进行反序列化,而不是直接在Kafka反序列化器中实现。在Akka Stream中明确处理反序列化后,如下面的示例所示,更容易实现所需的错误处理策略。
为此,您可以使用以下设置来设置使用者:
val consumerSettings = ConsumerSettings(consumerConfig,new StringDeserializer,new ByteArrayDeserializer)
然后通过从Record类调用.value()
方法获得结果。为了反序列化,我建议使用circe +颚。这段代码可以解决问题。
import io.circe.jawn
import io.circe.generic.auto._
val bytes = record.value()
val data = jawn.parseByteBuffer(ByteBuffer.wrap(bytes)).flatMap(_.as[Map[String,String]])