为什么这个python多处理脚本在一段时间后会变慢?

script from this answer的基础上,我有以下场景:一个包含2500个大文本文件的文件夹(每个约55Mb),所有制表符分隔. Web日志,基本上.

我需要md5哈希每个文件的每一行中的第二个’列’,将修改后的文件保存在别处.源文件位于机械磁盘上​​,目标文件位于SSD上.

该脚本非常快速地处理前25个(左右)文件.然后它减慢了WAY.基于前25个文件,它应该在2分钟左右完成所有文件.但是,根据之后的表现,完成它们需要15分钟(左右).

它运行在具有32 Gb RAM的服务器上,任务管理器很少显示超过6 Gb的使用情况.我已经设置了启动6个进程,但核心上的CPU使用率很低,很少超过15%.

为什么这放慢了?读/写磁盘问题?垃圾收集器?坏代码?有关如何加快速度的任何想法?

这是脚本

import os

import multiprocessing
from multiprocessing import Process
import threading
import hashlib

class ThreadRunner(threading.Thread):
    """ This class represents a single instance of a running thread"""
    def __init__(self,fileset,filedirectory):
        threading.Thread.__init__(self)
        self.files_to_process = fileset
        self.filedir          = filedirectory

    def run(self):
        for current_file in self.files_to_process:

            # Open the current file as read only
            active_file_name = self.filedir + "/" + current_file
            output_file_name = "D:/hashed_data/" + "hashed_" + current_file

            active_file = open(active_file_name,"r")
            output_file = open(output_file_name,"ab+")

            for line in active_file:
                # Load the line,hash the username,save the line
                lineList = line.split("\t")

                if not lineList[1] == "-":
                    lineList[1] = hashlib.md5(lineList[1]).hexdigest()

                lineOut = '\t'.join(lineList)
                output_file.write(lineOut)

            # Always close files after you open them
            active_file.close()
            output_file.close()

            print "\nCompleted " + current_file

class ProcessRunner:
    """ This class represents a single instance of a running process """
    def runp(self,pid,numThreads,filedirectory):
        mythreads = []
        for tid in range(numThreads):
            th = ThreadRunner(fileset,filedirectory)
            mythreads.append(th) 
        for i in mythreads:
            i.start()
        for i in mythreads:
            i.join()

class ParallelExtractor:    
    def runInParallel(self,numProcesses,filedirectory):
        myprocs = []
        prunner = ProcessRunner()

        # Store the file names from that directory in a list that we can iterate
        file_names = os.listdir(filedirectory)

        file_sets = []
        for i in range(numProcesses):
            file_sets.append([])

        for index,name in enumerate(file_names):
            num = index % numProcesses
            file_sets[num].append(name)


        for pid in range(numProcesses):
            pr = Process(target=prunner.runp,args=(pid,file_sets[pid],filedirectory)) 
            myprocs.append(pr) 
        for i in myprocs:
            i.start()

        for i in myprocs:
            i.join()

if __name__ == '__main__':    

    file_directory = "E:/original_data"

    processes = 6
    threads   = 1

    extractor = ParallelExtractor()
    extractor.runInParallel(numProcesses=processes,numThreads=threads,filedirectory=file_directory)
最佳答案
散列是一项相对简单的任务,与旋转磁盘的速度相比,现代CPU速度非常快. i7上的快速基础测试显示,它可以使用MD5散列大约450 MB / s,使用SHA-1散布大约290 MB / s.相比之下,旋转盘具有约70-150MB / s的典型(顺序原始读取)速度.这意味着,即使忽略文件系统的开销和最终的磁盘搜索,CPU也可以将文件散列大约比磁盘读取速度快3倍.

处理第一个文件时可能会提高性能,因为操作系统会将第一个文件缓存在内存中,因此不会发生磁盘I / O.这可以通过以下任一方式确认:

>重新启动服务器,从而刷新缓存
>通过从磁盘读取足够大的文件,用其他东西填充缓存
>在处理第一个文件时仔细监听磁盘搜索的缺失

现在,由于散列文件的性能瓶颈是磁盘,因此在多个进程或线程中执行散列是没用的,因为它们都使用相同的磁盘.正如@Max Noel所提到的,它实际上可以降低性能,因为您将并行读取多个文件,因此您的磁盘必须在文件之间进行搜索.正如他所提到的,性能也将根据您正在使用的操作系统的I / O调度程序而有所不同.

现在,如果您仍在生成数据,那么您有一些可能的解决方案:

>使用更快的磁盘或SSD,如@Max Noel建议的那样.
>从多个磁盘读取 – 在不同的文件系统中或在RAID上的单个文件系统中读取
>在多台计算机上拆分任务(每台计算机有一个或多个磁盘)

但是,如果你想要做的就是散列这2500个文件并且你已经将它们放在一个磁盘上,那么这些解决方案就毫无用处.将它们从磁盘读取到其他磁盘然后执行散列更慢,因为您将读取文件两次,并且您可以尽可能快地读取它们.

最后,根据@yaccz的想法,如果安装了find,xargs和md5sum的cygwin二进制文件,我想你可以避免编写程序执行散列的麻烦.

相关文章

Python中的函数(二) 在上一篇文章中提到了Python中函数的定...
Python中的字符串 可能大多数人在学习C语言的时候,最先接触...
Python 面向对象编程(一) 虽然Python是解释性语言,但是它...
Python面向对象编程(二) 在前面一篇文章中谈到了类的基本定...
Python中的函数(一) 接触过C语言的朋友对函数这个词肯定非...
在windows下如何快速搭建web.py开发框架 用Python进行web开发...