多线程Python FS Crawler

我编写了一个 python函数,它使用提供的目录模式搜索文件系统,并在每个级别提供可选的“操作”.然后我尝试多线程,因为有些卷在网络共享上,我想最小化IO阻塞.我开始使用多处理池类,因为这是最方便的…(严重的是,没有用于线程的Pool类?)我的函数尽可能地解析提供的FS模式并将新返回的路径提交到池直到没有新的路径被退回.当我直接使用函数和类时,我得到了很好的工作,但现在我试图从另一个类使用这个函数,我的程序似乎挂起.为了简化我使用Threads而不是Processes重写了函数,甚至编写了一个简单的ThreadPool类……同样的问题.这是一个非常简化的代码版本,仍然会出现同样的问题:

file test1.py:
------------------------------------------------

import os
import glob
from multiprocessing import Pool

def mapGlob(pool,paths,pattern):
    results = []
    paths = [os.path.join(p,pattern) for p in paths]
    for result in pool.map(glob.glob,paths):
        results += result
    return results

def findAllMyPaths():
    pool = Pool(10)
    paths = ['/Volumes']
    follow = ['**','ptid_*','expid_*','slkid_*']
    for pattern in follow:
        paths = mapGlob(pool,pattern)
    return paths


file test2.py:
----------------------------------------------------------------------------

from test1 import findAllMyPaths

allmypaths = findAllMyPaths()

现在,如果我打电话

>>>from test1 import findAllMyPaths
>>>findAllMyPaths()
>>>...long list of all the paths

这工作正常,但如果尝试:

>>>from test2 import allmypaths

python永远挂起.调用动作函数(在本例中为glob),但它们似乎永远不会返回…
我需要帮助…平行版本在正常工作时运行得快得多(根据FS树中每个点的’动作’动作快6-20倍),所以我希望能够使用它.

如果我将映射函数更改为非并行版本:

def mapGlob(pool,pattern) for p in paths]
    for path in paths:
        results += glob.glob(path)
    return results

一切顺利.

编辑:

我打开多处理调试,看看是否可以帮助我.
在它工作的情况下,我得到:

