Problem description
I'm trying to do entity extraction with spaCy and a Pandas UDF (PySpark), but I'm getting an error.
It works fine with a regular UDF, but it's slow. What am I doing wrong?
The model is loaded on every call to avoid the loading error: Can't find model 'en_core_web_lg'. It doesn't seem to be a shortcut link, a Python package or a valid path to a data directory.
Working UDF:
def __get_entities(x):
    global nlp
    nlp = spacy.load("en_core_web_lg")
    ents = []
    doc = nlp(x)
    for ent in doc.ents:
        if ent.label_ == 'PERSON' or ent.label_ == 'ORG':
            ents.append(ent.label_)
    return ents

get_entities_udf = F.udf(__get_entities, T.ArrayType(T.StringType()))
Pandas UDF (fails with an error):
def __get_entities(x):
    global nlp
    nlp = spacy.load("en_core_web_lg")
    ents = []
    doc = nlp(x)
    for ent in doc.ents:
        if ent.label_ == 'PERSON' or ent.label_ == 'ORG':
            ents.append(ent.label_)
    return pd.Series(ents)

get_entities_udf = F.pandas_udf(lambda x: __get_entities(x), "array<string>", F.PandasUDFType.SCALAR)
Error message:
TypeError: Argument 'string' has incorrect type (expected str, got Series)
Sample Spark DataFrame:
df = spark.createDataFrame([
    ['John Doe'], ['Jane Doe'], ['Microsoft Corporation'], ['Apple Inc.'],
]).toDF("name")
New column:
df_new = df.withColumn('entity', get_entities_udf('name'))
Solution
You need to treat the input as a pd.Series, not as a single value.
I was able to get this working by refactoring the code. Note that the x.apply call is pandas-specific and applies the function to each element of the pd.Series.
def entities(x):
    global nlp
    import spacy
    nlp = spacy.load("en_core_web_lg")
    ents = []
    doc = nlp(x)
    for ent in doc.ents:
        if ent.label_ == 'PERSON' or ent.label_ == 'ORG':
            ents.append(ent.label_)
    return ents

def __get_entities(x):
    return x.apply(entities)

get_entities_udf = pandas_udf(lambda x: __get_entities(x), "array<string>", PandasUDFType.SCALAR)

df_new = df.withColumn('entity', get_entities_udf('name'))
df_new.show()
+--------------------+--------+
| name| entity|
+--------------------+--------+
| John Doe|[PERSON]|
| Jane Doe|[PERSON]|
|Microsoft Corpora...| [ORG]|
| Apple Inc.| [ORG]|
+--------------------+--------+
I'm using: pyspark 3.1.1 and python 3.7.
The answer above didn't work for me, and I spent quite a bit of time getting things to work, so I thought I'd share the solution I came up with.
Setup
Creating a sample set of 16 random person and company names:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, ArrayType
from pyspark.sql.functions import pandas_udf, PandasUDFType
from faker import Faker
import spacy

spark = SparkSession.builder.appName("pyspark_sandbox").getOrCreate()

names = []
fake = Faker()
for _ in range(8):
    names.append(f"{fake.company()} {fake.company_suffix()}")
    names.append(fake.name())

df = spark.createDataFrame(names, StringType())
As is
First, let's check the currently proposed solution. I'm just adding a print statement when loading the spaCy model, so we can see how many times the model actually gets loaded.
# printing a msg each time we load the model
def load_spacy_model():
    print("Loading spacy model...")
    return spacy.load("en_core_web_sm")

def entities(x):
    global nlp
    import spacy
    nlp = load_spacy_model()
    ents = []
    doc = nlp(x)
    for ent in doc.ents:
        if ent.label_ == 'PERSON' or ent.label_ == 'ORG':
            ents.append(ent.label_)
    return ents

def __get_entities(x):
    return x.apply(entities)

get_entities_udf = pandas_udf(lambda x: __get_entities(x), "array<string>", PandasUDFType.SCALAR)

df_new = df.withColumn('entity', get_entities_udf('value'))
df_new.show()
We can then see that the model is loaded 16 times, i.e. once for every entry we process. Not what I want.
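As a side sketch (not part of the original answer): if worker stdout is hard to inspect on your cluster, a Spark accumulator can count the loads instead of the print statement. The function name here is just an illustrative variant you would swap in for load_spacy_model.
# counting model loads with an accumulator instead of reading worker logs
load_counter = spark.sparkContext.accumulator(0)

def load_spacy_model_counted():
    load_counter.add(1)            # incremented on the executors, summed back on the driver
    print("Loading spacy model...")
    return spacy.load("en_core_web_sm")

# after running an action such as df_new.show(), check on the driver:
# print(load_counter.value)        # e.g. 16 with the per-row version above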
Batching
Rewriting it with the pandas_udf decorator introduced in Spark 3.0+, i.e. using type hints (Python 3.6+). The UDF then uses nlp.pipe() to batch-process the whole pd.Series.
# printing a msg each time we load the model
def load_spacy_model():
    print("Loading spacy model...")
    return spacy.load("en_core_web_sm")

# decorator indicating that this function is a pandas_udf
# and that it's going to process a list of strings
@pandas_udf(ArrayType(StringType()))
# function receiving a pd.Series and returning a pd.Series
def entities(list_of_text: pd.Series) -> pd.Series:
    global nlp
    nlp = load_spacy_model()
    docs = nlp.pipe(list_of_text)
    # retrieving the str representation of the entity label
    # as we are limited in the types of obj
    # we can return from a pandas_udf
    # we couldn't return a Span obj for example
    ents = [
        [ent.label_ for ent in doc.ents]
        for doc in docs
    ]
    return pd.Series(ents)

df_new = df.withColumn('entity', entities('value'))
df_new.show()
In my case the model is loaded 4 times, which is better. A Python worker is created each time to process a batch, so the number depends on how many cores Spark is using, but more importantly, in my case, on how many partitions the data has. So it's still not optimal.
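Since the number of loads tracks the number of partitions here, a quick way to check (and tune) it is below. This uses standard DataFrame calls and is not part of the original answer; the partition count is an example value only.
# roughly how many batches (and hence model loads) this setup will produce
print(df.rdd.getNumPartitions())

# fewer partitions -> fewer loads, but also less parallelism (illustrative only)
df_fewer = df.coalesce(2)
print(df_fewer.rdd.getNumPartitions())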
Broadcasting the nlp object
# printing a msg each time we load the model
def load_spacy_model():
    print("Loading spacy model...")
    return spacy.load("en_core_web_sm")

@pandas_udf(ArrayType(StringType()))
def entities(list_of_text: pd.Series) -> pd.Series:
    nlp = boardcasted_nlp.value
    docs = nlp.pipe(list_of_text)
    # retrieving the str representation of the entity label
    # as we are limited in the types of obj
    # we can return from a pandas_udf
    # we couldn't return a Span obj for example
    ents = [
        [ent.label_ for ent in doc.ents]
        for doc in docs
    ]
    return pd.Series(ents)

boardcasted_nlp = spark.sparkContext.broadcast(load_spacy_model())

df_new = df.withColumn('entity', entities('value'))
df_new.show()
Now the model is loaded only once and then broadcast to every Python worker that gets spawned.
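If you no longer need the UDF, the broadcast can be released explicitly; this is the standard PySpark Broadcast API, shown here as an optional cleanup step rather than part of the original answer.
# optional cleanup once you're done with the UDF
boardcasted_nlp.unpersist()   # drop the cached copies on the executors
# boardcasted_nlp.destroy()   # or release it entirely (it can't be used afterwards)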
Full code
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, ArrayType
from pyspark.sql.functions import pandas_udf, PandasUDFType
from faker import Faker
import spacy

spark = SparkSession.builder.appName("pyspark_sandbox").getOrCreate()

# creating our set of fake person and company names
names = []
fake = Faker()
for _ in range(8):
    names.append(f"{fake.company()} {fake.company_suffix()}")
    names.append(fake.name())

df = spark.createDataFrame(names, StringType())

# printing a msg each time we load the model
def load_spacy_model():
    print("Loading spacy model...")
    return spacy.load("en_core_web_sm")

# decorator indicating that this function is a pandas_udf
# and that it's going to process a list of strings
@pandas_udf(ArrayType(StringType()))
# function receiving a pd.Series and returning a pd.Series
def entities(list_of_text: pd.Series) -> pd.Series:
    # retrieving the shared nlp object
    nlp = boardcasted_nlp.value
    # batch processing our list of text
    docs = nlp.pipe(list_of_text)
    # retrieving the str representation of the entity label
    # as we are limited in the types of obj
    # we can return from a pandas_udf
    # we couldn't return a Span obj for example
    ents = [
        [ent.label_ for ent in doc.ents]
        for doc in docs
    ]
    return pd.Series(ents)

# we load the spacy model and broadcast it
boardcasted_nlp = spark.sparkContext.broadcast(load_spacy_model())

df_new = df.withColumn('entity', entities('value'))
df_new.show(truncate=False)
Result
+----------------------------------+------------------------+
|value                             |entity                  |
+----------------------------------+------------------------+
|Ferguson, Price and Green Ltd     |[ORG, ORG, ORG]         |
|Cassandra Goodman MD              |[PERSON]                |
|Solis Ltd LLC                     |[ORG]                   |
|Laurie Foster                     |[PERSON]                |
|Lane-Vasquez Group                |[ORG]                   |
|Matthew Wright                    |[PERSON]                |
|Scott, Pugh and Rodriguez and Sons|[PERSON, PERSON, PERSON]|
|Tina Cooke                        |[PERSON]                |
|Watkins, Blake and Foster Ltd     |[ORG]                   |
|Charles Reyes                     |[PERSON]                |
|Cooper, Norris and Roberts PLC    |[ORG]                   |
|Michael Tate                      |[PERSON]                |
|Powell, Lawson and Perez and Sons |[PERSON, PERSON]        |
|James Wolf PhD                    |[PERSON]                |
|Greer-Swanson PLC                 |[ORG]                   |
|Nicholas Hale                     |[PERSON]                |
+----------------------------------+------------------------+