问题描述
我正在尝试从 Kafkaspout 读取 kafka 消息,并从从该消息解析的 json 中设置元组值。实际上,我正在创建一个额外的 Bolt,它使用来自 Kafkaspout 的 json 字符串解析一个名为“value”的元组字段。是否可以在 spout 中设置这些值?
class ScanConfigKafkaspout(kafkaUrl: String,kafkaGroup: String,kafkaTopic: String) : Kafkaspout<String,String>(
KafkaspoutConfig
.builder(kafkaUrl,kafkaTopic)
.setProp(KEY_KAFKA_GROUP,"grp1")
.setProcessingGuarantee(KafkaspoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
.build()
),ComponentId {
override fun open(conf: MutableMap<String,Any>?,context: TopologyContext?,collector: spoutOutputCollector?) {
try {
logger.debug("<${id()}> opening ScanConfigKafkaspout with ${conf.toString()}")
super.open(conf,context,collector)
logger.debug("<${id()}> ScanConfigKafkaspout opened")
} catch (t: Throwable) {
logger.error("<${id()}> Error during opening CrawlScanConfigKafkaspout",t)
}
}
override fun id(): String = SCAN_CONfig_KAFKA_spout
companion object {
private val logger = LoggerFactory.getLogger(ScanConfigKafkaspout::class.java)
}
}
解决方法
您可能需要实现 declareOutputFields(OutputFieldsDeclarer declarer
中的方法 IComponent
。
Storm 使用它来序列化您的属性值和元组配置。
如数据模型部分所述的here,它说:
拓扑中的每个节点都必须为它发出的元组声明输出字段。
还为该方法提供了一个 java 示例。
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("double","triple"));
}