问题描述
我一直在按照这篇教程学习如何设置和使用 Apache Arrow Flight。
从例子中,server.py:
import pyarrow as pa
import pyarrow.flight as fl
def create_table_int():
    """Build a small two-column integer table used as demo data.

    Returns:
        A table built with ``pa.Table.from_arrays`` whose columns are
        ``column1`` = [1, 2, 3] and ``column2`` = [4, 5, 6].
    """
    data = [
        pa.array([1, 2, 3]),
        pa.array([4, 5, 6]),
    ]
    return pa.Table.from_arrays(data, names=['column1', 'column2'])
def create_table_dict():
    """Build a two-column table of dictionary-encoded strings as demo data.

    Each column is a chunked array made of two dictionary arrays that share
    the same ``["x", "y", "z"]`` dictionary.

    Returns:
        A table built with ``pa.Table.from_arrays`` with columns
        ``column1`` and ``column2``.
    """
    keys = pa.array(["x", "y", "z"], type=pa.utf8())
    # NOTE(review): the pasted source had lost repeated tokens here (index
    # lists were truncated, `keys` arguments and `names=` were missing, and
    # the brackets did not balance). The values below are restored so that
    # every chunk has three entries and both columns have six rows -- confirm
    # against the original tutorial.
    data = [
        pa.chunked_array([
            pa.DictionaryArray.from_arrays([0, 1, 2], keys),
            pa.DictionaryArray.from_arrays([0, 1, 2], keys),
        ]),
        pa.chunked_array([
            pa.DictionaryArray.from_arrays([1, 1, 1], keys),
            pa.DictionaryArray.from_arrays([2, 2, 2], keys),
        ]),
    ]
    return pa.Table.from_arrays(data, names=['column1', 'column2'])
class FlightServer(fl.FlightServerBase):
    """Flight server that hands out in-memory demo tables by ticket name."""

    def __init__(self, location="grpc://0.0.0.0:8815", **kwargs):
        """Initialise the base server and build the table registry.

        Args:
            location: URI passed to ``fl.FlightServerBase``.
            **kwargs: Forwarded unchanged to ``fl.FlightServerBase``.
        """
        super().__init__(location, **kwargs)
        # Keyed by bytes to match the value looked up via ``ticket.ticket``
        # in ``do_get``.
        self.tables = {
            b'table_int': create_table_int(),
            b'table_dict': create_table_dict(),
        }

    def do_get(self, context, ticket):
        """Return a stream over the table registered under the ticket.

        Args:
            context: Call context supplied by the Flight framework (unused).
            ticket: Ticket whose ``ticket`` attribute selects the table.

        Returns:
            An ``fl.RecordBatchStream`` wrapping the selected table.

        Raises:
            KeyError: If no table is registered under ``ticket.ticket``.
        """
        table = self.tables[ticket.ticket]
        # Alternative kept from the original example:
        # fl.GeneratorStream(table.schema, table.to_batches(max_chunksize=1024))
        return fl.RecordBatchStream(table)
def main():
    """Create a ``FlightServer`` with its default location and serve."""
    FlightServer().serve()


if __name__ == '__main__':
    main()
client.py:
import argparse
import sys
import pyarrow as pa
import pyarrow.flight as fl
def get_by_ticket(args, client):
    """Fetch the data for ticket ``args.name`` and print it.

    Args:
        args: Parsed command-line arguments; ``args.name`` is the ticket name.
        client: Flight client used to issue the ``do_get`` call.
    """
    ticket_name = args.name
    response = client.do_get(fl.Ticket(ticket_name)).read_all()
    print_response(response)
def get_by_ticket_pandas(args, client):
    """Fetch the data for ticket ``args.name`` via ``read_pandas`` and print it.

    Args:
        args: Parsed command-line arguments; ``args.name`` is the ticket name.
        client: Flight client used to issue the ``do_get`` call.
    """
    ticket_name = args.name
    response = client.do_get(fl.Ticket(ticket_name)).read_pandas()
    print_response(response)
def print_response(data):
    """Print *data* to stdout between a header line and a footer line.

    Args:
        data: Any object; it is rendered with ``print``.
    """
    print("=== Response ===")
    print(data)
    print("================")
def main():
    """Parse the command line and run the selected Flight client command.

    Two subcommands are available, ``get_by_ticket`` and
    ``get_by_ticket_pandas``; both take ``-n/--name`` for the ticket name.
    If no subcommand is given, the help text is printed and the process
    exits with status 1.
    """
    parser = argparse.ArgumentParser()
    subcommands = parser.add_subparsers()

    cmd_get_by_t = subcommands.add_parser('get_by_ticket')
    cmd_get_by_t.set_defaults(action='get_by_ticket')
    cmd_get_by_t.add_argument(
        '-n', '--name', type=str, help="Name of the ticket to fetch.")

    cmd_get_by_tp = subcommands.add_parser('get_by_ticket_pandas')
    cmd_get_by_tp.set_defaults(action='get_by_ticket_pandas')
    # The long option is needed so the value is stored as ``args.name``,
    # which get_by_ticket_pandas reads; with only '-n' it would be ``args.n``.
    cmd_get_by_tp.add_argument(
        '-n', '--name', type=str, help="Name of the ticket to fetch.")

    args = parser.parse_args()
    # ``action`` is only set by a subparser, so its absence means no
    # subcommand was chosen.
    if not hasattr(args, 'action'):
        parser.print_help()
        sys.exit(1)

    commands = {
        'get_by_ticket': get_by_ticket,
        'get_by_ticket_pandas': get_by_ticket_pandas,
    }
    client = fl.connect("grpc://0.0.0.0:8815")
    commands[args.action](args, client)


if __name__ == '__main__':
    main()
我在通过服务访问的 k8s 集群中运行服务器,其他各种 pod 调用服务器。这工作正常,除非在第一次调用返回之前对服务器进行第二次调用。在这种情况下,我没有从第一次调用中得到正确的响应,但我似乎也没有收到任何错误。我不确定正确的术语是什么,但是有没有办法让服务器“阻塞”,以便它在开始第二个调用之前完成第一个调用的处理,或者其他一些解决方法?
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)