How to execute a function on a CPU core, and get a callback when it has completed?

Problem description

How can I execute a function on a CPU core, and get a callback when it has completed?


Context

I'm receiving a stream:

symbols = ['ABC', 'DFG', ...]  # 52 of these

handlers = {symbol: Handler(symbol) for symbol in symbols}

async for symbol, payload in lines:  # 600M of these
    handlers[symbol].feed(payload)

And I need to make use of multiple CPU cores to speed it up.

handlers['ABC'] (for example) holds state, but it is disjoint from the state of (for example) handlers['DFG'].

Basically, I can't have 2 cores simultaneously operating on the same handler, e.g. handlers['DFG'].


My approach so far

I have come up with the following solution, but it is part pseudocode, as I can't see how to implement it.

So my question is: how do I convert the final cores[core].execute(...) statement into workable Python code?

NCORES = 4
symbol_curr_active_on_core = [None] * NCORES

NO_CORES_FREE = -1
def first_free_core():
    for i, symbol in enumerate(symbol_curr_active_on_core):
        if not symbol:
            return i
    return NO_CORES_FREE

for symbol, payload in lines:
    # wait for an available core to handle it
    while True:
        sleep(0.001)
        if first_free_core() == NO_CORES_FREE:
            continue
        if symbol in symbol_curr_active_on_core:
            continue
        core = first_free_core()
        symbol_curr_active_on_core[core] = symbol

        # This is the pseudocode part: there is no `cores` object, and a
        # lambda cannot contain an assignment statement.
        cores[core].execute(
            processor[symbol].feed(payload),
            on_complete=lambda core_index:
                symbol_curr_active_on_core[core_index] = None
        )
        break  # move on to the next (symbol, payload) pair

PS: More generally, is my approach optimal?

Solution

The following approach should be workable assuming:

  1. your Handler class can be pickled (a quick check is sketched below), and
  2. the Handler class does not carry so much state that the cost of serializing it to and from each worker invocation becomes prohibitive.
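
A minimal way to sanity-check the first assumption, using the Handler class from the demo code further down (this snippet is illustrative only, not part of the solution):

import pickle

# Round-trip a handler through pickle; pickle.dumps raises if Handler
# (or anything it references) cannot be pickled:
h = pickle.loads(pickle.dumps(Handler('ABC')))
assert h.symbol == 'ABC'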

The main process creates a handlers dictionary whose keys are the 52 symbols and whose values are themselves dictionaries with two keys: 'handler', whose value is the handler for the symbol, and 'processing', whose value is True or False according to whether a pool process is currently processing one or more payloads for that symbol.

Each process in the pool is initialized with another dictionary, queue_dict, whose keys are the 52 symbols and whose values are multiprocessing.Queue instances that will hold the payload instances to be processed for that symbol.
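
A note on why the queues are handed over via the pool initializer rather than as task arguments: a multiprocessing.Queue cannot be pickled into a task submitted to a pool; it can only reach a worker by inheritance at process start-up. A small sketch of the distinction (separate from the solution code below):

from multiprocessing import Pool, Queue

def init_pool(q):
    # Runs once in each pool process at start-up; the queue arrives by
    # inheritance, which is the supported way to share it:
    global queue
    queue = q

def drain_one():
    return queue.get()

if __name__ == "__main__":
    q = Queue()
    pool = Pool(2, initializer=init_pool, initargs=(q,))  # OK: inherited
    q.put('hello')
    print(pool.apply(drain_one))  # -> 'hello'
    # Passing q as a task argument instead would fail: "Queue objects should
    # only be shared between processes through inheritance"
    pool.close()
    pool.join()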

The main process iterates over each line of input to get the next symbol/payload pair. The payload is enqueued on the appropriate queue for the current symbol. The handlers dictionary is consulted to check the 'processing' flag for the current symbol, which indicates whether a task has already been submitted to the processing pool to run the symbol-specific handler. If this flag is True, nothing further needs to be done. Otherwise, the 'processing' flag is set to True and apply_async is invoked, passing the handler for this symbol as an argument.

A count of enqueued tasks (i.e. payloads) is maintained, and it is incremented every time the main task writes a payload to one of the 52 handler queues. The worker function specified as the argument to apply_async takes its handler argument and from it deduces the queue that needs servicing. For every payload it finds on the queue, it invokes the handler's feed method. It then returns a tuple of the updated handler and a count of the payload messages removed from the queue. The callback function for the apply_async method (1) updates the handler in the handlers dictionary and (2) resets the 'processing' flag for the appropriate symbol to False. Finally, it decrements the count of enqueued tasks by the number of payload messages that were removed.

There is a small window in which the main process, having enqueued a payload, checks whether a process is currently running the handler for this symbol, sees that the 'processing' flag is True, and on that basis does not submit a new task via apply_async, while that worker has in fact already finished processing every payload on its queue and is about to return, or has already returned but its callback has not yet reset the 'processing' flag to False. In that scenario the payload sits unprocessed on its queue until the next payload for that symbol is read from the input and processed. But if there are no further input lines for that symbol, then when all tasks have completed we will be left with unprocessed payloads. We will, however, also be left with a non-zero count of enqueued tasks, which tells us this situation has occurred. So rather than trying to implement a complicated multiprocessing synchronization protocol, it is simpler to detect the situation and handle it by re-creating a new pool and checking each of the 52 queues.
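
The core mechanism this all builds on, and the direct answer to the question as asked, is Pool.apply_async with a callback; stripped of everything else, the pattern looks like this:

from multiprocessing import Pool

def job(x):
    return x * x  # executes on some CPU core in a worker process

def on_complete(result):
    # executes in a thread of the *main* process once job() has finished:
    print('got', result)

if __name__ == "__main__":
    with Pool(4) as pool:
        async_result = pool.apply_async(job, args=(9,), callback=on_complete)
        async_result.wait()  # the callback has run by the time this returns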

The complete demo code, which finishes by printing each symbol with its handler's final counter:

from multiprocessing import Pool, Queue
import time
from queue import Empty
from threading import Lock

# This class needs to be pickle-able:
class Handler:
    def __init__(self, symbol):
        self.symbol = symbol
        self.counter = 0

    def feed(self, payload):
        # For testing just increment counter by payload:
        self.counter += payload


def init_pool(the_queue_dict):
    # Runs once in each pool process; makes the per-symbol queues
    # available to the worker function as a global:
    global queue_dict
    queue_dict = the_queue_dict


def worker(handler):
    symbol = handler.symbol
    q = queue_dict[symbol]
    tasks_removed = 0
    # Drain everything currently queued for this symbol:
    while True:
        try:
            payload = q.get_nowait()
            handler.feed(payload)
            tasks_removed += 1
        except Empty:
            break
    # Return the updated handler and how many payloads were consumed:
    return handler, tasks_removed

def callback_result(result):
    # Runs in a thread of the main process when a task completes.
    global queued_tasks
    global lock

    handler, tasks_removed = result
    # Show we are done processing this symbol by updating handler state:
    d = handlers[handler.symbol]
    # The order of the next two statements matters:
    d['handler'] = handler
    d['processing'] = False
    with lock:
        queued_tasks -= tasks_removed

def main():
    global handlers
    global lock
    global queued_tasks

    symbols = [
        'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M',
        'AA', 'BB', 'CC', 'DD', 'EE', 'FF', 'GG', 'HH', 'II', 'JJ', 'KK', 'LL', 'MM',
        'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm',
        'aa', 'bb', 'cc', 'dd', 'ee', 'ff', 'gg', 'hh', 'ii', 'jj', 'kk', 'll', 'mm'
    ]

    queue_dict = {symbol: Queue() for symbol in symbols}

    handlers = {symbol: {'processing': False, 'handler': Handler(symbol)} for symbol in symbols}

    # Test data: a constant payload of 1 for every symbol:
    lines = [(symbol, 1) for symbol in symbols]


    def get_lines():
        # Emulate 520_000 lines (10_000 passes over the 52 symbols):
        for _ in range(10_000):
            for line in lines:
                yield line

    POOL_SIZE = 4

    queued_tasks = 0
    lock = Lock()

    # Create pool of POOL_SIZE processes:
    pool = Pool(POOL_SIZE, initializer=init_pool, initargs=(queue_dict,))
    for symbol, payload in get_lines():
        # Put some limit on memory utilization:
        while queued_tasks > 10_000:
            time.sleep(.001)
        d = handlers[symbol]
        q = queue_dict[symbol]
        q.put(payload)
        with lock:
            queued_tasks += 1
        if not d['processing']:
            d['processing'] = True
            handler = d['handler']
            pool.apply_async(worker, args=(handler,), callback=callback_result)
    # Wait for all tasks to complete
    pool.close()
    pool.join()

    if queued_tasks:
        # Some payloads were enqueued after their worker had already
        # drained its queue; re-create the pool and drain every queue:
        pool = Pool(POOL_SIZE, initializer=init_pool, initargs=(queue_dict,))
        for d in handlers.values():
            handler = d['handler']
            d['processing'] = True
            pool.apply_async(worker, args=(handler,), callback=callback_result)
        pool.close()
        pool.join()
        assert queued_tasks == 0

    # Print results:
    for d in handlers.values():
        handler = d['handler']
        print(handler.symbol, handler.counter)


if __name__ == "__main__":
    main()
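
With the test data above (each of the 52 symbols fed a payload of 1, 10_000 times), every line printed at the end should show a counter of 10000.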

This is far from the only (or even perhaps the "best") way to do it, but based on my comment on your other post, here is an example of having specific child processes handle specific "symbols":

from multiprocessing import Process, Queue
from queue import Empty

class STOPFLAG: pass

class Handler:
    def __init__(self, symbol):
        self.counter = 0  # maintain some state for each "Handler"
        self.symbol = symbol

    def feed(self, payload):
        self.counter += payload
        return self.counter

class Worker(Process):
    def __init__(self, out_q):
        self.handlers = {}
        self.in_q = Queue()  # each worker gets its own input queue
        self.out_q = out_q
        super().__init__()

    def run(self):
        while True:
            try:
                symbol = self.in_q.get(timeout=1)
            except Empty:
                pass  # put break here if you always expect symbols to be available and a timeout "shouldn't" happen
            else:
                if isinstance(symbol, STOPFLAG):
                    # pass back the handlers with their now modified state
                    self.out_q.put(self.handlers)
                    break
                else:
                    self.handlers[symbol[0]].feed(symbol[1])

def main():
    n_workers = 4
    # Just 8 symbols for testing:
    symbols = ['ABC', 'DEF', 'GHI', 'JKL', 'MNO', 'PQR', 'STU', 'VWX']

    workers = []
    out_q = Queue()  # single shared output queue back to the main process
    for i in range(n_workers):
        workers.append(Worker(out_q))

    # Assign each symbol's Handler to a worker round-robin, and record
    # which worker owns which symbol:
    symbol_worker_mapping = {}
    for i, symbol in enumerate(symbols):
        workers[i % n_workers].handlers[symbol] = Handler(symbol)
        symbol_worker_mapping[symbol] = i % n_workers

    for worker in workers: worker.start()  # start processes

    # Just a few for testing (payload values are placeholders):
    lines = [
        ('ABC', 1), ('DEF', 1), ('GHI', 1), ('JKL', 1),
        ('MNO', 1), ('PQR', 1), ('STU', 1), ('VWX', 1), ('ABC', 1),
    ]
    # Putting this loop in a thread could allow results to be collected while inputs are still being fed in.
    for symbol, payload in lines:  # feed in tasks
        worker = workers[symbol_worker_mapping[symbol]]  # select the correct worker
        worker.in_q.put([symbol, payload])  # pass the inputs

    results = []  # results are handler dicts from each worker
    for worker in workers:
        worker.in_q.put(STOPFLAG())  # send stop signal to each worker
        results.append(out_q.get())  # get results (may be out of order)

    for worker in workers: worker.join()  # cleanup
    for result in results:
        for symbol, handler in result.items():
            print(symbol, handler.counter)


if __name__ == "__main__":
    main()

Each child process handles a subset of the "symbols", and each child has its own input queue. This differs from a normal pool, where every child is identical and they all share a single input queue, with the next available child always taking the next input. The children all put their results onto a shared output queue back to the main process.
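
For contrast, a minimal sketch of that "normal pool" pattern, where identical children share one input queue (illustrative only; the doubling worker is made up for the example):

from multiprocessing import Pool

def work(item):
    # Stateless: any child may take any item, so no per-symbol affinity:
    symbol, payload = item
    return symbol, payload * 2

if __name__ == "__main__":
    lines = [('ABC', 1), ('DEF', 2), ('GHI', 3)]
    with Pool(4) as pool:
        # All children pull from one shared task queue; the next free child
        # takes the next item, and results may come back out of order:
        for symbol, doubled in pool.imap_unordered(work, lines):
            print(symbol, doubled)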

An entirely different solution could be to keep all of the state in the main process, maintain a lock for each symbol, and hold a symbol's lock from the moment its state is sent to a worker process until the result comes back and the main process's copy has been updated.
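
That idea is only outlined above, so here is a minimal sketch of one way it could look; the names (work, on_done) and the per-symbol threading.Lock discipline are assumptions of the sketch, not part of the answer:

from multiprocessing import Pool
from threading import Lock

def work(symbol, state, payload):
    # Runs in a worker process on a *copy* of the symbol's state:
    state['counter'] += payload
    return symbol, state

def main():
    symbols = ['ABC', 'DEF']
    states = {s: {'counter': 0} for s in symbols}   # all state stays here
    locks = {s: Lock() for s in symbols}            # one lock per symbol

    def on_done(result):
        # apply_async callbacks run on a thread of the main process:
        symbol, new_state = result
        states[symbol] = new_state    # update the master copy...
        locks[symbol].release()       # ...then allow the symbol to be dispatched again

    with Pool(4) as pool:
        pending = []
        for symbol, payload in [('ABC', 1), ('DEF', 2), ('ABC', 3)]:
            locks[symbol].acquire()   # blocks if this symbol is already in flight
            pending.append(pool.apply_async(work, (symbol, states[symbol], payload),
                                            callback=on_done))
        for r in pending:
            r.wait()

    for s in symbols:
        print(s, states[s]['counter'])  # ABC 4, DEF 2

if __name__ == "__main__":
    main()

Because the main loop blocks on the symbol's lock before dispatching it again, payloads for a given symbol are still processed in order, and no two workers ever hold the same symbol's state at once.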