问题描述
上下文
我正在接收一个流:
public static class AutoMapperExtensions
{
public static TDestination Map<TDestination>(this IMapper mapper,params object[] source) where TDestination : class
{
TDestination destination = mapper.Map<TDestination>(source.FirstOrDefault());
foreach (var src in source.Skip(1))
destination = mapper.Map(src,destination);
return destination;
}
public static TDestination Map<TDestination>(this IMapper mapper,TDestination destination,params object[] source) where TDestination : class
{
foreach (var src in source)
destination = mapper.Map(src,destination);
return destination;
}
}
symbols = ['ABC','DFG',...] # 52 of these
handlers = { symbol: Handler(symbol) for symbol in symbols }
async for symbol,payload in lines: # 600M of these
handlers[symbol].Feed(payload)
(例如)持有状态,但它与(例如)handler['ABC']
基本上我不能同时运行 2 个内核,例如handler['DFG']
。
我目前的做法
我想出了以下解决方案,但它是伪代码的一部分,因为我看不到如何实现它。
handler['ABC']
所以我的问题是:如何将最后一条语句转换为可用的 Python 代码?
NCORES = 4
symbol_curr_active_on_core = [None]*NCORES
NO_CORES_FREE = -1
def first_free_core():
for i,symbol in enumerate(symbol_curr_active_on_core):
if not symbol:
return i
return NO_CORES_FREE
for symbol,payload in lines:
# wait for avail core to handle it
while True:
sleep(0.001)
if first_free_core() == NO_CORES_FREE:
continue
if symbol in symbol_curr_active_on_core:
continue
core = first_free_core()
symbol_curr_active_on_core[core] = symbol
cores[core].execute(
processor[symbol].Feed(payload),on_complete=lambda core_index: \
symbol_curr_active_on_core[core_index] = None
)
PS 更一般地说,我的方法是最佳的吗?
解决方法
假设以下方法是可行的:
- 您的
Handler
类可以“腌制”和 -
Handler
类没有携带如此多的状态信息,因此其与每个工作器调用之间的序列化成本高得令人望而却步。
主进程创建一个 handlers
字典,其中键是 52 个符号之一,值是一个具有两个键的字典:'handler' 的值是符号的处理程序,而 'processing' 的值是是 True
或 False
,具体取决于进程当前是否正在处理该符号的一个或多个负载。
池中的每个进程都使用另一个 queue_dict
字典初始化,该字典的键是 52 个符号之一,其值是一个 multiprocessing.Queue
实例,该实例将保存要为该符号处理的有效负载实例。
主进程迭代输入的每一行以获得下一个符号/有效载荷对。有效载荷排队到当前符号的适当队列中。通过检查当前交易品种的 handlers
标志,访问 processing
字典以确定任务是否已被排入处理池以处理当前交易品种的特定于交易品种的处理程序。如果此标志为 True
,则无需进一步操作。否则,processing
标志设置为 True
并调用 apply_async
,并将此符号的处理程序作为参数传递。
排队任务(即有效载荷)的计数得到维护,并且每次主任务将有效载荷写入 52 个处理程序队列之一时都会增加。指定为 apply_async
参数的工作函数采用其 handler 参数,并从中推断出需要处理的队列。对于它在队列中找到的每个有效负载,它都会调用处理程序的 feed
方法。然后它返回一个由更新的处理程序和从队列中删除的有效负载消息数组成的元组。 apply_async
方法的回调函数 (1) 更新 handlers
字典中的处理程序并 (2) 将相应符号的 processing
标志重置为 False
。最后,它根据已删除的有效负载消息的数量减少排队任务的数量。
当主进程在将负载入队后检查当前是否有进程正在运行此符号的处理程序并看到 processing
标志为 True
并且在此基础上不提交新任务通过 apply_async
,有一个小窗口,该工作人员已经完成处理其队列中的所有有效负载并且即将返回或已经返回并且回调函数尚未设置 {{1 }} 标记为 processing
。在这种情况下,有效载荷将在队列中处于未处理状态,直到从输入中读取并处理该符号的下一个有效载荷。但是如果该符号没有进一步的输入行,那么当所有任务完成后,我们将有未处理的有效载荷。但是我们也会有一个非零的排队任务计数,这表明我们遇到了这种情况。因此,与其尝试实现复杂的多处理同步协议,不如通过重新创建一个新池并检查 52 个队列中的每一个来检测这种情况并对其进行处理。
False
打印:
from multiprocessing import Pool,Queue
import time
from queue import Empty
from threading import Lock
# This class needs to be Pickle-able:
class Handler:
def __init__(self,symbol):
self.symbol = symbol
self.counter = 0
def feed(self,payload):
# For testing just increment counter by payload:
self.counter += payload
def init_pool(the_queue_dict):
global queue_dict
queue_dict = the_queue_dict
def worker(handler):
symbol = handler.symbol
q = queue_dict[symbol]
tasks_removed = 0
while True:
try:
payload = q.get_nowait()
handler.feed(payload)
tasks_removed += 1
except Empty:
break
# return updated handler:
return handler,tasks_removed
def callback_result(result):
global queued_tasks
global lock
handler,tasks_removed = result
# show done processing this symbol by updating handler state:
d = handlers[handler.symbol]
# The order of the next two statements matter:
d['handler'] = handler
d['processing'] = False
with lock:
queued_tasks -= tasks_removed
def main():
global handlers
global lock
global queued_tasks
symbols = [
'A','B','C','D','E','F','G','H','I','J','K','L','M','AA','BB','CC','DD','EE','FF','GG','HH','II','JJ','KK','LL','MM','a','b','c','d','e','f','g','h','i','j','k','l','m','aa','bb','cc','dd','ee','ff','gg','hh','ii','jj','kk','ll','mm'
]
queue_dict = {symbol: Queue() for symbol in symbols}
handlers = {symbol: {'processing': False,'handler': Handler(symbol)} for symbol in symbols}
lines = [
('A',1),('B',('C',('D',('E',('F',('G',('H',('I',('J',('K',('L',('M',('AA',('BB',('CC',('DD',('EE',('FF',('GG',('HH',('II',('JJ',('KK',('LL',('MM',('a',('b',('c',('d',('e',('f',('g',('h',('i',('j',('k',('l',('m',('aa',('bb',('cc',('dd',('ee',('ff',('gg',('hh',('ii',('jj',('kk',('ll',('mm',1)
]
def get_lines():
# Emulate 52_000 lines:
for _ in range(10_000):
for line in lines:
yield line
POOL_SIZE = 4
queued_tasks = 0
lock = Lock()
# Create pool of POOL_SIZE processes:
pool = Pool(POOL_SIZE,initializer=init_pool,initargs=(queue_dict,))
for symbol,payload in get_lines():
# Put some limit on memory utilization:
while queued_tasks > 10_000:
time.sleep(.001)
d = handlers[symbol]
q = queue_dict[symbol]
q.put(payload)
with lock:
queued_tasks += 1
if not d['processing']:
d['processing'] = True
handler = d['handler']
pool.apply_async(worker,args=(handler,),callback=callback_result)
# Wait for all tasks to complete
pool.close()
pool.join()
if queued_tasks:
# Re-create pool:
pool = Pool(POOL_SIZE,))
for d in handlers.values():
handler = d['handler']
d['processing'] = True
pool.apply_async(worker,callback=callback_result)
pool.close()
pool.join()
assert queued_tasks == 0
# Print results:
for d in handlers.values():
handler = d['handler']
print(handler.symbol,handler.counter)
if __name__ == "__main__":
main()
,
这远非唯一(甚至可能是“最佳”)方法,但根据我对您另一篇文章的评论,这里有一个让特定子进程处理特定“符号”的示例
from multiprocessing import Process,Queue
from queue import Empty
from math import ceil
class STOPFLAG: pass
class Handler:
def __init__(self,symbol):
self.counter = 0 #maintain some state for each "Handler"
self.symbol = symbol
def feed(self,payload):
self.counter += payload
return self.counter
class Worker(Process):
def __init__(self,out_q):
self.handlers = {}
self.in_q = Queue()
self.out_q = out_q
super().__init__()
def run(self):
while True:
try:
symbol = self.in_q.get(1)
except Empty:
pass #put break here if you always expect symbols to be available and a timeout "shouldn't" happen
else:
if isinstance(symbol,STOPFLAG):
#pass back the handlers with their now modified state
self.out_q.put(self.handlers)
break
else:
self.handlers[symbol[0]].feed(symbol[1])
def main():
n_workers = 4
# Just 8 for testing:
symbols = ['ABC','DEF','GHI','JKL','MNO','PQR','STU','VWX']
workers = []
out_q = Queue()
for i in range(n_workers):
workers.append(Worker(out_q))
symbol_worker_mapping = {}
for i,symbol in enumerate(symbols):
workers[i%n_workers].handlers[symbol] = Handler(symbol)
symbol_worker_mapping[symbol] = i%n_workers
for worker in workers: worker.start() #start processes
# Just a few for testing:
lines = [
('ABC',('DEF',('GHI',('JKL',('MNO',('PQR',('STU',('VWX',('ABC',]
#putting this loop in a thread could allow results to be collected while inputs are still being fed in.
for symbol,payload in lines: #feed in tasks
worker = workers[symbol_worker_mapping[symbol]] #select the correct worker
worker.in_q.put([symbol,payload]) #pass the inputs
results = [] #results are handler dicts from each worker
for worker in workers:
worker.in_q.put(STOPFLAG()) #Send stop signal to each worker
results.append(out_q.get()) #get results (may be out of order)
for worker in workers: worker.join() #cleanup
for result in results:
for symbol,handler in result.items():
print(symbol,handler.counter)
if __name__ == "__main__":
main()
每个子进程处理“符号”的一个子集,每个子进程都有自己的输入队列。这与正常的 pool
不同,其中每个孩子都是相同的,它们都共享一个输入队列,其中下一个可用的孩子总是接受下一个输入。然后,它们都将结果放入共享输出队列,返回到主进程。
一个完全不同的解决方案可能是在主进程中保持所有状态,为每个符号维护一个锁,并在将必要状态发送给工作进程时保持锁,直到收到结果,以及主进程更新。