问题描述
我正在将protobuf消息发送到Kafka MQTT代理,但是我无法获得Kafka解析的消息值。我已经将原型消息注册到Schema注册表中,可以看到传入的消息,但是值显示为乱码。
以下行描述了我当前的架构
移动-> Kafka MQTT代理-> Kafka broker->主题
还有this is the output I see。我不知道是否缺少配置,但是找不到有关它的任何信息。
原始消息
Syntax = "proto2";
message Location {
optional double latitude = 1;
optional double longitude = 2;
}
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>io.confluent</groupId>
<artifactId>rest-utils-parent</artifactId>
<version>5.5.1</version>
</parent>
<artifactId>register-schema</artifactId>
<properties>
<confluent.version>5.5.0</confluent.version>
</properties>
<repositories>
<repository>
<id>confluent</id>
<url>http://packages.confluent.io/maven/</url>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<id>confluent</id>
<url>https://packages.confluent.io/maven/</url>
</pluginRepository>
</pluginRepositories>
<dependencies>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-protobuf-serializer</artifactId>
<version>${confluent.version}</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-protobuf-provider</artifactId>
<version>${confluent.version}</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-schema-registry-client</artifactId>
<version>${confluent.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>io.confluent</groupId>
<artifactId>kafka-schema-registry-maven-plugin</artifactId>
<version>5.5.0</version>
<configuration>
<schemaRegistryUrls>
<param>http://localhost:8081</param>
</schemaRegistryUrls>
<subjects>
<location-value>location.proto</location-value>
</subjects>
<schemaTypes>
<location-value>PROTOBUF</location-value>
</schemaTypes>
</configuration>
<goals>
<goal>register</goal>
</goals>
</plugin>
</plugins>
</build>
</project>
import 'package:mqtt_client/mqtt_client.dart';
import 'package:mqtt_client/mqtt_server_client.dart';
final builder = MqttClientPayloadBuilder();
final loc = locationPb.Location();
loc.latitude = location.latitude;
loc.longitude = location.longitude;
final buf = new Uint8Buffer();
buf.addAll(loc.writetoBuffer());
builder.addBuffer(buf);
this.client.publishMessage(
"android/location",MqttQos.exactlyOnce,builder.payload);
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)