[DEBUG/MainProcess] created semlock with handle 5
[DEBUG/MainProcess] created semlock with handle 6
[DEBUG/MainProcess] created semlock with handle 9
[DEBUG/MainProcess] created semlock with handle 10
[INFO/PoolWorker-1] child process calling self.run()
[INFO/PoolWorker-2] child process calling self.run()
[INFO/PoolWorker-3] child process calling self.run()
[INFO/PoolWorker-5] child process calling self.run()
[INFO/PoolWorker-4] child process calling self.run()
[INFO/PoolWorker-6] child process calling self.run()
[INFO/PoolWorker-7] child process calling self.run()
[INFO/PoolWorker-9] child process calling self.run()
[INFO/PoolWorker-8] child process calling self.run()
[INFO/PoolWorker-10] child process calling self.run()
[DEBUG/MainProcess] closing pool
[SUBDEBUG/MainProcess] finalizer calling <bound method type._terminate_pool of <class 'multiprocessing.pool.Pool'>> with args (<Queue.Queue instance at 0x34af918>,<multiprocessing.queues.SimpleQueue object at 0x3494950>,<multiprocessing.queues.SimpleQueue object at 0x34a61b0>,[<Process(PoolWorker-1,started daemon)>,<Process(PoolWorker-2,<Process(PoolWorker-3,<Process(PoolWorker-4,<Process(PoolWorker-5,<Process(PoolWorker-6,<Process(PoolWorker-7,<Process(PoolWorker-8,<Process(PoolWorker-9,<Process(PoolWorker-10,started daemon)>],<Thread(Thread-1,started daemon -1341648896)>,<Thread(Thread-2,started daemon -1341116416)>,{}) and kwargs {}
[DEBUG/MainProcess] finalizing pool
[DEBUG/MainProcess] helping task handler/workers to finish
[DEBUG/MainProcess] removing tasks from inqueue until task handler finished
[DEBUG/MainProcess] task handler got sentinel
[DEBUG/MainProcess] task handler sending sentinel to result handler
[DEBUG/MainProcess] task handler sending sentinel to workers
[DEBUG/MainProcess] task handler exiting
[DEBUG/MainProcess] result handler got sentinel
[DEBUG/MainProcess] ensuring that outqueue is not full
[DEBUG/MainProcess] result handler exiting: len(cache)=0,thread._state=0
[DEBUG/PoolWorker-2] worker got sentinel -- exiting
[DEBUG/PoolWorker-1] worker got sentinel -- exiting
[INFO/PoolWorker-2] process shutting down
[DEBUG/PoolWorker-7] worker got sentinel -- exiting
[INFO/PoolWorker-1] process shutting down
[INFO/PoolWorker-7] process shutting down
[DEBUG/PoolWorker-7] running all "atexit" finalizers with priority >= 0
[DEBUG/PoolWorker-1] running all "atexit" finalizers with priority >= 0
[DEBUG/PoolWorker-7] running the remaining "atexit" finalizers
[INFO/PoolWorker-7] process exiting with exitcode 0
[DEBUG/PoolWorker-1] running the remaining "atexit" finalizers
[INFO/PoolWorker-1] process exiting with exitcode 0
[DEBUG/PoolWorker-5] worker got sentinel -- exiting
[DEBUG/PoolWorker-2] running all "atexit" finalizers with priority >= 0
[INFO/PoolWorker-5] process shutting down
[DEBUG/PoolWorker-5] running all "atexit" finalizers with priority >= 0
[DEBUG/PoolWorker-2] running the remaining "atexit" finalizers
[DEBUG/PoolWorker-5] running the remaining "atexit" finalizers
[INFO/PoolWorker-2] process exiting with exitcode 0
[INFO/PoolWorker-5] process exiting with exitcode 0
[DEBUG/PoolWorker-6] worker got sentinel -- exiting
[INFO/PoolWorker-6] process shutting down
[DEBUG/PoolWorker-6] running all "atexit" finalizers with priority >= 0
[DEBUG/PoolWorker-6] running the remaining "atexit" finalizers
[INFO/PoolWorker-6] process exiting with exitcode 0
[DEBUG/PoolWorker-4] worker got sentinel -- exiting
[DEBUG/PoolWorker-9] worker got sentinel -- exiting
[INFO/PoolWorker-9] process shutting down
[DEBUG/PoolWorker-9] running all "atexit" finalizers with priority >= 0
[DEBUG/PoolWorker-9] running the remaining "atexit" finalizers
[INFO/PoolWorker-9] process exiting with exitcode 0
[INFO/PoolWorker-4] process shutting down
[DEBUG/PoolWorker-4] running all "atexit" finalizers with priority >= 0
[DEBUG/PoolWorker-4] running the remaining "atexit" finalizers
[INFO/PoolWorker-4] process exiting with exitcode 0
[DEBUG/PoolWorker-10] worker got sentinel -- exiting
[INFO/PoolWorker-10] process shutting down
[DEBUG/PoolWorker-10] running all "atexit" finalizers with priority >= 0
[DEBUG/PoolWorker-10] running the remaining "atexit" finalizers
[INFO/PoolWorker-10] process exiting with exitcode 0
[DEBUG/PoolWorker-8] worker got sentinel -- exiting
[INFO/PoolWorker-8] process shutting down
[DEBUG/PoolWorker-8] running all "atexit" finalizers with priority >= 0
[DEBUG/PoolWorker-8] running the remaining "atexit" finalizers
[INFO/PoolWorker-8] process exiting with exitcode 0
[DEBUG/PoolWorker-3] worker got sentinel -- exiting
[INFO/PoolWorker-3] process shutting down
[DEBUG/PoolWorker-3] running all "atexit" finalizers with priority >= 0
[DEBUG/PoolWorker-3] running the remaining "atexit" finalizers
[INFO/PoolWorker-3] process exiting with exitcode 0
[DEBUG/MainProcess] terminating workers
[DEBUG/MainProcess] joining task handler
[DEBUG/MainProcess] joining result handler
[DEBUG/MainProcess] joining pool workers

当它不是我得到的全部是:

[DEBUG/MainProcess] created semlock with handle 6
[DEBUG/MainProcess] created semlock with handle 7
[DEBUG/MainProcess] created semlock with handle 10
[DEBUG/MainProcess] created semlock with handle 11
[INFO/PoolWorker-1] child process calling self.run()
[INFO/PoolWorker-2] child process calling self.run()
[INFO/PoolWorker-3] child process calling self.run()
[INFO/PoolWorker-8] child process calling self.run()
[INFO/PoolWorker-5] child process calling self.run()
[INFO/PoolWorker-4] child process calling self.run()
[INFO/PoolWorker-9] child process calling self.run()
[INFO/PoolWorker-6] child process calling self.run()
[INFO/PoolWorker-7] child process calling self.run()
[INFO/PoolWorker-10] child process calling self.run()

解决方法

不是一个完整的解决方案,但我找到了一种方法,使代码能够以任何形式运行:从解释器或作为运行脚本中的代码.我认为问题与多处理文档中的以下注释有关:

此包中的功能要求主方法可由子进程导入.这在编程指南中有所涉及,但值得在此指出.这意味着某些示例(例如multiprocessing.Pool示例)在交互式解释器中不起作用.

我不确定为什么存在这种限制,为什么我仍然有时可以使用交互式解释器中的池,有时候不会,但是哦……

为了解决这个问题,我在任何可能使用多处理的模块中执行以下操作:

import __main__
__SHOULD_MULTITHREAD__ = False
if hasattr(__main__,'__file__'):
    __SHOULD_MULTITHREAD__ = True

然后,该模块中的其余代码可以检查此标志以查看它是应该使用池还是仅执行而不进行并行化.这样做,我仍然可以在交互式解释器中使用和测试模块中的并行化函数,它们的运行速度要慢得多.

相关文章

HashMap是Java中最常用的集合类框架,也是Java语言中非常典型...
在EffectiveJava中的第 36条中建议 用 EnumSet 替代位字段,...
介绍 注解是JDK1.5版本开始引入的一个特性,用于对代码进行说...
介绍 LinkedList同时实现了List接口和Deque接口,也就是说它...
介绍 TreeSet和TreeMap在Java里有着相同的实现,前者仅仅是对...
HashMap为什么线程不安全 put的不安全 由于多线程对HashMap进...