使用zeep

问题描述

这是我的用例:

  • 我从数据库中读取包含信息的行,以进行复杂的SOAP调用(我正在使用zeep进行这些调用)。
  • 数据库中的一行对应于对服务的请求。
  • 数据库中最多可能有 2 万行，所以我不想在发起调用之前先把所有内容都读进内存
  • 我需要处理回复- 响应还可以,我需要将一些返回的信息存储回 我的数据库,当有异常时,我需要处理 该特定请求/响应对的异常。
  • 我还需要在创建请求时捕获一些外部信息，以便我知道应把请求的响应存储到哪里。在我当前的代码中，我利用了 asyncio.gather() 的一个令人愉悦的特性：结果会按与请求相同的顺序返回。

我阅读了相关的PEP和Python文档,但我仍然很困惑,因为似乎有多种方法可以解决同一问题。

我还在网上查阅了无数示例，但这些示例都太简单了——要么只是 asyncio.sleep()，要么是用一个有限的 URL 列表做网页抓取。

到目前为止，我提出的解决方案还算行得通——asyncio.gather() 方法非常非常有用，但我无法把它和生成器直接结合起来（gather 需要一次性拿到全部可等待对象）。我目前只是累积到一个固定的批量大小，然后再启动 .gather() 操作。我把代码转录在下面，省略了一些无关紧要的部分，并对代码做了匿名化处理

我尝试过涉及信号量（Semaphore）、队列、不同事件循环的解决方案，但每次都失败了。理想情况下，我希望能够"连续地"创建 future——我认为我缺少的是"把这个可等待的调用转换为 future"的那一步逻辑。

感谢您的帮助!

import asyncio
import logging
import pprint
import time
from asyncio import Future
from typing import Tuple

import zeep
from zeep.plugins import HistoryPlugin


# zeep plugin that records the last sent/received SOAP envelopes for debugging.
history = HistoryPlugin()

# Batch size: this many requests are gathered and awaited together.
max_concurrent_calls = 5

# When True, every row with an even id gets a deliberately faulty payload
# (see export_data_async) to exercise the error-handling path.
provoke_errors = True

def export_data_async(db_variant: str, order_nrs: set):
    """Stream request rows from the database, fire the SOAP calls in batches
    of ``max_concurrent_calls`` via ``asyncio.gather``, and write each
    response (or log each fault) back to the database.

    Args:
        db_variant: key selecting the connection string in ``CON_STRING``.
        order_nrs: order numbers restricting which rows are read.

    Returns:
        list: every response / exception collected over the whole run, in
        request order.  (The original version reset its accumulator after
        each batch and therefore always returned an empty list.)
    """
    st = time.time()

    loop = asyncio.get_event_loop()

    def get_client1(service_name: str, system: Systems = Systems.Acme) -> Tuple[zeep.Client, zeep.client.Factory]:
        # Build one zeep client plus the 'ns2' type factory used for payloads.
        client1 = zeep.Client(
            wsdl=system.wsdl_url(service_name=service_name),
            transport=transport,
            plugins=[history],
        )
        factory_ns2 = client1.type_factory(namespace='ns2')
        return client1, factory_ns2

    table = 'ZZZZ'
    moveback_table = 'EEEEEE'
    # NOTE(review): appears unused in the visible code — kept in case the
    # elided parts of the original rely on it.
    moveback_dict = create_default_empty_ordered_dict('attribute1 attribute2 attribute3 attribute3')

    client, factory = get_client1(service_name='AcmeServiceName')

    if log.isEnabledFor(logging.DEBUG):
        client.wsdl.dump()
        zeep_log = logging.getLogger('zeep.transports')
        zeep_log.setLevel(logging.DEBUG)

    with Db(db_variant) as db:

        db.open_db(CON_STRING[db_variant])

        db.init_table_for_read(table, order_list=order_nrs)

        counter_failures = 0

        all_results = []  # run-wide accumulator, never reset between batches
        tasks = []        # coroutines of the current batch
        sids = []         # sids[i] belongs to tasks[i] (same order)

        def process_tasks_concurrently() -> None:
            """Await the current batch, persist successes, log faults."""
            nonlocal tasks, sids, counter_failures

            # return_exceptions=True keeps faults in-line so the positional
            # pairing with `sids` survives; gather preserves input order.
            batch = loop.run_until_complete(
                asyncio.gather(*tasks, return_exceptions=True))
            all_results.extend(batch)

            for sid, response_or_fault in zip(sids, batch):
                # isinstance (not `type(...) in [...]`) so subclasses of the
                # zeep exception types are treated as faults as well.
                if isinstance(response_or_fault, (zeep.exceptions.Fault, zeep.exceptions.TransportError)):
                    counter_failures += 1
                    log_webservice_fault(sid=sid, db=db, err=response_or_fault, object=table)
                else:
                    db.write_dict_to_table(
                        moveback_table,
                        {
                            'sid': sid,
                            'attribute1': response_or_fault['XXX']['XXX']['xxx'],
                            'attribute2': response_or_fault['XXX']['XXX']['XXXX']['XXX'],
                            'attribute3': response_or_fault['XXXX']['XXXX']['XXX'],
                        },
                    )
            db.commit_db_con()
            # Reset only the per-batch state; all_results keeps growing.
            tasks = []
            sids = []

        for row in db.rows(table):
            # Test hook: with provoke_errors on, every even-id row gets a
            # deliberately broken payload to exercise the fault branch.
            if int(row.id) % 2 == 0 and provoke_errors:
                payload = faulty_message_payload(row=row, factory=factory)
            else:
                payload = message_payload(row=row)

            tasks.append(client.service.myRequest(
                MessageHeader=factory.MessageHeader(**message_header_arguments(row=row)),
                myRequestPayload=payload,
                _soapheaders=[security_soap_header],
            ))
            sids.append(row.sid)
            if len(tasks) == max_concurrent_calls:
                process_tasks_concurrently()

        if tasks:  # this is the remainder of len(db.rows) % max_concurrent_calls
            process_tasks_concurrently()

        loop.run_until_complete(transport.session.close())

        db.execute_this_statement(statement=update_sql)
        db.commit_db_con()

        log.info(db.activity_log)
        if counter_failures:
            log.info(f"{table :<25} Count Failed: {counter_failures}")

        print("time async: %.2f" % (time.time() - st))
    return all_results

