Using python-compiled protobuf pb2 files as key and value serializers

Problem Description

I am trying to read data from a Kafka topic that has already been serialized with Google's protobuf.

I compiled the proto files with protoc to generate the pb2 files.
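
For context, the generated pb2 classes handle the byte-level encoding themselves; a minimal sketch of that round trip, reusing the module and message names from my attempt below:

# Minimal sketch, assuming the pb2 module/message names used further down.
from proto.topic_pb2 import topic

msg = topic()                    # build an (empty) message instance
raw = msg.SerializeToString()    # bytes as they are written to the Kafka topic
decoded = topic.FromString(raw)  # parse the bytes back into a message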

Now I'm trying to use Faust to build a stream processor, but I can't figure out the right way to use the pb2 files as the key_serializer and value_serializer.

Here is what I tried:

import faust
from proto.topic_pb2 import topic


app = faust.App(
    'faust-consumer',
    broker='kafka://',
    store="memory://",
    cache="memory://",
)

schema = faust.Schema(
    # key_type=topic.PK,
    # value_type=topic,
    key_serializer=topic.PK,
    value_serializer=topic,
)

topic = app.topic(
    'topic', schema=schema
)


@app.agent(topic)
async def consume(topic):
    async for event in topic:
        print(event)


if __name__ == "__main__":
    app.main()

Does anyone know how to use the pb2 files in the serializers?

Solutions

Man, I've been trying to do this for the past week. After a lot of struggling I finally found an approach that works - not the best way, but it works well enough.

So, initially I used the following Python compiler, https://github.com/danielgtaylor/python-betterproto, to generate *.py files with dataclasses / type hints.
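
For reference, betterproto generates dataclasses that look roughly like this (a hypothetical message, not taken from my project):

# Hypothetical example of python-betterproto output for a simple message;
# the field names and types are assumptions, not from the original project.
from dataclasses import dataclass

import betterproto


@dataclass
class YourModel(betterproto.Message):
    id: int = betterproto.int32_field(1)
    name: str = betterproto.string_field(2)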

Then I was able to use a helper to dynamically create faust.Record classes:

import abc
import inspect
from typing import Type

import betterproto
import faust

GENERATED_SUFFIX = "__FaustRecord_Auto"


def _import_relative_class(module: str, klass_name: str):
    # Resolve a class referenced by name (a string annotation) from the same module.
    resolved_import = __import__(module, fromlist=[klass_name])
    klass = getattr(resolved_import, klass_name)
    return klass


def _is_record(attype: Type):
    # Decide whether an annotated type is itself a message that should
    # become a nested Faust record.
    return (
        inspect.isclass(attype)
        and isinstance(attype, betterproto.Message)
        or isinstance(attype, abc.ABCMeta)
    )


def _build_record_annotations(klass: Type):
    # Walk the betterproto dataclass annotations, converting nested message
    # types (including forward references given as strings) into Faust records.
    annotations = {}
    for atname, attype in klass.__annotations__.items():
        if _is_record(attype):
            annotations[atname] = make_faust_record(attype)
        elif isinstance(attype, str):
            subklass = _import_relative_class(klass.__module__, attype)
            annotations[atname] = make_faust_record(subklass)
        else:
            annotations[atname] = attype

    return annotations


def make_faust_record(klass: Type):
    # Dynamically create a subclass of both faust.Record and the proto class,
    # then rebuild its annotations so Faust can (de)serialize the fields.
    type_name = f"{klass.__name__}{GENERATED_SUFFIX}"
    record_type = type(type_name, (faust.Record, klass), {})
    record_type.__annotations__ = _build_record_annotations(klass)
    record_type._init_subclass()

    return record_type

Now you can use it like this:

import faust
from proto.your_models import YourModel # Import your generated proto here
from faust_converter import make_faust_record


app = faust.App(
    'faust-consumer',
    broker='kafka://',
    store="memory://",
    cache="memory://",
)

model_record = make_faust_record(YourModel)

topic = app.topic(
    'topic', value_type=model_record
)


@app.agent(topic)
async def consume(topic):
    async for event in topic:
        print(event)


if __name__ == "__main__":
    app.main()
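
If the topic key is also a protobuf message (an assumption about your setup), the same helper works for it, since app.topic() also accepts a key_type:

# Hypothetical: YourKeyModel would be another betterproto-generated message.
key_record = make_faust_record(YourKeyModel)

topic = app.topic(
    'topic',
    key_type=key_record,
    value_type=model_record,
)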

I was able to do this by creating a Serializer class like this:

import logging

import faust
from abc import ABCMeta, abstractmethod
from google.protobuf.json_format import MessageToDict
from faust.serializers.codecs import Codec
from importlib import import_module

logger = logging.getLogger(__name__)


def get_proto(topic_name, only_pk=False):
    # Cache the imported pb2 message classes on the function object itself.
    if not hasattr(get_proto, "topics"):
        setattr(get_proto, "topics", dict())

    get_proto.topics[topic_name] = import_module(
        "protodef.{}_pb2".format(topic_name)
    ).__getattribute__(topic_name.split(".")[-1])

    if only_pk:
        return getattr(get_proto, "topics").get(topic_name).PK
    else:
        return getattr(get_proto, "topics").get(topic_name)


class ProtoSerializer(Codec, metaclass=ABCMeta):
    @abstractmethod
    def only_key(self):
        ...

    def as_proto(self, topic_name):
        self._proto = get_proto(topic_name, self.only_key())
        return self

    def _loads(self, b):
        data = MessageToDict(
            self._proto.FromString(b),
            preserving_proto_field_name=True,
            including_default_value_fields=True,
        )

        # remove the key object from the unserialized message
        data.pop("key",None)
        return data

    def _dumps(self, o):
        # for deletes
        if not o:
            return None

        obj = self._proto()

        # add the key object to the message before serializing
        if hasattr(obj, "PK"):
            for k in obj.PK.DESCRIPTOR.fields_by_name.keys():
                if k not in o:
                    raise Exception(
                        "Invalid object `{}` for proto `{}`".format(o, self._proto)
                    )
                setattr(obj.key, k, o[k])

        for k, v in o.items():
            if hasattr(obj, k):
                setattr(obj, k, v)
            else:
                logger.debug(
                    "Invalid value-attribute `%s` for proto `%s`", k, self._proto
                )

        return obj.SerializeToString()


class ProtoValue(ProtoSerializer):
    def only_key(self):
        return False


class ProtoKey(ProtoSerializer):
    def only_key(self):
        return True
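
Note that get_proto assumes a particular project layout: a compiled module under protodef/ whose message class shares the topic name and nests a PK message (with a matching key field). Roughly, as an assumption about that layout rather than code from the answer:

# Assuming a compiled module protodef/topic_pb2.py that defines a message
# class named `topic` with a nested `PK` message and a `key` field:
value_cls = get_proto("topic")               # -> protodef.topic_pb2.topic
key_cls = get_proto("topic", only_pk=True)   # -> protodef.topic_pb2.topic.PK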

Then use it as follows:

import faust
from utils.serializer import ProtoKey, ProtoValue


app = faust.App(
    'faust-consumer',
)


topic = app.topic(
    'topic',
    key_serializer=ProtoKey().as_proto('topic'),
    value_serializer=ProtoValue().as_proto('topic'),
)


@app.agent(topic)
async def consume(topic):
    async for event in topic:
        print(event)


if __name__ == "__main__":
    app.main()

I was also trying to use Protobuf with Faust.

Below is a solution using a Faust serializer Codec: faust-protobuf https://github.com/hemantkashniyal/faust-protobuf

proto_serializer.py

from faust.serializers import codecs
from typing import Any

from google.protobuf import json_format
from google.protobuf.json_format import MessageToJson
from google.protobuf.json_format import MessageToDict
from google.protobuf import text_format
from google.protobuf.text_format import MessageToString
from google.protobuf.text_format import MessageToBytes

class ProtobufSerializer(codecs.Codec):
    def __init__(self, pb_type: Any):
        self.pb_type = pb_type
        super(self.__class__, self).__init__()

    def _dumps(self, pb: Any) -> bytes:
        return pb.SerializeToString()

    def _loads(self, s: bytes) -> Any:
        pb = self.pb_type()
        pb.ParseFromString(s)
        return pb

app.py

import faust
from google.protobuf.json_format import MessageToJson

from .proto.greetings_pb2 import Greeting

from .proto_serializer import ProtobufSerializer

app = faust.App(
    'faust-consumer',  # TODO: update kafka endpoint
    store="memory://",
)

greetings_schema = faust.Schema(
    key_serializer=ProtobufSerializer(pb_type=Greeting),
    value_serializer=ProtobufSerializer(pb_type=Greeting),
)

topic = app.topic(
    'greetings', schema=greetings_schema
)

@app.agent(topic)
async def consume(topic):
    async for event in topic:
        print(MessageToJson(event))

@app.timer(5)
async def produce():
    for i in range(10):
        data = Greeting(hello="world", message=i)
        await consume.send(value=data)

if __name__ == "__main__":
    app.main()
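
As a possible refinement (not part of the answer above), the codec can also be registered globally under a name, so topics and models can refer to it as a string; a minimal sketch, with "greeting_proto" as an arbitrary name:

import faust
from faust.serializers import codecs

from .proto.greetings_pb2 import Greeting
from .proto_serializer import ProtobufSerializer

# Register the codec under a name; Faust then accepts that string
# wherever a serializer is expected.
codecs.register("greeting_proto", ProtobufSerializer(pb_type=Greeting))

app = faust.App('faust-consumer', store="memory://")

topic = app.topic(
    'greetings',
    key_serializer="greeting_proto",
    value_serializer="greeting_proto",
)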
