Python中的多处理和多线程

问题描述

我有一个Python程序,该程序1)从磁盘中读取一个非常大的文件(大约95%的时间),然后2)处理并提供一个相对较小的输出(大约5%的时间)。该程序将在teraBytes的文件上运行。

现在,我正在寻求利用多处理和多线程优化此程序。我正在运行的平台是一个在虚拟机上具有4个处理器的虚拟机。

我计划有一个调度程序Process,它将执行4个Process(与处理器相同),然后每个Process应该都有一些线程,因为大部分是I / O。每个线程将处理1个文件,并将结果报告给主线程,而主线程又将通过IPC将其报告回调度程序Process。调度程序可以将它们排入队列,并最终以有序的方式将其写入磁盘

所以想知道如何确定要为这种情况创建的进程和线程数?有什么数学方法可以找出最佳组合。

谢谢

解决方法

我想我会将它与您正在做的事情相反。也就是说,我将创建一定大小的线程池,该线程池负责产生结果。提交给该池的任务将作为处理器池的参数传递,该处理器池可由工作线程用于提交工作的CPU约束部分。换句话说,线程池工作人员将主要执行所有与磁盘相关的操作,并将所有占用大量CPU的工作移交给处理器池。

处理器池的大小应仅为您环境中拥有的处理器数。很难为线程池提供精确的大小。它取决于收益递减规律发挥作用之前可以处理多少个并发磁盘操作。它还取决于您的内存:池越大,占用的内存资源就越大,尤其是在必须将整个文件读入内存进行处理的情况下。因此,您可能必须尝试使用​​此值。下面的代码概述了这些想法。从线程池中获得的收益是I / O操作的重叠,这比仅使用小型处理器池所获得的重叠更大:

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
from functools import partial
import os

def cpu_bound_function(arg1,arg2):
    ...
    return some_result



def io_bound_function(process_pool_executor,file_name):
    with open(file_name,'r') as f:
        # Do disk related operations:
        . . . # code omitted
        # Now we have to do a CPU-intensive operation:
        future = process_pool_executor.submit(cpu_bound_function,arg1,arg2)
        result = future.result() # get result
        return result
    
file_list = [file_1,file_2,file_n]
N_FILES = len(file_list)
MAX_THREADS = 50 # depends on your configuration on how well the I/O can be overlapped
N_THREADS = min(N_FILES,MAX_THREADS) # no point in creating more threds than required
N_PROCESSES = os.cpu_count() # use the number of processors you have

with ThreadPoolExecutor(N_THREADS) as thread_pool_executor:
    with ProcessPoolExecutor(N_PROCESSES) as process_pool_executor:
        results = thread_pool_executor.map(partial(io_bound_function,process_pool_executor),file_list)

重要提示

另一种更简单的方法是只拥有一个处理器池,其大小比您拥有的CPU处理器数(例如25)大 。工作进程将同时执行I / O操作和CPU操作。即使您的进程比CPU多,但许多进程仍处于等待状态,等待I / O完成,从而允许运行CPU密集型工作。

此方法的缺点是创建N个进程的开销远远大于创建N个线程+少量进程的开销。但是,随着提交给池的任务的运行时间变得越来越大,则这种增加的开销逐渐减少了占总运行时间的百分比。因此,如果您的任务不那么琐碎,那么这可能是一个合理的简化。

更新:两种方法的基准

我对两种处理24个文件的基准进行了一些基准测试,这些文件的大小约为10,000KB(实际上,这是3个不同的文件,每个文件处理了8次,因此可能已经进行了一些缓存):

方法1(线程池+处理器池)

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
from functools import partial
import os
from math import sqrt
import timeit


def cpu_bound_function(b):
    sum = 0.0
    for x in b:
        sum += sqrt(float(x))
    return sum

def io_bound_function(process_pool_executor,'rb') as f:
        b = f.read()
        future = process_pool_executor.submit(cpu_bound_function,b)
        result = future.result() # get result
        return result