尝试失败的队列方案：（在 await client.service 处阻塞）

# Shared event loop for the queue-based attempt below.
loop = asyncio.get_event_loop()

# NOTE(review): `counter` is read/yielded by payload_generator and `results`
# collects responses — presumably placeholders for elided code; the workers
# actually use the `results` list created locally in main_with_q.
counter = 0
results = []

async def payload_generator(db_variant: str,order_nrs: set):
    """Async generator producing one (counter, row, payload) triple per
    database row that should be sent to the SOAP service.

    NOTE(review): the row-reading body was elided by the question's author;
    ``counter``, ``row`` and ``payload`` come from that omitted code.
    """

    # code that generates the data for the request

    yield counter,row,payload


async def service_call_worker(queue, results):
    """Queue consumer: repeatedly take a (counter, row, payload) triple off
    *queue*, perform the SOAP call, and append the response to *results*.

    Runs until cancelled by the caller (after ``queue.join()`` returns).
    ``task_done()`` is called in a ``finally`` block so that a failing call
    cannot leave the queue's counter unbalanced and deadlock ``join()``.

    Fixes over the original: the ``results.append(...)`` call had mismatched
    parentheses (``myPayload`` was passed to ``append``) and the item was
    unpacked as a 2-tuple although the generator yields a 3-tuple, leaving
    ``row`` undefined.
    """
    while True:
        counter, row, payload = await queue.get()
        try:
            results.append(await client.service.myServicename(
                MessageHeader=calculate_message_header(row=row),
                myPayload=payload,
            ))
            print(colorama.Fore.BLUE + f'after result returned {counter}')

            # Here do the relevant processing of response or error
        except (zeep.exceptions.Fault, zeep.exceptions.TransportError) as err:
            # Keep the worker alive and record the fault in-line, mirroring
            # gather(return_exceptions=True) semantics.
            results.append(err)
        finally:
            queue.task_done()


async def main_with_q():
    """Producer/consumer driver for the queue-based variant.

    A bounded queue (maxsize == n_workers) throttles the payload generator:
    ``queue.put`` blocks once the workers fall behind, so at most a handful
    of rows are held in memory at any time.

    Returns:
        list: responses collected by the workers (the original built this
        list but discarded it).
    """
    n_workers = 3
    queue = asyncio.Queue(n_workers)  # bounded => back-pressure on the producer
    payloads = payload_generator(DB_VARIANT, order_list_from_args())
    results = []
    workers = [asyncio.create_task(service_call_worker(queue, results))
               for _ in range(n_workers)]

    async for item in payloads:
        await queue.put(item)

    await queue.join()  # wait for all tasks to be processed
    for worker in workers:
        worker.cancel()
    # Await the cancelled workers so no task is still pending at loop close;
    # return_exceptions=True swallows the expected CancelledError.
    await asyncio.gather(*workers, return_exceptions=True)
    return results



if __name__ == '__main__':
    try:
        loop.run_until_complete(main_with_q())
        # Close the shared zeep transport session before tearing the loop
        # down.  NOTE(review): `transport` is defined in an elided part of
        # the original code.
        loop.run_until_complete(transport.session.close())
    finally:
        # Always close the loop, even if the run above raised.
        loop.close()

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)