通过kafka mqtt-proxy的Protobuf消息

问题描述

我正在将protobuf消息发送到Kafka MQTT代理,但是我无法获得Kafka解析的消息值。我已经将原型消息注册到Schema注册表中,可以看到传入的消息,但是值显示为乱码。


以下行描述了我当前的架构

移动-> Kafka MQTT代理-> Kafka broker->主题


还有this is the output I see。我不知道是否缺少配置,但是找不到有关它的任何信息。


原始消息

Syntax = "proto2";

message Location {
  optional double latitude = 1;
  optional double longitude = 2;
}

用于将原型消息注册到架构注册表的Maven pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
    http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
    <groupId>io.confluent</groupId>
    <artifactId>rest-utils-parent</artifactId>
    <version>5.5.1</version>
</parent>
<artifactId>register-schema</artifactId>
<properties>
        <confluent.version>5.5.0</confluent.version>
</properties>
<repositories>
    <repository>
        <id>confluent</id>
        <url>http://packages.confluent.io/maven/</url>
    </repository>
</repositories>

<pluginRepositories>
    <pluginRepository>
        <id>confluent</id>
        <url>https://packages.confluent.io/maven/</url>
    </pluginRepository>
</pluginRepositories>
<dependencies>
    <dependency>
        <groupId>io.confluent</groupId>
        <artifactId>kafka-protobuf-serializer</artifactId>
        <version>${confluent.version}</version>
    </dependency>
    <dependency>
        <groupId>io.confluent</groupId>
        <artifactId>kafka-protobuf-provider</artifactId>
        <version>${confluent.version}</version>
    </dependency>
    <dependency>
        <groupId>io.confluent</groupId>
        <artifactId>kafka-schema-registry-client</artifactId>
        <version>${confluent.version}</version>
    </dependency>
</dependencies>
<build>
    <plugins>

        <plugin>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-schema-registry-maven-plugin</artifactId>
            <version>5.5.0</version>
            <configuration>
                <schemaRegistryUrls>
                    <param>http://localhost:8081</param>
                </schemaRegistryUrls>
                <subjects>
                    <location-value>location.proto</location-value>
                </subjects>
                <schemaTypes>
                    <location-value>PROTOBUF</location-value>
                </schemaTypes>
            </configuration>
            <goals>
                <goal>register</goal>
            </goals>
        </plugin>
    </plugins>
</build>
</project>

用于发布消息的Flutter代码

import 'package:mqtt_client/mqtt_client.dart';
import 'package:mqtt_client/mqtt_server_client.dart';

final builder = MqttClientPayloadBuilder();
        final loc = locationPb.Location();
        loc.latitude = location.latitude;
        loc.longitude = location.longitude;
        final buf = new Uint8Buffer();
        buf.addAll(loc.writetoBuffer());
        builder.addBuffer(buf);
        this.client.publishMessage(
            "android/location",MqttQos.exactlyOnce,builder.payload);

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)