How can I evaluate multiple functions on one generator using asyncio instead of threading?

Problem description

Goal

This work aims to provide an efficient solution to the following problem.

source = lambda: range(1 << 24)  # for example
functions = (min,max,sum)  # for example
data = tuple(source())  # from some generator
results = tuple(f(data) for f in functions)

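This works. The source() function can generate arbitrarily many values, which are put into a tuple named data. The sequence of functions is then called on that tuple to give results. Each of these functions iterates over the iterable it is given exactly once and then returns its result. That is fine for small datasets. However, if source() generates a great many values, they all have to be stored, which hogs memory.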
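The reason the data gets materialized at all is that a generator can only be iterated once. This small sketch (gen() is a hypothetical stand-in for source()) shows what happens if the same generator is simply handed to each function in turn: the first function exhausts it, and the later ones see an empty iterator.

```python
def gen():
    # A single-pass source, standing in for source()
    return (x for x in range(10))


g = gen()
first = min(g)  # consumes the whole generator

try:
    second = max(g)  # g is already exhausted at this point
except ValueError as exc:  # max() of an empty iterable raises ValueError
    second = exc

total = sum(g)  # sum() of an exhausted iterator quietly returns 0
```

Note that sum() does not fail at all; it silently returns 0, which is arguably worse than the ValueError from max().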

Possible solution

Something like this:

from typing import Callable,Iterable,Tuple,TypeVar

TI = TypeVar('TI')
TO = TypeVar('TO')


def magic_function(data: Iterable[TI],fxns: Iterable[Callable[[Iterable[TI]],TO]]) -> Tuple[TO,...]:
    stored = tuple(data)  # memory hog,prohibitively
    return tuple(f(stored) for f in fxns)


source = lambda: range(1 << 24)  # for example
functions = (min,sum)  # for example
results = magic_function(source(),functions)

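This is what I have been trying to do. magic_function() would feed the data iterator to some kind of internal asynchronous server. Each of the fxns would then be given an asynchronous client that looks like an ordinary iterator, so the fxns can process these clients as unmodified iterators. The fxns cannot be modified. This can be done with the threading module, but the overhead would be terrible.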
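A minimal sketch of the threading approach described above, assuming a hypothetical fan_out_threads() helper built on the standard queue module rather than hand-rolled Event/Lock signalling. Each function runs unmodified in its own thread and pulls from its own bounded queue.Queue, so only a few batches are buffered instead of the whole dataset. Passing batches (batch_size) instead of single items is an assumption of this sketch aimed at the overhead problem, since otherwise every item costs a queue round trip per function.

```python
import queue
import threading
from typing import Any, Callable, Iterable, Iterator, List, Sequence, Tuple

_DONE = object()  # sentinel that marks the end of the stream


def _client(q: queue.Queue) -> Iterator[Any]:
    """Present a queue of item batches to a consumer as a plain iterator."""
    while True:
        batch = q.get()
        if batch is _DONE:
            return
        yield from batch


def fan_out_threads(data: Iterable[Any],
                    fxns: Sequence[Callable[[Iterable[Any]], Any]],
                    batch_size: int = 1024,
                    max_batches: int = 4) -> Tuple[Any, ...]:
    # One bounded queue per function, each holding at most max_batches batches,
    # so memory stays roughly len(fxns) * max_batches * batch_size items.
    queues = [queue.Queue(maxsize=max_batches) for _ in fxns]
    results: List[Any] = [None] * len(fxns)

    def worker(i: int, f: Callable[[Iterable[Any]], Any], q: queue.Queue) -> None:
        it = _client(q)
        try:
            results[i] = f(it)  # f is called unmodified on an ordinary iterator
        except Exception as exc:
            results[i] = exc  # report the failure instead of killing the run
        for _ in it:  # keep draining so the producer never blocks on a full queue
            pass

    threads = [threading.Thread(target=worker, args=(i, f, q))
               for i, (f, q) in enumerate(zip(fxns, queues))]
    for t in threads:
        t.start()
    try:
        batch: List[Any] = []
        for item in data:
            batch.append(item)
            if len(batch) >= batch_size:
                for q in queues:
                    q.put(batch)  # blocks while this consumer's queue is full
                batch = []
        if batch:
            for q in queues:
                q.put(batch)
    finally:
        for q in queues:
            q.put(_DONE)  # always release the workers, even if data raised
        for t in threads:
            t.join()
    return tuple(results)
```

For example, fan_out_threads(range(10000), (min, max, sum)) returns the same tuple as applying the three functions to a stored copy of the data. With batch_size=1 and max_batches=1 it degenerates into one-item lock-step, which pays the most synchronization overhead per item.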

Extra clarity

This should hold:

source = lambda: range(1 << 24)  # for example
functions = (min,sum)  # for example
if first_method:
    data = tuple(source())  # from some generator
    results = tuple(f(data) for f in functions)
else:
    results = magic_function(source(),functions)

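Whether first_method is True or False, for the same output of source() and the same functions, results should always match (for functions that consume a single-pass iterator). The first method computes and stores the entire dataset, which can be very wasteful and slow. The magic method should save memory at minimal overhead cost (in both time and memory).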

Threading implementation

Here is a working implementation that uses the threading module. It is noticeably slow...

#!/usr/bin/python3
from collections import namedtuple
from random import randint
from statistics import geometric_mean,harmonic_mean,mean,median,median_high,median_low,mode
from threading import Event,Lock,Thread
from typing import *

''' https://pastebin.com/u4mTHfgc '''

int_iterable = Iterable[int]
_T = TypeVar('_T1',int,float)
_FXN_T = Callable[[int_iterable],_T]


class Server:
    _it: int_iterable
    slots: int
    edit_slots: Lock
    element: _T
    available: Event
    zero_slots: Event
    end: bool

    def __init__(self,it: int_iterable):
        self._it = it
        self.slots = 0
        self.edit_slots = Lock()
        self.available = Event()
        self.zero_slots = Event()
        self.end = False

    def server(self,queue_length: int):
        available = self.available
        zero_slots = self.zero_slots
        for v in self._it:
            self.slots = queue_length
            self.element = v
            zero_slots.clear()
            available.set()
            zero_slots.wait()
        self.slots = queue_length
        self.end = True
        zero_slots.clear()
        available.set()
        zero_slots.wait()

    def client(self) -> int_iterable:
        available = self.available
        zero_slots = self.zero_slots
        edit_slots = self.edit_slots
        while True:
            available.wait()
            end = self.end
            if not end:
                yield self.element
            with edit_slots:
                self.slots -= 1
                if self.slots == 0:
                    available.clear()
                    zero_slots.set()
            zero_slots.wait()
            if end:
                break


class Slot:
    thread: Thread
    fxn: _FXN_T
    server: Server
    qid: int
    result: Union[Optional[_T],Exception,Tuple[Exception,Exception]]

    def __init__(self,fxn: _FXN_T,server: Server,qid: int):
        self.thread = Thread(target = self.run,name = f'BG {id(self)} thread {qid}')
        self.fxn = fxn
        self.server = server
        self.qid = qid
        self.result = None

    def run(self):
        client = self.server.client()
        try:
            self.result = self.fxn(client)
        except Exception as e:
            self.result = e
            try:
                for _ in client:  # one thread breaking won't break it all.
                    pass
            except Exception as f:
                self.result = e,f


class BranchedGenerator:
    _server: Server
    _queue: List[Slot]

    def __init__(self,it: int_iterable):
        self._server = Server(it)
        self._queue = []

    def new(self,fxn: _FXN_T) -> int:
        qid = len(self._queue)
        self._queue.append(Slot(fxn,self._server,qid))
        return qid

    def finalize(self):
        queue = self._queue
        for t in queue:
            t.thread.start()
        self._server.server(len(queue))
        for t in queue:
            t.thread.join()

    def get(self,qid: int) -> _T:
        return self._queue[qid].result

    @classmethod
    def make(cls,it: int_iterable,fxns: Iterable[_FXN_T]) -> Tuple[_T,...]:
        tmp = cls(it)
        qid_range = max(map(tmp.new,fxns))
        tmp.finalize()
        return tuple((tmp.get(qid)) for qid in range(qid_range + 1))


seq_stats = namedtuple('seq_stats',('tuple','mean','harmonic_mean','geometric_mean','median','median_high','median_low','mode'))


def bundle_bg(xs: int_iterable) -> seq_stats:
    tmp = BranchedGenerator(xs)
    # noinspection PyTypeChecker
    ys = seq_stats(
        tmp.new(tuple),tmp.new(mean),tmp.new(harmonic_mean),tmp.new(geometric_mean),tmp.new(median),tmp.new(median_high),tmp.new(median_low),tmp.new(mode)
    )
    tmp.finalize()
    return seq_stats(
        tmp.get(ys.tuple),tmp.get(ys.mean),tmp.get(ys.harmonic_mean),tmp.get(ys.geometric_mean),tmp.get(ys.median),tmp.get(ys.median_high),tmp.get(ys.median_low),tmp.get(ys.mode)
    )


def bundle(xs: int_iterable) -> seq_stats:
    return seq_stats(
        tuple(xs),mean(xs),harmonic_mean(xs),geometric_mean(xs),median(xs),median_high(xs),median_low(xs),mode(xs)
    )


def display(v: seq_stats):
    print(f'Statistics of {v.tuple}:\n'
          f'\tMean: {v.mean}\n'
          f'\tHarmonic Mean: {v.harmonic_mean}\n'
          f'\tGeometric Mean: {v.geometric_mean}\n'
          f'\tMedian: {v.median}\n'
          f'\tMedian High: {v.median_high}\n'
          f'\tMedian Low: {v.median_low}\n'
          f'\tMode: {v.mode};')


def new(length: int,inclusive_maximum: int) -> int_iterable:
    return (randint(1,inclusive_maximum) for _ in range(length))


def test1() -> int:
    sample = new(10,1 << 65)
    struct1 = bundle_bg(sample)
    display(struct1)
    struct2 = bundle(struct1.tuple)
    display(struct2)
    matches = seq_stats(*(a == b for (a,b) in zip(struct1,struct2)))
    display(matches)
    return sum(((1 << i) * (not e)) for (i, e) in enumerate(matches))  # bit i set => field i differs


def test2():
    sample = new(1000,1 << 5)
    struct1 = seq_stats(*BranchedGenerator.make(
        sample,(tuple,geometric_mean,mode)
    ))
    display(struct1)
    struct2 = bundle(struct1.tuple)
    display(struct2)
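    matches = seq_stats(*(a == b for (a, b) in zip(struct1, struct2)))
    display(matches)
    return sum(((1 << i) * (not e)) for (i, e) in enumerate(matches))  # bit i set => field i differs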


def test3():
    pass


if __name__ == '__main__':
    exit((test2()))

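The Branching Generator Module (V3) [using threading] - Pastebin.com link has updated code. Half a second passes from start to output, and that is with only eight functions. Both test1() and test2() have this speed problem.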

Attempt

I tried to implement magic_function() with the asyncio module:

#!/usr/bin/python3
from asyncio import Task,create_task,run,wait
from collections import deque,namedtuple
from random import randint
from statistics import geometric_mean, harmonic_mean, mean, median, median_high, median_low, mode
from typing import *

''' https://pastebin.com/ELzEaSK8 '''

int_iterable = Iterable[int]
_T = TypeVar('_T1', int, float)  # a TypeVar with a single constraint raises TypeError
ENGINE_T = AsyncGenerator[Tuple[_T,bool],int]


async def injector(engine: ENGINE_T,qid: int) -> AsyncIterator[int]:
    while True:
        try:
            x,try_again = await engine.asend(qid)
        except StopAsyncIteration:
            break
        if try_again:
            continue
        yield x


WRAPPER_FXN_T = Callable[[int_iterable],_T]


def wrapper(fxn: WRAPPER_FXN_T,engine: ENGINE_T,qid: int):
    async def i():
        # TypeError: 'async_generator' object is not iterable
        return fxn(iter(x async for x in injector(engine,qid)))

    return i


class BranchedGenerator:
    _it: int_iterable
    _engine: ENGINE_T
    _queue: Union[tuple,deque]

    def __init__(self,it: int_iterable):
        self._it = it
        self._engine = self._make_engine()
        # noinspection PyTypeChecker
        wait(self._engine)
        self._queue = deque()

    async def _make_engine(self) -> ENGINE_T:  # it's like a server
        lq = len(self._queue)
        result = try_again = 0,True
        for value in self._it:
            waiting = set(range(lq))
            while True:
                qid = (yield result)
                if len(waiting) == 0:
                    result = try_again
                    break
                if qid in waiting:
                    waiting.remove(qid)
                    result = value,False
                else:
                    result = try_again

    def new(self,fxn: WRAPPER_FXN_T) -> int:
        qid = len(self._queue)
        self._queue.append(wrapper(fxn,self._engine,qid)())
        return qid

    def finalize(self):
        self._queue = tuple(self._queue)

    def get(self,qid: int) -> Task:
        return create_task(self._queue[qid])

    @classmethod
    @(lambda f: (lambda it,fxns: run(f(it,fxns))))
    async def make(cls, it: int_iterable, fxns: Iterable[Callable[[int_iterable], _T]]) -> Tuple[_T, ...]:
        tmp = cls(it)
        qid_range = max(map(tmp.new, fxns))
        tmp.finalize()
        return tuple((await tmp.get(qid)) for qid in range(qid_range + 1))


seq_stats = namedtuple('seq_stats', ('tuple', 'mean', 'harmonic_mean', 'geometric_mean', 'median', 'median_high', 'median_low', 'mode'))


@(lambda f: (lambda xs: run(f(xs))))
async def bundle_bg(xs: int_iterable) -> seq_stats:
    tmp = BranchedGenerator(xs)
    # noinspection PyTypeChecker
    ys = seq_stats(
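        tmp.new(tuple), tmp.new(mean), tmp.new(harmonic_mean), tmp.new(geometric_mean), tmp.new(median), tmp.new(median_high), tmp.new(median_low), tmp.new(mode)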
    )
    tmp.finalize()
    return seq_stats(
        await tmp.get(ys.tuple),await tmp.get(ys.mean),await tmp.get(ys.harmonic_mean),await tmp.get(ys.geometric_mean),await tmp.get(ys.median),await tmp.get(ys.median_high),await tmp.get(ys.median_low),await tmp.get(ys.mode)
    )


def bundle(xs: int_iterable) -> seq_stats:
    return seq_stats(
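        tuple(xs), mean(xs), harmonic_mean(xs), geometric_mean(xs), median(xs), median_high(xs), median_low(xs), mode(xs)
    )


def display(v: seq_stats):
    print(f'Statistics of {v.tuple}:\n'
          f'\tMean: {v.mean}\n'
          f'\tHarmonic Mean: {v.harmonic_mean}\n'
          f'\tGeometric Mean: {v.geometric_mean}\n'
          f'\tMedian: {v.median}\n'
          f'\tMedian High: {v.median_high}\n'
          f'\tMedian Low: {v.median_low}\n'
          f'\tMode: {v.mode};')


def new(length: int, inclusive_maximum: int) -> int_iterable:
    return (randint(1, inclusive_maximum) for _ in range(length))


def test1() -> int:
    sample = new(10, 1 << 65)
    struct1 = bundle_bg(sample)
    display(struct1)
    struct2 = bundle(struct1.tuple)
    display(struct2)
    matches = seq_stats(*(a == b for (a, b) in zip(struct1, struct2)))
    display(matches)
    return sum(((1 << i) * (not e)) for (i, e) in enumerate(matches))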


async def test2():
    sample = new(1000,1 << 5)
    # noinspection PyTypeChecker
    struct1 = seq_stats(*await BranchedGenerator.make(
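        sample, (tuple, geometric_mean, mode)
    ))
    display(struct1)
    struct2 = bundle(struct1.tuple)
    display(struct2)
    matches = seq_stats(*(a == b for (a, b) in zip(struct1, struct2)))
    display(matches)
    return sum(((1 << i) * (not e)) for (i, e) in enumerate(matches))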


async def test3():
    pass


if __name__ == '__main__':
    exit((test1()))

The Branching Generator Module (V2) - Pastebin.com link has the latest version. I will not update the embedded code! If anything changes, the pastebin copy will contain it.

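Tests

  1. test1() checks that bundle_bg() and bundle() do the same thing. They should.

  2. test2() checks whether BranchedGenerator.make() behaves like bundle_bg() and (transitively) like bundle(). BranchedGenerator.make() is meant to be the closest thing to magic_function().

  3. test3() does not do anything yet.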

Status

Test 1 fails with the output below. Test 2 fails with a similar error when it calls BranchedGenerator.make().

[redacted]/b_gen.py:45: RuntimeWarning: coroutine 'wait' was never awaited
  wait(self._engine)
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
Traceback (most recent call last):
  File "[redacted]/b_gen.py", line 173, in <module>
    exit((test1()))
  File "[redacted]/b_gen.py", line 144, in test1
    struct1 = bundle_bg(sample)
  File "[redacted]/b_gen.py", line 87, in <lambda>
    @(lambda f: (lambda xs: run(f(xs))))
  File "/usr/lib64/python3.9/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/lib64/python3.9/asyncio/base_events.py", line 642, in run_until_complete
    return future.result()
  File "[redacted]/b_gen.py", line 103, in bundle_bg
    await tmp.get(ys.tuple),
  File "[redacted]/b_gen.py", line 31, in i
    return fxn(iter(x async for x in injector(engine, qid)))
TypeError: 'async_generator' object is not iterable
sys:1: RuntimeWarning: coroutine 'wrapper.<locals>.i' was never awaited

To be honest, I'm new to asyncio, and I don't know how to fix this.
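The TypeError in the traceback comes from handing an async generator to a plain, synchronous consumer: iter() (and therefore min(), sum(), tuple() and so on) needs __iter__, while an async generator only provides __aiter__/__anext__. A minimal reproduction, with a hypothetical agen() standing in for injector():

```python
import asyncio
from typing import AsyncIterator


async def agen(n: int) -> AsyncIterator[int]:
    # An async generator, standing in for injector() in the question
    for i in range(n):
        yield i


def sync_consumer_fails() -> str:
    try:
        iter(agen(3))  # plain iteration needs __iter__; async generators only have __aiter__
    except TypeError as exc:
        return str(exc)
    return "no error"


async def collect_then_sum() -> int:
    # Inside a coroutine the items can be gathered with an async comprehension,
    # but that builds the full list first: exactly the materialization that
    # magic_function() is supposed to avoid.
    items = [x async for x in agen(3)]
    return sum(items)


msg = sync_consumer_fails()
```

So a synchronous function can only be given an async source after the source has been collected, which defeats the purpose.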

Question

Can someone help me fix this? Please? The asyncio version should behave exactly like the threading one, just without the overhead.

Another approach

Before this, I tried a simpler implementation:

#!/usr/bin/python3
from random import randrange
from statistics import mean as st_mean,median as st_median,mode as st_mode
from typing import Any, Callable, Iterable, Tuple, TypeVar

''' https://pastebin.com/xhfT1njJ '''


class BranchedGenerator:
    _n: Iterable[int]
    _stop_value: Any

    def __init__(self,n: Iterable[int],stop: Any):
        self._n = n
        self._stop_value = stop

    @property
    def new(self):
        return


def wrapper1(f):
    new = (yield)
    # SyntaxError: 'yield' inside generator expression
    yield f((y for _ in new if (y := (yield)) or True))
    return


_T1 = TypeVar('_T1')
_T2 = TypeVar('_T2')


def wrapper2(ns: Iterable[_T1],fs: Iterable[Callable[[Iterable[_T1]],_T2]]) -> Tuple[_T2,...]:
    def has_new():
        while new:
            yield True
        while True:
            yield False

    new = True
    xwf = tuple(map(wrapper1,fs))
    for x in xwf:
        next(x)
        x.send(has_new)
        next(x)
    for n in ns:
        for x in xwf:
            x.send(n)
    new = False
    return tuple(map(next,xwf))


def source(n: int) -> Iterable[int]:
    return (randrange(-9,9000) for _ in range(n))


normal = (tuple,st_mean,st_median,st_mode)


def test0():
    sample = tuple(source(25))
    s_tuple,s_mean,s_median,s_mode = wrapper2(sample,normal)
    b_tuple,b_mean,b_median,b_mode = (f(s_tuple) for f in normal)
    assert all((
        s_tuple == b_tuple,s_mean == b_mean,s_median == b_median,s_mode == b_mode
    ))


def test1():
    sample = source(25)
    s_tuple, s_mean, s_median, s_mode = wrapper2(sample, normal)
    b_tuple, b_mean, b_median, b_mode = (f(s_tuple) for f in normal)
    print(
        'Test1:'
        '\nTuple', s_tuple, '\n', b_tuple, '\n==?', v0 := s_tuple == b_tuple, '\nMean', v1 := s_mean == b_mean, '\nMedian', v2 := s_median == b_median, '\nMode', s_mode, b_mode, v3 := s_mode == b_mode, '\nPasses', ''.join('01'[v * 1] for v in (v0, v1, v2, v3)), 'All?', all((v0, v1, v2, v3))
    )


if __name__ == '__main__':
    test0()
    test1()

The Branching Generator Module (V1) - Pastebin.com link follows the same update policy.

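Tests

  1. Test 0 checks whether wrapper2() does what it should, i.e. calls all of the functions and returns their results. sample is a tuple here, so no memory is saved, as with first_method == True.

  2. Test 1 is like first_method == False: sample is not a tuple.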

Problem

Ouch! I can code, I promise.

  File "[redacted]/branched_generator.py", line 25
    yield f((y for _ in new if (y := (yield)) or True))
            ^
SyntaxError: 'yield' inside generator expression
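Why this is a SyntaxError: a generator expression runs in its own implicit function scope, so a yield written inside it would belong to that hidden function rather than to wrapper1(). Python 3.7 deprecated this, and 3.8 made it a SyntaxError (a yield in the leftmost iterable is still allowed, since that part is evaluated in the enclosing scope). The check below compiles the question's wrapper1() from a string (SOURCE is just an illustrative name) so the error can be caught:

```python
# The question's wrapper1(), compiled on its own so the error can be inspected
SOURCE = (
    "def wrapper1(f):\n"
    "    new = (yield)\n"
    "    yield f((y for _ in new if (y := (yield)) or True))\n"
)

try:
    compile(SOURCE, "<wrapper1>", "exec")
    error_message = None
except SyntaxError as exc:
    error_message = exc.msg  # e.g. "'yield' inside generator expression"
```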

I freely admit it: this version is outdated. wrapper2() is clearly the closest to magic_function().

Question

Since this is the simpler implementation, can wrapper2() be salvaged? If not, no worries.
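For reference, the wrapper2() structure does run once each consumer is a named generator function instead of a generator expression, but only if every function is rewritten in push style: the caller send()s one item at a time and the consumer returns its result at the end. That breaks the requirement that fxns cannot be modified, so this sketch (running_sum, running_max and push_all are hypothetical names) only illustrates what salvaging it would cost; it does not help for arbitrary functions such as statistics.median.

```python
from typing import Any, Callable, Generator, Iterable, Sequence, Tuple

_END = object()  # sentinel telling each consumer that the stream is over
Consumer = Generator[None, Any, Any]  # receives items via send(), returns a result


def running_sum() -> Consumer:
    total = 0
    while True:
        x = yield
        if x is _END:
            return total
        total += x


def running_max() -> Consumer:
    best = yield
    if best is _END:
        raise ValueError("max() of an empty stream")
    while True:
        x = yield
        if x is _END:
            return best
        if x > best:
            best = x


def push_all(data: Iterable[Any], makers: Sequence[Callable[[], Consumer]]) -> Tuple[Any, ...]:
    consumers = [make() for make in makers]
    for c in consumers:
        next(c)  # prime each consumer: run it up to its first yield
    for item in data:
        for c in consumers:  # sequential: hand the item to each consumer in turn
            c.send(item)
    results = []
    for c in consumers:
        try:
            c.send(_END)
        except StopIteration as stop:
            results.append(stop.value)  # a generator's return value rides on StopIteration
        else:
            raise RuntimeError("consumer kept running after the end of the stream")
    return tuple(results)
```

Only one item is alive at a time, and no threads are involved, because each consumer explicitly hands control back at every yield.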

Solution

If materializing the data is your only concern, you could do this:

from itertools import tee
from typing import Iterable
from statistics import geometric_mean,harmonic_mean,mean,median,median_high,median_low,mode
from random import randint

def magic_function(data,fxns):
    return tuple(f(d) for f,d in zip(fxns,tee(data,len(fxns))))

def new(length: int,inclusive_maximum: int) -> Iterable[int]:
    return (randint(1,inclusive_maximum) for _ in range(length))

sample = new(1000,1 << 5)
functions = (tuple,geometric_mean,mode)

magic_function(sample,functions)

NB: tee is not thread-safe, though.

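PS: You're right, this consumes the generator and, because the n copies are read one after another, tee ends up holding all of its data in memory anyway.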
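The PS is easy to observe: when the copies returned by tee are read one after another, as magic_function does with f(d), the first copy is exhausted before the second is touched, so tee has to keep every item until the later copies catch up.

```python
from itertools import tee

a, b = tee(iter(range(5)), 2)
first = list(a)   # a runs to the end before b is touched...
second = list(b)  # ...so tee must have kept all five items around for b
```

The itertools documentation makes the same point: if one iterator uses most or all of the data before another iterator starts, list() is faster than tee().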

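I don't think the async/await version in your question can be salvaged. The arbitrary functions in fxns would have to consume the iterator asynchronously: they would have to (roughly) give up control after each item they pop and process. But async/await is cooperative, and we cannot force an arbitrary function f to await inside its loop (which is why you get the TypeError). Your threading solution does work, because at certain points in their loops the threads are preemptively suspended by the VM, which gives the other functions a chance to run.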

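Keep in mind that there is a difference between simultaneous and concurrent. When I say that a sequential loop over the functions is enough, I mean something like this: let one of them consume an item, then let the next one consume an item. The functions do not need to run simultaneously. In fact, your worker-thread example does not run anything simultaneously (on the CPython VM, that is; IronPython and Jython may run several threading.Thread instances at the same time, but on CPython only one runs at a time).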
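To illustrate the cooperative point, here is what an asyncio version needs in order to work: every consumer must itself be a coroutine that awaits between items, which is exactly the condition arbitrary functions do not meet. In this sketch, fan_out_async, async_sum and async_max are hypothetical names, and each consumer pulls from its own asyncio.Queue(maxsize=1), so the producer never runs more than one item ahead.

```python
import asyncio
from typing import Any, AsyncIterator, Awaitable, Callable, Iterable, Optional, Sequence, Tuple

_DONE = object()  # sentinel that ends each consumer's stream
AsyncConsumer = Callable[[AsyncIterator[Any]], Awaitable[Any]]


async def _client(q: asyncio.Queue) -> AsyncIterator[Any]:
    while True:
        item = await q.get()  # suspends here, giving the producer and other consumers a turn
        if item is _DONE:
            return
        yield item


async def fan_out_async(data: Iterable[Any], consumers: Sequence[AsyncConsumer]) -> Tuple[Any, ...]:
    # maxsize=1: the producer can run at most one item ahead of each consumer
    queues = [asyncio.Queue(maxsize=1) for _ in consumers]
    tasks = [asyncio.create_task(c(_client(q))) for c, q in zip(consumers, queues)]
    for item in data:
        for q in queues:
            await q.put(item)  # waits while this consumer's queue is still full
    for q in queues:
        await q.put(_DONE)
    return tuple(await asyncio.gather(*tasks))


async def async_sum(items: AsyncIterator[int]) -> int:
    total = 0
    async for x in items:
        total += x
    return total


async def async_max(items: AsyncIterator[int]) -> Optional[int]:
    best: Optional[int] = None
    async for x in items:
        if best is None or x > best:
            best = x
    return best
```

Usage: asyncio.run(fan_out_async(range(100), (async_sum, async_max))). Everything hinges on the await inside _client(): it is the only point where one consumer hands control to another, and a plain function such as statistics.median never reaches such a point. The sketch also assumes every consumer reads to the end; one that stops early or raises would leave the producer waiting on a full queue.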