Using dask in Python to read files and write to neo4j

Problem description

I am having trouble parallelizing code that reads some files and writes to neo4j.

  • I am using dask to parallelize the process_language_files function (third cell from the bottom).
  • I have tried to explain the code below, listing the functions (first 3 cells).
  • The error is printed at the end (last 2 cells).
  • I have also listed the environment and package versions at the end.

If I remove dask.delayed and run this code sequentially, it works perfectly fine.

Thanks for your help. :)

============================================================================

Some functions to work with neo4j

from neo4j import GraphDatabase
from tqdm import tqdm

def get_driver(uri_scheme='bolt',host='localhost',port='7687',username='neo4j',password=''):
"""Get a neo4j driver."""
    connection_uri = "{uri_scheme}://{host}:{port}".format(uri_scheme=uri_scheme,host=host,port=port)
    auth = (username,password)
    driver = GraphDatabase.driver(connection_uri,auth=auth)
    return driver

def format_raw_res(raw_res):
"""Parse neo4j results"""
    res = []
    for r in raw_res:
        res.append(r)
    return res

def run_bulk_query(query_list,driver):
"""Run a list of neo4j queries in a session."""
    results = []
    with driver.session() as session:
        for query in tqdm(query_list):
            raw_res = session.run(query)
            res = format_raw_res(raw_res)
            results.append({'query':query,'result':res})
    return results

global_driver = get_driver(uri_scheme='bolt',port='8687',password='abc123')  # neo4j driver object.

This is how we create a dask client for parallelization.

from dask.distributed import Client
client = Client(threads_per_worker=4,n_workers=1)

Functions called by the code

import sys
import time
import json

import pandas as pd

import dask

def add_nodes(nodes_list,language_code):
"""Returns a list of strings. Each string is a cypher query to add a node to neo4j."""
    list_of_create_strings = []
    create_string_template = """CREATE (:LABEL {{node_id:{node_id}}})"""

    for index,node in nodes_list.iterrows():
        create_string = create_string_template.format(node_id=node['new_id'])
        list_of_create_strings.append(create_string)

    return list_of_create_strings
        
def add_relations(relations_list,language_code):
"""Returns a list of strings. Each string is a cypher query to add a relationship to neo4j."""
    list_of_create_strings = []
    create_string_template = """
        MATCH (a),(b) WHERE a.node_id = {source} AND b.node_id = {target}
        MERGE (a)-[r:KNOWS {{ relationship_id:'{edge_id}' }}]-(b)"""

    for index,relations in relations_list.iterrows():
        create_string = create_string_template.format(
            source=relations['from'],target=relations['to'],edge_id=''+str(relations['from'])+'-'+str(relations['to']))
        list_of_create_strings.append(create_string)

    return list_of_create_strings


def add_data(language_code,edges,features,targets,driver):
"""Add nodes and relationships to neo4j"""
    add_nodes_cypher = add_nodes(targets,language_code)  # Returns a list of strings. Each string is a cypher query to add a node to neo4j.
    node_results = run_bulk_query(add_nodes_cypher,driver)  # Runs each string in the above list in a neo4j session.


    add_relations_cypher = add_relations(edges,language_code)  # Returns a list of strings. Each string is a cypher query to add a relationship to neo4j.
    relations_results = run_bulk_query(add_relations_cypher,driver)  # Runs each string in the above list in a neo4j session.    
    
    # Saving some Metadata
    results = {
        "nodes": {"results": node_results,"length": len(add_nodes_cypher)},
        "relations": {"results": relations_results,"length": len(add_relations_cypher)},
    }
    return results


def load_data(language_code):
"""Load data from files"""
    # Saving file names to variables
    edges_filename = './edges.csv'
    features_filename = './features.json'
    target_filename = './target.csv'
    
    # Loading data from the file names
    edges = helper.read_csv(edges_filename)
    features = helper.read_json(features_filename)
    targets = helper.read_csv(target_filename)
    
    # Saving some Metadata
    results = {
        "edges": {"length": len(edges)},
        "features": {"length": len(features)},
        "targets": {"length": len(targets)},
    }
    return edges,features,targets,results

Main code

def process_language_files(language_code,driver):
    """Reads files, creates cypher queries to add nodes and relationships, and runs the cypher queries in a neo4j session."""
    edges,features,targets,reading_results = load_data(language_code)  # Read files.

    writing_results = add_data(language_code,edges,features,targets,driver)  # Convert the files to nodes and relationships and add them to neo4j in a neo4j session.

    return {"reading_results": reading_results,"writing_results": writing_results}  # Return some Metadata


# Execution starts here
res=[]
for index,language_code in enumerate(['ENGLISH','french']):
    
    lazy_result = dask.delayed(process_language_files)(language_code,global_driver)
    res.append(lazy_result)

The results in res are dask delayed objects.

print(*res)
Delayed('process_language_files-a73f4a9d-6ffa-4295-8803-7fe09849c068') Delayed('process_language_files-c88fbd4f-e8c1-40c0-b143-eda41a209862')

The error. I get a similar error even when using dask.compute().

futures = dask.persist(*res)
AttributeError                            Traceback (most recent call last)
~/Code/miniconda3/envs/MVDS/lib/python3.6/site-packages/distributed/protocol/pickle.py in dumps(x,buffer_callback,protocol)
     48         buffers.clear()
---> 49         result = pickle.dumps(x,**dump_kwargs)
     50         if len(result) < 1000:

AttributeError: Can't pickle local object 'BoltPool.open.<locals>.opener'

============================================================================

# Name                   Version      Build          Channel
dask                     2020.12.0    pyhd8ed1ab_0   conda-forge
jupyterlab               3.0.3        pyhd8ed1ab_0   conda-forge
neo4j-python-driver      4.2.1        pyh7fcb38b_0   conda-forge
python                   3.9.1        hdb3f193_2

Solution

You are getting this error because you are trying to share the driver object among your workers.

The driver object contains private data about the connection; that data is meaningless outside the process (and cannot be serialized).

It is like opening a file somewhere and sharing the file descriptor elsewhere. It won't work because the file number is only meaningful in the process that generated it.
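The failure can be reproduced without neo4j at all. The traceback names BoltPool.open.<locals>.opener, a function defined inside another function; pickling any such local object fails the same way. A minimal sketch, using hypothetical names open_pool/opener:

```python
import pickle

def open_pool():
    # A nested function, analogous to BoltPool.open.<locals>.opener in
    # the traceback. pickle serializes functions by reference (module
    # and qualified name), and a local object cannot be looked up that way.
    def opener():
        return "connection"
    return opener

local_opener = open_pool()
try:
    pickle.dumps(local_opener)
except AttributeError as err:
    print(err)  # Can't pickle local object 'open_pool.<locals>.opener'
```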

If you want your workers to access the database, or any other network resource, you should give them the instructions for connecting to the resource.

In your case, you should not pass global_driver as a parameter; instead, pass the connection parameters as arguments and let each worker call get_driver to get its own driver.
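A minimal sketch of that pattern. The hypothetical get_driver_stub stands in for the question's get_driver so the snippet runs without a neo4j server; in real code it would call GraphDatabase.driver() and return a fresh driver:

```python
import dask

def get_driver_stub(port, password):
    # Stand-in for the question's get_driver(); a real version would
    # build the connection URI and call GraphDatabase.driver() here.
    return ('driver-for-port', port)

def process_language_files(language_code, connection_params):
    # The driver is created inside the task, on whichever worker runs it.
    # Only the plain, picklable connection_params dict ever crosses a
    # process boundary -- never a driver object.
    driver = get_driver_stub(**connection_params)
    # ... load_data / add_data as in the question, using the local driver ...
    return {'language_code': language_code, 'driver': driver}

connection_params = {'port': '8687', 'password': 'abc123'}  # picklable data

res = [dask.delayed(process_language_files)(code, connection_params)
       for code in ['ENGLISH', 'french']]
results = dask.compute(*res)
print([r['language_code'] for r in results])  # ['ENGLISH', 'french']
```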