问题描述
我正在使用 SCDF 开发一个流,允许将所有 MQTT 消息持久化到 sql 数据库。
这是用于创建流的代码
stream create --name mqtt-to-jdbc --deFinition "mqtt --qos=2 --topics='#' --username=admin --password=******** --url='tcp://192.168.1.153:60065' | jdbc --username=sa --password=******** --driver-class-name=com.microsoft.sqlserver.jdbc.sqlServerDriver --url='jdbc:sqlserver://192.168.1.18;databaseName=test_db;schema=dbo' --table-name=mqtt_message --columns=\"headers:headers.toString(),payload:payload.toString(),created_at:new java.sql.Timestamp(T(System).currentTimeMillis()).toString()\"" --deploy
mqtt_message 表包含几列,其中包括 headers、payload、receive_topic。 流已成功部署并且数据被持久化,但是: headers 列是使用 SpEL headers.toString() 提取的:
{b3=d4840635cb8c968c-381e88a613735a05-1,nativeHeaders={},errorChannel=,id=e34b08d5-eafe-decd-4aa1-634cb187889a,timestamp=1622532820049}
payload 列使用 SpEL payload.toString() 很好地提取:
Test payload
标题中的值不包括假定的标题,包括消息的主题 (mqtt_receivedTopic)。
如果我为生产者和接收器提供实现,我可以访问以下消息头:
Headers:{
mqtt_id=0,deliveryAttempt=1,kafka_timestampType=CREATE_TIME,kafka_receivedTopic=reaper.reaper-source,mqtt_receivedRetained=false,kafka_offset=31,mqtt_duplicate=false,scst_nativeHeadersPresent=true,kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@19da7b97,id=cc423e5f-af1e-173a-a9ac-c229ab544738,kafka_receivedPartitionId=0,mqtt_receivedTopic=test/topic,contentType=application/json,kafka_receivedTimestamp=1622453609998,mqtt_receivedQos=0,kafka_groupId=reaper,timestamp=1622453610004
}
我还测试了以下属性,但它们都没有改变结果:
制作人
- spring.cloud.stream.bindings.output.producer.headerMode=embeddedHeaders
- spring.cloud.stream.bindings.output.producer.useNativeEncoding=true
消费者
- spring.cloud.stream.default.consumer.headerMode=embeddedHeaders
有没有办法在生产者和接收器之间传递本机标头并将它们写入目的地列(从接收到的主题中提取值)。
谢谢。
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)