如何在 Kafka 生产者中从外部 API 读取数据并将其发送给 Scala 中的 Kafka 消费者

问题描述

我是 Apache Kafka 的新手,我想从 https://www.alphavantage.co/query?function=TIME_SERIES_INTRADAY&symbol=MSFT&interval=5min&outputsize=full&apikey=demo API 读取生产者内部的数据,然后将其发送到主题并从我的消费者内部的主题读取此数据以将其保存到数据库。

我不知道如何发送 JSON 格式的数据。

我尝试了一个带有字符串值的 Kafka 消费者-生产者示例:

在我的例子中,我的 Producer.scala 是:

import java.util.Properties

import org.apache.http.client.methods.HttpGet
import org.apache.http.impl.client.HttpClientBuilder
import org.apache.http.util.EntityUtils
import org.apache.kafka.clients.producer.{KafkaProducer,ProducerRecord}
import play.api.libs.json.{JsObject,JsValue,Json}
object Producer extends App {

  val url = "https://www.alphavantage.co/query?function=TIME_SERIES_INTRADAY&symbol=MSFT&interval=5min&outputsize=full&apikey=demo"
  val httpClient = HttpClientBuilder.create().build()
  val httpResponse = httpClient.execute(new HttpGet(url))
  val entity = httpResponse.getEntity
    val str = EntityUtils.toString(entity,"UTF-8")
    val content = Json.parse(str)
  val props:Properties = new Properties()
  props.put("bootstrap.servers","localhost:9092")
  props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer")
  props.put("acks","all")
  val producer = new KafkaProducer[Nothing,(String,JsValue)](props)
  val topic = "quick-start"
  try {
      val record = new ProducerRecord(topic,content.as[JsObject].fields(1))
      producer.send(record)
  }catch{
    case e:Exception => e.printStackTrace()
  }finally {
    producer.close()
  }
}

而我的 Consumer.scala 是:

import java.util.{Collections,Properties}
import java.util.regex.Pattern
import org.apache.kafka.clients.consumer.KafkaConsumer
import scala.collection.JavaConverters._
object Consumer extends App {

  val props:Properties = new Properties()
  props.put("group.id","test")
  props.put("bootstrap.servers","localhost:9092")
  props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer")
  props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer")
  props.put("enable.auto.commit","true")
  props.put("auto.commit.interval.ms","1000")
  val consumer = new KafkaConsumer(props)
  val topics = List("quick-start")
  try {
    consumer.subscribe(topics.asJava)
    while (true) {
      val records = consumer.poll(10)
      for (record <- records.asScala) {
        println("Topic: " + record.topic() +
          ",Key: " + record.key() +
          ",Value: " + record.value() +
          ",Offset: " + record.offset() +
          ",Partition: " + record.partition())
      }
    }
  }catch{
    case e:Exception => e.printStackTrace()
  }finally {
    consumer.close()
  }
}

我的built.sbt是:

name := "Kafka-AkkaPractice"

version := "0.1"

scalaVersion := "2.12.2"

libraryDependencies ++= Seq(
  "org.apache.kafka" %% "kafka" % "2.1.0","ch.qos.logback" % "logback-classic" % "1.1.3" % Runtime,"org.apache.httpcomponents" % "httpclient" % "4.5.2","com.typesafe.play" %% "play-json" % "2.8.0"
)

我的理解是

props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer")

用于 Key 和 Value 的 String 类型。

谁能建议如何将这种类型的数据发送到我的 Kafka 主题,以便我可以从消费者读取它并将其保存到数据库中?

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)