问题描述
我正在尝试使用Python中的Apache Flink有状态功能来构建项目,但是我似乎无法使其正常工作。我将问题缩小到的范围是,当我通过protobuf模式将请求发送到有状态函数时,序列化程序无法将消息序列化为期望的类。这是我想要做的:
import json
from statefun import StatefulFunctions,RequestReplyHandler
from jobs.session_event_pb2 import Event
functions = StatefulFunctions()
@functions.bind("namespace/funcname")
def funcname(context,session: Event):
print("hello world")
handler = RequestReplyHandler(functions)
if __name__ == '__main__':
inputFile = open("my_file.json","r")
for line in inputFile:
data = json.loads(line).get('properties')
if data is not None and data.get('prop1') is not None and data.get('prop2') is not None:
request = Event()
request.prop1 = data["prop1"]
request.prop2 = data["prop2"]
request = request.SerializetoString()
handler(request)
这是我的Protobuf模式:
Syntax = "proto3";
package mypackage;
message Event {
string prop1 = 1;
string prop2 = 2;
}
我在做什么错了?
解决方法
那是因为RequestReply处理程序不接受直接的protobuf消息。 Flink运行时发送一个名为ToFunction
的类型,并接收一个FromFunction
类型的响应。此有效负载包含您的呼叫者消息以及持久值和其他元信息。
如果您不能直接调用函数(例如在测试中),我鼓励您这样做并且根本不使用处理程序。