将pathos与XML结合使用时出现pickle（序列化）错误

问题描述

我正在尝试将multistream版的维基百科转储（Wikipedia dump）读入数据库。为此我把转储拆分成较小的块，并尝试并行加载它们。下面是脚本：

#!/usr/bin/python3
import xml.sax
from bz2 import BZ2File
import mwparserfromhell
import psycopg2
import pathos
import os
import dill


class XmlHandler(xml.sax.handler.ContentHandler):
    """SAX handler that collects (title, text, infobox) tuples from a
    MediaWiki XML dump stream."""

    def __init__(self):
        xml.sax.handler.ContentHandler.__init__(self)
        self._buffer = None        # character chunks for the tag being captured
        self._values = {}          # most recently seen title/text/infobox
        self._current_tag = None   # tag currently being captured, or None
        self._pages = []           # accumulated (title, text, infobox) tuples

    def get_pages(self):
        """Return the list of (title, text, infobox) tuples parsed so far."""
        return self._pages

    def get_page_count(self):
        """Return how many pages have been processed."""
        return len(self._pages)

    def get_values(self):
        """Return the values captured for the most recent page."""
        return self._values

    def characters(self, content):
        # Buffer character data only while inside a tag we care about.
        if self._current_tag:
            self._buffer.append(content)

    def startElement(self, name, attrs):
        if name in ('title', 'text', 'infobox'):
            self._current_tag = name
            self._buffer = []

    def endElement(self, name):
        if name == self._current_tag:
            self._values[name] = ' '.join(self._buffer)
            # Bug fix: stop capturing once the tag closes. Without this,
            # characters() kept appending unrelated text (whitespace between
            # elements, etc.) to the stale buffer until the next startElement.
            self._current_tag = None
            self._buffer = None

        if name == 'page':
            self.process_article()

    def process_article(self):
        # Bug fix: use .get() so pages lacking <text>/<title> do not raise
        # KeyError and abort the whole parse.
        text = self._values.get('text', '')
        wikicode = mwparserfromhell.parse(text)
        infobox_array = wikicode.filter_templates(matches="infobox .*")
        infobox = str(infobox_array[0]) if infobox_array else ""
        self._pages.append((self._values.get('title', ''), text, infobox))


def load_xml(filename):
    """Parse one bz2-compressed XML chunk and insert its pages into the
    `pages` table.

    Runs in a worker process. Each worker opens its own database
    connection: connections (like the pyexpat-based SAX parser) are not
    picklable and cannot be shipped from the parent process — relying on
    the parent's global `conn` is what caused the original failure.
    """
    wiki_handler = XmlHandler()
    wiki_parser = xml.sax.make_parser()
    wiki_parser.setContentHandler(wiki_handler)

    path = os.path.join("chunks", filename)
    print("I'm a worker process")

    # Bug fix: per-worker connection instead of the parent's global `conn`.
    conn = psycopg2.connect(dbname="wikipedia", user="postgres",
                            password="postgres", host="localhost", port=5432)

    with BZ2File(path, 'r') as f:
        for line in f:
            wiki_parser.feed(line)
        pages = wiki_handler.get_pages()

    # Bug fix: the original never committed, so every insert was rolled
    # back when the connection died. `with conn:` commits on success.
    with conn:
        with conn.cursor() as cursor:
            for page in pages:
                cursor.execute(
                    "INSERT INTO pages (title,text,infobox) VALUES (%s,%s,%s) "
                    "ON CONFLICT DO NOTHING",
                    page)
    conn.close()
    print("all done")


if __name__ == "__main__":
    # NOTE(review): this connection lives in the parent process; it cannot be
    # pickled into the workers, so worker code should open its own connection.
    conn = psycopg2.connect(dbname="wikipedia", user="postgres",
                            password="postgres", host="localhost", port=5432)

    file_list = [f for f in os.listdir("chunks")
                 if os.path.isfile(os.path.join("chunks", f))]

    pool = pathos.multiprocessing.ProcessingPool(
        processes=pathos.multiprocessing.cpu_count())
    try:
        pool.map(load_xml, file_list)
    finally:
        # Bug fix: release the worker processes and the parent connection;
        # the original leaked both.
        pool.close()
        pool.join()
        conn.close()

和追溯:

Traceback (most recent call last):
  File "./loader_parallel.py",line 114,in <module>
    pool.map(load_xml,file_list)
  File "/home/smh/.local/lib/python3.7/site-packages/multiprocess/pool.py",line 268,in map
    return self._map_async(func,iterable,mapstar,chunksize).get()
  File "/home/smh/.local/lib/python3.7/site-packages/multiprocess/pool.py",line 657,in get
    raise self._value
multiprocess.pool.MaybeEncodingError: Error sending result: 
'<multiprocess.pool.ExceptionWithTraceback object at 0x7f87ac0f4470>'. 
Reason: 'TypeError("can't pickle pyexpat.xmlparser objects")'

为什么不能pickle（序列化）pyexpat.xmlparser对象？我该如何解决？我尝试通过运行dill.copy(XmlHandler())对其进行测试，并且这样做没有错误。

我通过Debian 10上运行Python 3.7的pip3安装了pathos,对此还很陌生,感谢您的帮助!

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)