问题描述
这是我的用例:
- 我从数据库中读取包含信息的行,以进行复杂的SOAP调用(我正在使用zeep进行这些调用)。
- 数据库中的一行对应于对服务的请求。
- 最多可以有2万行,所以我不想在打电话之前先读取内存中的所有内容。
- 我需要处理回复- 响应还可以,我需要将一些返回的信息存储回 我的数据库,当有异常时,我需要处理 该特定请求/响应对的异常。
- 我还需要在创建请求时捕获一些外部信息,以便我知道将请求的响应存储在哪里。在我当前的代码中,我使用了collect()的令人愉悦的属性,该属性使结果以相同的顺序出现。
我阅读了相关的PEP和Python文档,但我仍然很困惑,因为似乎有多种方法可以解决同一问题。
我还在网络上进行了无数次练习,但是这些示例都很简单-它要么是asyncio.sleep(),要么是使用有限的URL列表进行的网络抓取。
到目前为止,我提出的解决方案还行得通-asyncio.gather()方法非常非常有用,但是我无法从生成器中“获取”它。我目前只是在计算任意大小,然后启动.gather()操作。我已经抄写了代码,遗漏了一些无聊的部分,并且尝试对代码进行匿名处理
我尝试过涉及信号灯,队列,不同事件循环的解决方案,但每次都失败。理想情况下,我希望能够“连续”创建期货-我认为我缺少“将这个等待的呼叫转换为期货”的逻辑。
感谢您的帮助!
import asyncio
from asyncio import Future
import zeep
from zeep.plugins import HistoryPlugin
history = HistoryPlugin()
max_concurrent_calls = 5
provoke_errors = True
def export_data_async(db_variant: str,order_nrs: set):
st = time.time()
results = []
loop = asyncio.get_event_loop()
def get_client1(service_name: str,system: Systems = Systems.Acme) -> Tuple[zeep.Client,zeep.client.Factory]:
client1 = zeep.Client(wsdl=system.wsdl_url(service_name=service_name),transport=transport,plugins=[history],)
factory_ns2 = client1.type_factory(namespace='ns2')
return client1,factory_ns2
table = 'ZZZZ'
moveback_table = 'EEEEEE'
moveback_dict = create_default_empty_ordered_dict('attribute1 attribute2 attribute3 attribute3')
client,factory = get_client1(service_name='AcmeServiceName')
if log.isEnabledFor(logging.DEBUG):
client.wsdl.dump()
zeep_log = logging.getLogger('zeep.transports')
zeep_log.setLevel(logging.DEBUG)
with Db(db_variant) as db:
db.open_db(CON_STRING[db_variant])
db.init_table_for_read(table,order_list=order_nrs)
counter_failures = 0
tasks = []
sids = []
results = []
def handle_future(future: Future) -> None:
results.extend(future.result())
def process_tasks_concurrently() -> None:
nonlocal tasks,sids,counter_failures,results
futures = asyncio.gather(*tasks,return_exceptions=True)
futures.add_done_callback(handle_future)
loop.run_until_complete(futures)
for i,response_or_fault in enumerate(results):
if type(response_or_fault) in [zeep.exceptions.Fault,zeep.exceptions.TransportError]:
counter_failures += 1
log_webservice_fault(sid=sids[i],db=db,err=response_or_fault,object=table)
else:
db.write_dict_to_table(
moveback_table,{'sid': sids[i],'attribute1': response_or_fault['XXX']['XXX']['xxx'],'attribute2': response_or_fault['XXX']['XXX']['XXXX']['XXX'],'attribute3': response_or_fault['XXXX']['XXXX']['XXX'],}
)
db.commit_db_con()
tasks = []
sids = []
results = []
return
for row in db.rows(table):
if int(row.id) % 2 == 0 and provoke_errors:
payload = faulty_message_payload(row=row,factory=factory,)
else:
payload = message_payload(row=row,)
tasks.append(client.service.myRequest(
MessageHeader=factory.MessageHeader(**message_header_arguments(row=row)),myRequestPayload=payload,_soapheaders=[security_soap_header],))
sids.append(row.sid)
if len(tasks) == max_concurrent_calls:
process_tasks_concurrently()
if tasks: # this is the remainder of len(db.rows) % max_concurrent_calls
process_tasks_concurrently()
loop.run_until_complete(transport.session.close())
db.execute_this_statement(statement=update_sql)
db.commit_db_con()
log.info(db.activity_log)
if counter_failures:
log.info(f"{table :<25} Count Failed: {counter_failures}")
print("time async: %.2f" % (time.time() - st))
return results
尝试失败的队列:(在await client.service
处阻止)
loop = asyncio.get_event_loop()
counter = 0
results = []
async def payload_generator(db_variant: str,order_nrs: set):
# code that generates the data for the request
yield counter,row,payload
async def service_call_worker(queue,results):
while True:
counter,payload = await queue.get()
results.append(await client.service.myServicename(
MessageHeader=calculate_message_header(row=row)),myPayload=payload,)
)
print(colorama.Fore.BLUE + f'after result returned {counter}')
# Here do the relevant processing of response or error
queue.task_done()
async def main_with_q():
n_workers = 3
queue = asyncio.Queue(n_workers)
e = pprint.pformat(queue)
p = payload_generator(DB_VARIANT,order_list_from_args())
results = []
workers = [asyncio.create_task(service_call_worker(queue,results))
for _ in range(n_workers)]
async for c in p:
await queue.put(c)
await queue.join() # wait for all tasks to be processed
for worker in workers:
worker.cancel()
if __name__ == '__main__':
try:
loop.run_until_complete(main_with_q())
loop.run_until_complete(transport.session.close())
finally:
loop.close()
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)