如何在KafkaSpout中解析kafka消息并设置元组值

问题描述

我正在尝试从 Kafkaspout 读取 kafka 消息,并从从该消息解析的 json 中设置元组值。实际上,我正在创建一个额外的 Bolt,它使用来自 Kafkaspout 的 json 字符串解析一个名为“value”的元组字段。是否可以在 spout 中设置这些值?

class ScanConfigKafkaspout(kafkaUrl: String,kafkaGroup: String,kafkaTopic: String) : Kafkaspout<String,String>(
    KafkaspoutConfig
        .builder(kafkaUrl,kafkaTopic)
        .setProp(KEY_KAFKA_GROUP,"grp1")
        .setProcessingGuarantee(KafkaspoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
        .build()
),ComponentId {
    override fun open(conf: MutableMap<String,Any>?,context: TopologyContext?,collector: spoutOutputCollector?) {
        try {
            logger.debug("<${id()}> opening ScanConfigKafkaspout  with ${conf.toString()}")
            super.open(conf,context,collector)
            logger.debug("<${id()}> ScanConfigKafkaspout opened")
        } catch (t: Throwable) {
            logger.error("<${id()}> Error during opening CrawlScanConfigKafkaspout",t)
        }
    }

    override fun id(): String = SCAN_CONfig_KAFKA_spout

    companion object {
        private val logger = LoggerFactory.getLogger(ScanConfigKafkaspout::class.java)
    }

}

解决方法

您可能需要实现 declareOutputFields(OutputFieldsDeclarer declarer 中的方法 IComponent

Storm 使用它来序列化您的属性值和元组配置。

数据模型部分所述的here,它说:

拓扑中的每个节点都必须为它发出的元组声明输出字段。

还为该方法提供了一个 java 示例。

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
  declarer.declare(new Fields("double","triple"));
}