Problem description
I'm running into a problem parallelizing code that reads some files and writes them to neo4j.
- I am using dask to parallelize the process_language_files function (third cell from the bottom).
- I try to explain the code below by listing the functions (first 3 cells).
- The error is printed at the end (last 2 cells).
- I have also listed the environment and package versions at the end.
If I remove dask.delayed and run this code sequentially, it works perfectly fine.
Thanks for your help. :)
============================================================================
Some functions for working with neo4j.
from neo4j import GraphDatabase
from tqdm import tqdm
def get_driver(uri_scheme='bolt', host='localhost', port='7687', username='neo4j', password=''):
    """Get a neo4j driver."""
    connection_uri = "{uri_scheme}://{host}:{port}".format(uri_scheme=uri_scheme, host=host, port=port)
    auth = (username, password)
    driver = GraphDatabase.driver(connection_uri, auth=auth)
    return driver
def format_raw_res(raw_res):
    """Parse neo4j results."""
    res = []
    for r in raw_res:
        res.append(r)
    return res
def run_bulk_query(query_list, driver):
    """Run a list of neo4j queries in a session."""
    results = []
    with driver.session() as session:
        for query in tqdm(query_list):
            raw_res = session.run(query)
            res = format_raw_res(raw_res)
            results.append({'query': query, 'result': res})
    return results
global_driver = get_driver(uri_scheme='bolt', port='8687', password='abc123')  # neo4j driver object.
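For reference, a quick connectivity check with these helpers might look like this (the RETURN 1 query is only an illustrative placeholder):

```python
# Smoke test: run a trivial query through the helpers above.
driver = get_driver(uri_scheme='bolt', port='8687', password='abc123')
print(run_bulk_query(['RETURN 1'], driver))  # e.g. [{'query': 'RETURN 1', 'result': [<Record 1=1>]}]
driver.close()
```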
from dask.distributed import Client
client = Client(threads_per_worker=4,n_workers=1)
import sys
import time
import json
import pandas as pd
import dask
def add_nodes(nodes_list, language_code):
    """Returns a list of strings. Each string is a cypher query to add a node to neo4j."""
    list_of_create_strings = []
    create_string_template = """CREATE (:LABEL {{node_id:{node_id}}})"""
    for index, node in nodes_list.iterrows():
        create_string = create_string_template.format(node_id=node['new_id'])
        list_of_create_strings.append(create_string)
    return list_of_create_strings
def add_relations(relations_list, language_code):
    """Returns a list of strings. Each string is a cypher query to add a relationship to neo4j."""
    list_of_create_strings = []
    # The id is quoted so Cypher treats '1-2' as a string rather than as subtraction.
    create_string_template = """
    MATCH (a),(b) WHERE a.node_id = {source} AND b.node_id = {target}
    MERGE (a)-[r:KNOWS {{ relationship_id:'{edge_id}' }}]-(b)"""
    for index, relations in relations_list.iterrows():
        create_string = create_string_template.format(
            source=relations['from'], target=relations['to'],
            edge_id=str(relations['from']) + '-' + str(relations['to']))
        list_of_create_strings.append(create_string)
    return list_of_create_strings
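To make the output of these two builders concrete, here is a toy run (the DataFrames below are made up, mirroring the column names the builders expect):

```python
import pandas as pd

# Hypothetical inputs with the expected column names.
targets = pd.DataFrame({'new_id': [1, 2]})
edges = pd.DataFrame({'from': [1], 'to': [2]})

for query in add_nodes(targets, 'ENGLISH') + add_relations(edges, 'ENGLISH'):
    print(query)
# CREATE (:LABEL {node_id:1})
# CREATE (:LABEL {node_id:2})
# MATCH (a),(b) WHERE a.node_id = 1 AND b.node_id = 2
# MERGE (a)-[r:KNOWS { relationship_id:'1-2' }]-(b)
```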
def add_data(language_code, edges, features, targets, driver):
    """Add nodes and relationships to neo4j."""
    add_nodes_cypher = add_nodes(targets, language_code)  # List of cypher queries, one per node.
    node_results = run_bulk_query(add_nodes_cypher, driver)  # Run each query in a neo4j session.
    add_relations_cypher = add_relations(edges, language_code)  # List of cypher queries, one per relationship.
    relations_results = run_bulk_query(add_relations_cypher, driver)  # Run each query in a neo4j session.
    # Saving some metadata
    results = {
        "nodes": {"results": node_results, "length": len(add_nodes_cypher)},
        "relations": {"results": relations_results, "length": len(add_relations_cypher)},
    }
    return results
def load_data(language_code):
    """Load data from files."""
    # Saving file names to variables
    edges_filename = './edges.csv'
    features_filename = './features.json'
    target_filename = './target.csv'
    # Loading data from the file names (helper is a file-reading module of mine, not shown here)
    edges = helper.read_csv(edges_filename)
    features = helper.read_json(features_filename)
    targets = helper.read_csv(target_filename)
    # Saving some metadata
    results = {
        "edges": {"length": len(edges)},
        "features": {"length": len(features)},
        "targets": {"length": len(targets)},
    }
    return edges, features, targets, results
Main code.
def process_language_files(language_code, driver):
    """Reads files, creates cypher queries to add nodes and relationships, runs them in a neo4j session."""
    edges, features, targets, reading_results = load_data(language_code)  # Read files.
    writing_results = add_data(language_code, edges, features, targets, driver)  # Convert files to nodes and relationships and add them to neo4j in a session.
    return {"reading_results": reading_results, "writing_results": writing_results}  # Return some metadata.
# Execution starts here
res = []
for index, language_code in enumerate(['ENGLISH', 'french']):
    lazy_result = dask.delayed(process_language_files)(language_code, global_driver)
    res.append(lazy_result)
The results in res. These are dask delayed objects.
print(*res)
Delayed('process_language_files-a73f4a9d-6ffa-4295-8803-7fe09849c068') Delayed('process_language_files-c88fbd4f-e8c1-40c0-b143-eda41a209862')
The error. I get a similar error even with dask.compute().
futures = dask.persist(*res)
AttributeError Traceback (most recent call last)
~/Code/miniconda3/envs/MVDS/lib/python3.6/site-packages/distributed/protocol/pickle.py in dumps(x,buffer_callback,protocol)
48 buffers.clear()
---> 49 result = pickle.dumps(x,**dump_kwargs)
50 if len(result) < 1000:
AttributeError: Can't pickle local object 'BoltPool.open.<locals>.opener'
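For what it's worth, the failure seems reproducible without dask at all; the driver object itself refuses to pickle, which appears to be what distributed attempts under the hood:

```python
import pickle

pickle.dumps(global_driver)
# AttributeError: Can't pickle local object 'BoltPool.open.<locals>.opener'
```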
============================================================================
| # Name | Version | Build | Channel |
|---|---|---|---|
| dask | 2020.12.0 | pyhd8ed1ab_0 | conda-forge |
| jupyterlab | 3.0.3 | pyhd8ed1ab_0 | conda-forge |
| neo4j-python-driver | 4.2.1 | pyh7fcb38b_0 | conda-forge |
| python | 3.9.1 | hdb3f193_2 | |
Solution
You are getting this error because you are trying to share the driver object among your workers.
The driver object holds private data about the connection; that data is meaningless outside the process that created it (and cannot be serialized).
It is like opening a file somewhere and sharing the file descriptor elsewhere: it won't work, because the file number is only meaningful inside the process that produced it.
If you want your workers to access the database (or any other network resource), you should hand them the instructions needed to connect to the resource.
In your case, do not pass global_driver as a parameter. Pass the connection parameters instead, and let each worker call get_driver to obtain its own driver.
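A minimal sketch of that change, reusing the functions from the question (only the driver handling differs; creating one driver per task is the simplest correct version):

```python
import dask
from dask.distributed import Client

def process_language_files(language_code, connection_params):
    """Same pipeline as before, but the worker builds its own driver
    from plain, picklable connection parameters."""
    driver = get_driver(**connection_params)
    try:
        edges, features, targets, reading_results = load_data(language_code)
        writing_results = add_data(language_code, edges, features, targets, driver)
    finally:
        driver.close()  # always release the connection pool
    return {"reading_results": reading_results, "writing_results": writing_results}

client = Client(threads_per_worker=4, n_workers=1)
connection_params = {'uri_scheme': 'bolt', 'port': '8687', 'password': 'abc123'}

res = [dask.delayed(process_language_files)(language_code, connection_params)
       for language_code in ['ENGLISH', 'french']]
results = dask.compute(*res)  # only picklable arguments cross process boundaries now
```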