如何对protbuf使用自我描述消息

问题描述

我在使用协议缓冲区时正在使用的一种用例是反序列化在用户端(使用sarama库和Go)接收到的协议缓冲区Kafka消息。

我目前的工作方式是定义如下所示的示例pixel.proto文件。

syntax = "proto3";

package saramaprotobuf;

message Pixel {
  // Session identifier stuff
  string session_id = 2;
}

我正在通过sarama.Producer发送消息(通过封送处理),接收到sarama.Consumer(通过对已编译的pixel.proto.pb进行引用来对消息进行封送处理)。代码如下。

import (
    "github.com/Shopify/sarama"
    "github.com/golang/protobuf/proto"
    "log"
    "os"
    "os/signal"
    "protobuftest/example"
    "syscall"
    "time"
)

func main() {
    topic := "test_topic"
    brokerList := []string{"localhost:9092"}

    producer,err := newSyncProducer(brokerList)
    if err != nil {
        log.Fatalln("Failed to start Sarama producer:",err)
    }

    go func() {
        ticker := time.NewTicker(time.Second)
        for {
            select {
            case t := <-ticker.C:
                elliot := &example.Pixel{
                    SessionId: t.String(),}
                pixelToSend :=  elliot
                pixelToSendBytes,err := proto.Marshal(pixelToSend)
                if err != nil {
                    log.Fatalln("Failed to marshal example:",err)
                }

                msg := &sarama.ProducerMessage{
                    Topic: topic,Value: sarama.ByteEncoder(pixelToSendBytes),}

                producer.SendMessage(msg)
                log.Printf("Pixel sent: %s",pixelToSend)
            }
        }

    }()

    signals := make(chan os.Signal,1)
    signal.Notify(signals,syscall.SIGHUP,syscall.SIGINT,syscall.SIGTERM)

    partitionConsumer,err := newPartitionConsumer(brokerList,topic)
    if err != nil {
        log.Fatalln("Failed to create Sarama partition consumer:",err)
    }

    log.Println("Waiting for messages...")

    for {
        select {
        case msg := <-partitionConsumer.Messages():
            receivedPixel := &example.Pixel{}
            err := proto.Unmarshal(msg.Value,receivedPixel)
            if err != nil {
                log.Fatalln("Failed to unmarshal example:",err)
            }

            log.Printf("Pixel received: %s",receivedPixel)
        case <-signals:
            log.Print("Received termination signal. Exiting.")
            return
        }
    }
}

func newSyncProducer(brokerList []string) (sarama.SyncProducer,error) {
    config := sarama.NewConfig()
    config.Producer.RequiredAcks = sarama.WaitForAll
    config.Producer.Retry.Max = 5
    config.Producer.Return.Successes = true
    // TODO configure producer

    producer,err := sarama.NewSyncProducer(brokerList,config)
    if err != nil {
        return nil,err
    }

    return producer,nil
}

func newPartitionConsumer(brokerList []string,topic string) (sarama.PartitionConsumer,error) {
    conf := sarama.NewConfig()
    // TODO configure consumer
    consumer,err := sarama.NewConsumer(brokerList,conf)
    if err != nil {
        return nil,err
    }

    partitionConsumer,err := consumer.ConsumePartition(topic,sarama.OffsetOldest)
    if err != nil {
        return nil,err
    }

    return partitionConsumer,err
}

在您看到的代码中,我已经导入了.proto文件并在主函数中对其进行了引用,以便发送和接收消息。这里的问题是,解决方案不是通用的。我将在用户端收到其他.proto类型的消息。

如何使其通用?我知道protobuf的一部分称为自描述消息(动态消息)。我引用了此链接https://developers.google.com/protocol-buffers/docs/techniques?csw=1#self-description。但这并没有任何关于如何将其作为pixel.proto的一部分进行嵌入的解释(我已使用的示例),因此在消费者端我直接将其反序列化为所需的类型。

解决方法

您将定义一个通用的容器消息类型,其中将包括DescriptorSet和Any字段。

在发送时,您将构建该通用消息类型的实例,使用Pixel消息的实例设置类型Any的字段,并使用Pixel类型的DescriptorSet设置DescriptorSet字段。

这将使此类消息的接收者可以使用要附加的DescriptorSet解析Any内容。实际上,这是在发送一条原始定义和消息。因此,接收者不需要预先共享的原型定义或生成的代码。

话虽如此,但我不确定这不是您真正想要的,因为如果您打算与客户共享原型定义或生成的代码,那么我建议仅在容器类型中使用oneof字段会更简单使用。

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...