Apache Flink有状态功能-序列化问题?

问题描述

我正在尝试使用Python中的Apache Flink有状态功能来构建项目,但是我似乎无法使其正常工作。我将问题缩小到的范围是,当我通过protobuf模式将请求发送到有状态函数时,序列化程序无法将消息序列化为期望的类。这是我想要做的:

import json
from statefun import StatefulFunctions,RequestReplyHandler
from jobs.session_event_pb2 import Event

functions = StatefulFunctions()


@functions.bind("namespace/funcname")
def funcname(context,session: Event):
    print("hello world")


handler = RequestReplyHandler(functions)

if __name__ == '__main__':
    inputFile = open("my_file.json","r")
    for line in inputFile:
        data = json.loads(line).get('properties')
        if data is not None and data.get('prop1') is not None and data.get('prop2') is not None:
            request = Event()
            request.prop1 = data["prop1"]
            request.prop2 = data["prop2"]
            request = request.SerializetoString()
            handler(request)

这是我的Protobuf模式:

Syntax = "proto3";

package mypackage;

message Event {
    string prop1 = 1;
    string prop2 = 2;
}

我在做什么错了?

解决方法

那是因为RequestReply处理程序不接受直接的protobuf消息。 Flink运行时发送一个名为ToFunction的类型,并接收一个FromFunction类型的响应。此有效负载包含您的呼叫者消息以及持久值和其他元信息。

如果您不能直接调用函数(例如在测试中),我鼓励您这样做并且根本不使用处理程序。

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...