def main():
    file_list = ['/download/httpd-2.4.16-win32-VC14.zip'] * 8 + ['/download/curlmanager-1.0.6-x64.exe'] * 8 + ['/download/Element_v2.8.0_UserManual_RevA.pdf'] * 8
    N_FILES = len(file_list)
    MAX_THREADS = 50 # depends on your configuration on how well the I/O can be overlapped
    N_THREADS = min(N_FILES,MAX_THREADS) # no point in creating more threds than required
    N_PROCESSES = os.cpu_count() # use the number of processors you have

    with ThreadPoolExecutor(N_THREADS) as thread_pool_executor:
        with ProcessPoolExecutor(N_PROCESSES) as process_pool_executor:
            results = list(thread_pool_executor.map(partial(io_bound_function,file_list))
            print(results)

if __name__ == '__main__':
    print(timeit.timeit(stmt='main()',number=1,globals=globals()))

方法2(仅处理器池)

from concurrent.futures import ProcessPoolExecutor
from math import sqrt
import timeit


def cpu_bound_function(b):
    sum = 0.0
    for x in b:
        sum += sqrt(float(x))
    return sum

def io_bound_function(file_name):
    with open(file_name,'rb') as f:
        b = f.read()
        result = cpu_bound_function(b)
        return result

def main():
    file_list = ['/download/httpd-2.4.16-win32-VC14.zip'] * 8 + ['/download/curlmanager-1.0.6-x64.exe'] * 8 + ['/download/Element_v2.8.0_UserManual_RevA.pdf'] * 8
    N_FILES = len(file_list)
    MAX_PROCESSES = 50 # depends on your configuration on how well the I/O can be overlapped
    N_PROCESSES = min(N_FILES,MAX_PROCESSES) # no point in creating more threds than required

    with ProcessPoolExecutor(N_PROCESSES) as process_pool_executor:
        results = list(process_pool_executor.map(io_bound_function,file_list))
        print(results)

if __name__ == '__main__':
    print(timeit.timeit(stmt='main()',globals=globals()))

结果:

(我有8个核心)

线程池+处理器池:13.5秒 仅处理器池:13.3秒

结论:我将首先尝试使用更简单的方法,即仅对所有内容使用处理器池。现在,最棘手的事情是确定要创建的最大进程数,这是您最初的问题的一部分,并且在做的所有事情都是CPU密集型计算时有一个简单的答案。如果您正在读取的文件数量不是太多,那很重要。每个文件可以有一个进程。但是,如果您有数百个文件,则您不希望池中有数百个进程(可以创建的进程数也有上限,并且同样存在那些讨厌的内存限制)。我无法给您确切的电话号码。如果确实有大量文件,请从较小的池大小开始,并不断增加,直到没有其他好处为止(当然,对于这些测试,您可能不希望处理的文件数量超过某个最大数目,否则您将永远运行只是为实际运行确定合适的池大小)。

,

对于并行处理: 我看到了this question,并引用了已接受的答案:

在实践中,可能很难找到最佳线程数,即使每次运行该程序,该线程数也可能会有所不同。因此,从理论上讲,最佳线程数将是计算机上具有的内核数。如果您的内核是“超线程”(如英特尔所说),则每个内核可以运行2个线程。然后,在这种情况下,最佳线程数是计算机上内核数的两倍。

对于多处理: 有人问了类似的问题here,被接受的答案是这样的:

如果所有线程/进程确实确实受CPU约束,则您应运行与CPU报告核心数量一样多的进程。由于超线程,每个物理CPU内核可能能够呈现多个虚拟内核。调用multiprocessing.cpu_count以获取虚拟核的数量。

如果只有1个线程中的p个受CPU限制,则可以乘以p来调整该数字。例如,如果一半的进程受CPU限制(p = 0.5),并且您有两个CPU,每个CPU具有4个内核,并且具有2个HyperThreading,则应该启动0.5 * 2 * 4 * 2 = 8个进程。

这里的关键是了解您正在使用的计算机,从中,您可以选择接近最佳数量的线程/进程来拆分代码的执行。我说几乎是最佳,因为每次运行脚本时,它都会略有不同,因此很难从数学的角度预测这个最佳数字。

对于您的特定情况,如果您的计算机具有4个内核,我建议您最多只创建4个线程,然后拆分它们:

  • 1进入主线程。
  • 3用于文件读取和处理。
,

使用多个进程来提高IO性能可能不是一个好主意,请查看下面的this和下面的sample code以查看是否有帮助

,

一个想法可以是让一个线程只读取文件(如果我很好理解,只有一个文件),并将独立的部分(例如行)推送到带有消息的队列中。
消息可以由4个线程处理。这样,您可以优化处理器之间的负载。

,

在受I / O约束的过程中(如您所描述的那样),您不一定需要多线程或多处理:您还可以在OS中使用更高级的I / O原语。

例如,在Linux上,您可以将读取请求与适当大小的可变缓冲区一起提交给内核,并在缓冲区被填满时得到通知。这可以使用AIO API完成,为此我编写了一个纯Python绑定:python-libaio(在pypi上为libaio),或者使用最新的io_uring API为此,似乎有一个CFFI python binding(在pypy上是liburing)(我既没有使用io_uring也没有使用此python绑定)。

这消除了您级别的并行处理的复杂性,可能减少了OS /用户环境上下文切换的数量(甚至进一步减少了CPU时间),并让OS知道您要做什么,从而为OS提供了更多信息。更有效率地调度IO的机会(在虚拟环境中,即使我自己没有尝试过,如果它减少了数据副本的数量,我也不会感到惊讶。

当然,缺点是您的程序将与在其上执行该程序的操作系统更紧密地绑定在一起,需要更多的精力才能使其在另一个程序上运行。