如何将Protobuf数据从Flink转发到Kafka和stdout?

问题描述

我想在此处添加一些代码,并从Flink中输出protobuf数据。

我正在使用Flink的Apache Kafka连接器,以便将Flink连接到Kafka。

这是我Flink的代码

val env = StreamExecutionEnvironment.getExecutionEnvironment
val props = new Properties()
props.setProperty("bootstrap.servers","localhost:9092").
val producer = FlinkKafkaProducer011(topic,new myProtobufSchema,props)
env.addSink(producer)
env.execute("To Kafka")

这是我的卡夫卡代码

  val props: Properties = {
    val p = new Properties()
    p.put(StreamsConfig.APPLICATION_ID_CONfig,"protobuf-application")
    p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONfig,"localhost:9092")
    p
  }

  val builder: StreamsBuilder = new StreamsBuilder
  // Todo: implement here to stdout 

  val streams: KafkaStreams = new KafkaStreams(builder.build(),props)
  streams.start()

  sys.ShutdownHookThread {
     streams.close(Duration.ofSeconds(10))
  }

解决方法

您需要设置https://whoer.net才能从某个主题中消费

val builder: StreamsBuilder = new StreamsBuilder()
    .stream(topic)
    .print(Printed.toSysOut());