Problem description
I am trying to optimize the code below (a PySpark UDF).
It gives me the desired result (based on my dataset), but it is far too slow on a very large dataset (~180M rows). The results (accuracy) are better than the available Python modules (e.g. geotext, hdx-python-country), so I am not looking for another module.
DataFrame:
df = spark.createDataFrame([
["3030 Whispering Pines Circle,Prosper Texas,US","John"],["Kalverstraat Amsterdam","Mary"],["Kalverstraat Amsterdam,Netherlands","Lex"]
]).toDF("address","name")
regex.csv
iso2;keywords
US;\bArizona\b
US;\bTexas\b
US;\bFlorida\b
US;\bChicago\b
US;\bAmsterdam\b
US;\bProsper\b
US;\bUS$
CA;\bAlberta\b
CA;\bNova Scotia\b
CA;\bNova Scotia\b
CA;\bWhitehorse\b
CA;\bCA$
NL;\bAmsterdam\b
NL;\bNetherlands\b
NL;\bNL$
......<many,many more>
From regex.csv I create a pandas DataFrame, group by iso2 and join the keywords with | (giving patterns like \bArizona\b|\bTexas\b|\bFlorida\b|\bUS$):
df = pd.read_csv('regex.csv', sep=';')
df_regex = df.groupby('iso2').agg({'keywords': '|'.join}).reset_index()
Function:
def get_iso2(x):
    iso2 = {}
    for j, row in df_regex.iterrows():
        regex = re.compile(row['keywords'], re.I | re.M)
        matches = re.finditer(regex, x)
        for m in matches:
            iso2[row['iso2']] = iso2.get(row['iso2'], 0) + 1
    return [key for key, value in iso2.items() for _ in range(value)]
PySpark UDF:
get_iso2_udf = F.udf(get_iso2,T.ArrayType(T.StringType()))
Create the new column:
df_new = df.withColumn('iso2', get_iso2_udf('address'))
Expected sample output:
[US,US,NL]
[CA]
[BE,BE,AU]
Some places occur in more than one country (the input is an address column with city, province/state, country, ...).
Example:
3030 Whispering Pines Circle, Prosper Texas, US -> [US, US]
Kalverstraat Amsterdam -> [US, NL]
Kalverstraat Amsterdam, Netherlands -> [US, NL, NL]
Maybe using a Scala UDF in PySpark is an option, but I don't know how.
Your optimization suggestions are highly appreciated!
Solution
IIUC, you can try the following steps without using a UDF:
from pyspark.sql.functions import expr,first,collect_list,broadcast,monotonically_increasing_id,flatten
import pandas as pd
df = spark.createDataFrame([
["3030 Whispering Pines Circle,Prosper Texas,US","John"],["Kalverstraat Amsterdam","Mary"],["Kalverstraat Amsterdam,Netherlands","Lex"],["xvcv","ddd"]
]).toDF("address","name")
Step 1: Convert df_regex into a Spark DataFrame df1 and add a unique id to df.
df_regex = pd.read_csv("file:///path/to/regex.csv",sep=";")
# adjust keywords to uppercase except chars preceded with backslash:
df_regex["keywords"] = df_regex["keywords"].str.replace(r'(^|\\.)([^\\]*)',lambda m: m.group(1) + m.group(2).upper())
# create regex patterns:
df_regex = df_regex.groupby('iso2').agg({'keywords':lambda x: '(?m)' + '|'.join(x)}).reset_index()
df1 = spark.createDataFrame(df_regex)
df1.show(truncate=False)
+----+---------------------------------------------------------------------------------+
|iso2|keywords |
+----+---------------------------------------------------------------------------------+
|CA |(?m)\bALBERTA\b|\bNOVA SCOTIA\b|\bWHITEHORSE\b|\bCA$ |
|NL |(?m)\bAMSTERDAM\b|\bNETHERLANDS\b|\bNL$ |
|US |(?m)\bARIZONA\b|\bTEXAS\b|\bFLORIDA\b|\bCHICAGO\b|\bAMSTERDAM\b|\bPROSPER\b|\bUS$|
+----+---------------------------------------------------------------------------------+
df = df.withColumn('id',monotonically_increasing_id())
df.show(truncate=False)
+---------------------------------------------+----+---+
|address                                      |name|id |
+---------------------------------------------+----+---+
|3030 Whispering Pines Circle,Prosper Texas,US|John|0  |
|Kalverstraat Amsterdam                       |Mary|1  |
|Kalverstraat Amsterdam,Netherlands           |Lex |2  |
|xvcv                                         |ddd |3  |
+---------------------------------------------+----+---+
Step 2: Left-join df with df1 (the regex DataFrame) using rlike:
df2 = df.alias('d1').join(broadcast(df1.alias('d2')),expr("upper(d1.address) rlike d2.keywords"),"left")
df2.show()
+--------------------+----+---+----+--------------------+
| address|name| id|iso2| keywords|
+--------------------+----+---+----+--------------------+
|3030 Whispering P...|John| 0| US|(?m)\bARIZONA\b|\...|
|Kalverstraat Amst...|Mary| 1| NL|(?m)\bAMSTERDAM\b...|
|Kalverstraat Amst...|Mary| 1| US|(?m)\bARIZONA\b|\...|
|Kalverstraat Amst...| Lex| 2| NL|(?m)\bAMSTERDAM\b...|
|Kalverstraat Amst...| Lex| 2| US|(?m)\bARIZONA\b|\...|
| xvcv| ddd| 3|null| null|
+--------------------+----+---+----+--------------------+
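Because the join condition is a regex match rather than an equality, Spark cannot use a hash join here; with the broadcast hint it falls back to a broadcast nested loop join against the small pattern table. A quick sanity check you can run (my own addition, not part of the original steps):
# inspect the physical plan; a BroadcastNestedLoopJoin (LeftOuter) is expected for this non-equi join
df2.explain()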
Step 3: Count the number of matches of d2.keywords in d1.address by splitting d1.address on d2.keywords and reducing the size of the resulting array by 1:
df3 = df2.withColumn('num_matches',expr("size(split(upper(d1.address),d2.keywords))-1"))
+--------------------+----+---+----+--------------------+-----------+
| address|name| id|iso2| keywords|num_matches|
+--------------------+----+---+----+--------------------+-----------+
|3030 Whispering P...|John| 0| US|(?m)\bARIZONA\b|\...| 3|
|Kalverstraat Amst...|Mary| 1| NL|(?m)\bAMSTERDAM\b...| 1|
|Kalverstraat Amst...|Mary| 1| US|(?m)\bARIZONA\b|\...| 1|
|Kalverstraat Amst...| Lex| 2| NL|(?m)\bAMSTERDAM\b...| 2|
|Kalverstraat Amst...| Lex| 2| US|(?m)\bARIZONA\b|\...| 1|
| xvcv| ddd| 3|null| null| -2|
+--------------------+----+---+----+--------------------+-----------+
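To see why this works: a pattern that occurs n times in a string splits it into n + 1 pieces, so size(split(...)) - 1 recovers the match count. A minimal standalone check (my own example, not from the original answer):
from pyspark.sql.functions import split, size

# two regex hits ("AMSTERDAM" and "NETHERLANDS") -> three split pieces -> num_matches = 2
demo = spark.createDataFrame([("KALVERSTRAAT AMSTERDAM,NETHERLANDS",)], ["addr"])
pattern = r"(?m)\bAMSTERDAM\b|\bNETHERLANDS\b|\bNL$"
demo.select((size(split("addr", pattern)) - 1).alias("num_matches")).show()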
Step 4: Use array_repeat to repeat the iso2 value num_matches times (requires Spark 2.4+):
df4 = df3.withColumn("iso2",expr("array_repeat(iso2,num_matches)"))
+--------------------+----+---+------------+--------------------+-----------+
| address|name| id| iso2| keywords|num_matches|
+--------------------+----+---+------------+--------------------+-----------+
|3030 Whispering P...|John| 0|[US,US,US]|(?m)\bARIZONA\b|\...| 3|
|Kalverstraat Amst...|Mary| 1| [NL]|(?m)\bAMSTERDAM\b...| 1|
|Kalverstraat Amst...|Mary| 1| [US]|(?m)\bARIZONA\b|\...| 1|
|Kalverstraat Amst...| Lex| 2| [NL,NL]|(?m)\bAMSTERDAM\b...| 2|
|Kalverstraat Amst...| Lex| 2| [US]|(?m)\bARIZONA\b|\...| 1|
| xvcv| ddd| 3| []| null| -2|
+--------------------+----+---+------------+--------------------+-----------+
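Note how the unmatched row behaves: array_repeat with a zero or negative count simply returns an empty array, which is why the row with num_matches = -2 ends up as [] instead of failing. A quick standalone illustration (my own snippet):
# non-positive counts yield empty arrays, so rows without any regex match collapse to []
spark.range(1).selectExpr(
    "array_repeat('US', 3) AS three_times",
    "array_repeat('US', 0) AS zero_times",
    "array_repeat('US', -2) AS negative_count"
).show()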
Step 5: Group by id and aggregate:
df_new = df4 \
.groupby('id') \
.agg(
first('address').alias('address'),first('name').alias('name'),flatten(collect_list('iso2')).alias('countries')
)
+---+--------------------+----+------------+
| id|             address|name|   countries|
+---+--------------------+----+------------+
|  0|3030 Whispering P...|John|[US, US, US]|
|  1|Kalverstraat Amst...|Mary|    [NL, US]|
|  3|                xvcv| ddd|          []|
|  2|Kalverstraat Amst...| Lex|[NL, NL, US]|
+---+--------------------+----+------------+
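If you only need each country once per address instead of repeated by match count, array_distinct (also Spark 2.4+) can be applied on top of the result; a small optional sketch:
from pyspark.sql.functions import array_distinct

# keep only the distinct ISO2 codes per address
df_unique = df_new.withColumn('countries', array_distinct('countries'))
df_unique.show(truncate=False)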
Alternative: Step 3 can also be handled by a pandas_udf:
from pyspark.sql.functions import pandas_udf,PandasUDFType
from pandas import Series
import re
@pandas_udf("int",PandasUDFType.SCALAR)
def get_num_matches(addr, ptn):
    return Series([ 0 if p is None else len(re.findall(p,s)) for p,s in zip(ptn,addr) ])
df3 = df2.withColumn("num_matches",get_num_matches(expr('upper(address)'),'keywords'))
+--------------------+----+---+----+--------------------+-----------+
| address|name| id|iso2| keywords|num_matches|
+--------------------+----+---+----+--------------------+-----------+
|3030 Whispering P...|John| 0| US|(?m)\bARIZONA\b|\...| 3|
|Kalverstraat Amst...|Mary| 1| NL|(?m)\bAMSTERDAM\b...| 1|
|Kalverstraat Amst...|Mary| 1| US|(?m)\bARIZONA\b|\...| 1|
|Kalverstraat Amst...| Lex| 2| NL|(?m)\bAMSTERDAM\b...| 2|
|Kalverstraat Amst...| Lex| 2| US|(?m)\bARIZONA\b|\...| 1|
| xvcv| ddd| 3|null| null| 0|
+--------------------+----+---+----+--------------------+-----------+
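If you are on Spark 3.x, the same scalar pandas UDF can be written with Python type hints instead of PandasUDFType (which is deprecated there); a sketch under that assumption:
import re
import pandas as pd
from pyspark.sql.functions import pandas_udf, expr

@pandas_udf("int")
def get_num_matches(addr: pd.Series, ptn: pd.Series) -> pd.Series:
    # count regex hits per row; rows without a matched country carry a None pattern
    return pd.Series([0 if p is None else len(re.findall(p, s)) for p, s in zip(ptn, addr)])

df3 = df2.withColumn("num_matches", get_num_matches(expr("upper(address)"), "keywords"))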
Notes:
- Since case-insensitive pattern matching is expensive, we converted all keyword characters (except anchors and escaped sequences such as \b, \B, \A, \z) to uppercase and match against the upper-cased address.
- As a reminder, the patterns used in rlike and regexp_replace are Java-based, while the ones used in a pandas_udf are Python-based; there might be slight differences when setting up the patterns in regex.csv.
Method 2: using pandas_udf
The above method might be slow since the join and groupby trigger data shuffling. Just one more option for your testing:
df_regex = pd.read_csv("file:///path/to/regex.csv",sep=";")
df_regex["keywords"] = df_regex["keywords"].str.replace(r'(^|\\.)([^\\]*)',lambda m: m.group(1) + m.group(2).upper())
df_ptn = spark.sparkContext.broadcast(
df_regex.groupby('iso2').agg({'keywords':lambda x: '(?m)' + '|'.join(x)})["keywords"].to_dict()
)
df_ptn.value
#{'CA': '(?m)\\bALBERTA\\b|\\bNOVA SCOTIA\\b|\\bNOVA SCOTIA\\b|\\bWHITEHORSE\\b|\\bCA$',
# 'NL': '(?m)\\bAMSTERDAM\\b|\\bNETHERLANDS\\b|\\bNL$',
# 'US': '(?m)\\bARIZONA\\b|\\bTEXAS\\b|\\bFLORIDA\\b|\\bCHICAGO\\b|\\bAMSTERDAM\\b|\\bPROSPER\\b|\\bUS$'}
# REF: https://stackoverflow.com/questions/952914/how-to-make-a-flat-list-out-of-list-of-lists
from operator import iconcat
from functools import reduce
from pandas import Series
from pyspark.sql.functions import pandas_udf,PandasUDFType,flatten
def __get_iso2(addr, ptn):
    return Series([ reduce(iconcat, [[k]*len(re.findall(v,s)) for k,v in ptn.value.items()]) for s in addr ])
get_iso2 = pandas_udf(lambda x:__get_iso2(x,df_ptn),"array<string>",PandasUDFType.SCALAR)
df.withColumn('iso2',get_iso2(expr("upper(address)"))).show()
+--------------------+----+---+------------+
|             address|name| id|        iso2|
+--------------------+----+---+------------+
|3030 Whispering P...|John|  0|[US, US, US]|
|Kalverstraat Amst...|Mary|  1|    [NL, US]|
|Kalverstraat Amst...| Lex|  2|[NL, NL, US]|
|                xvcv| ddd|  3|          []|
+--------------------+----+---+------------+
Or return an array of arrays from the pandas_udf (without reduce and iconcat) and then run flatten on the Spark side:
def __get_iso2_2(addr, ptn):
    return Series([ [[k]*len(re.findall(v,s)) for k,v in ptn.value.items()] for s in addr ])

get_iso2_2 = pandas_udf(lambda x: __get_iso2_2(x, df_ptn), "array<array<string>>", PandasUDFType.SCALAR)

df.withColumn('iso2', flatten(get_iso2_2(expr("upper(address)")))).show()
Update: to find unique countries, do the following:
def __get_iso2_3(addr, ptn):
    return Series([ [k for k,v in ptn.value.items() if re.search(v,s)] for s in addr ])

get_iso2_3 = pandas_udf(lambda x: __get_iso2_3(x, df_ptn), "array<string>", PandasUDFType.SCALAR)

df.withColumn('iso2', get_iso2_3(expr("upper(address)"))).show()
+--------------------+----+--------+
| address|name| iso2|
+--------------------+----+--------+
|3030 Whispering P...|John| [US]|
|Kalverstraat Amst...|Mary|[NL,US]|
|Kalverstraat Amst...| Lex|[NL,US]|
| xvcv| ddd| []|
+--------------------+----+--------+
Method 3: using a list comprehension
Similar to @CronosNull's method, if the list in regex.csv is manageable, you can handle this with a list comprehension:
from pyspark.sql.functions import size,split,upper,col,array,expr,flatten
df_regex = pd.read_csv("file:///path/to/regex.csv",sep=";")
df_regex["keywords"] = df_regex["keywords"].str.replace(r'(^|\\.)([^\\]*)',lambda m: m.group(1) + m.group(2).upper())
df_ptn = df_regex.groupby('iso2').agg({'keywords':lambda x: '(?m)' + '|'.join(x)})["keywords"].to_dict()
df1 = df.select("*",*[ (size(split(upper(col('address')),v))-1).alias(k) for k,v in df_ptn.items()])
df1.select(*df.columns,flatten(array(*[ expr("array_repeat('{0}',`{0}`)".format(c)) for c in df_ptn.keys() ])).alias('iso2')).show()
+--------------------+----+---+------------+
|             address|name| id|        iso2|
+--------------------+----+---+------------+
|3030 Whispering P...|John|  0|[US, US, US]|
|                xvcv| ddd|  3|          []|
+--------------------+----+---+------------+
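To sanity-check the intermediate per-country match counts before they are turned into arrays, you can peek at the generated helper columns (a hypothetical inspection step, not in the original):
# each generated column holds the number of regex matches for that iso2 code
df1.select("address", *df_ptn.keys()).show(truncate=False)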
Note: edited based on the comments
I like @jxc's approach. I took a slightly different route, still without using a UDF and without broadcasting the regex dataset (you only use it in the driver).
Setting up the scene
import re
from io import StringIO
from pyspark.sql.functions import (
split,regexp_replace,regexp_extract,size,concat,lit,when,array_repeat,array_join,col,array,expr,)
from pyspark.sql import DataFrame
import pandas as pd
df = spark.createDataFrame([
["3030 Whispering Pines Circle,Prosper Texas,US","John"],["Kalverstraat Amsterdam","Mary"],["Kalverstraat Amsterdam,Netherlands","Lex"]
]).toDF("address","name")
sample_data = r"""iso2;keywords
US;\bArizona\b
US:\bTexas\b
US:\bFlorida\b
US;\bChicago\b
US:\bAmsterdam\b
US;\bProsper\b
US;\bUS$
CA:\bAlberta\b
CA:\bNova Scotia\b
CA:\bNova Scotia\b
CA;\bWhitehorse\b
CA;\bCA$
NL:\bAmsterdam\b
NL:\bNetherlands\b
NL;\bNL$"""
replace_pd = pd.read_csv(StringIO(sample_data),delimiter='[;:]',engine='python')
#Resample to have a similar number of rows
replace_pd = replace_pd.append([replace_pd]*10000)
Create a new column for each row of the regex dictionary
def replace_dict(df: DataFrame, column: str, replace_pd: pd.DataFrame) -> DataFrame:
    """
    Returns a dataframe with the required transformations
    to have a list of iso2 codes, and their number of repeats, based on the selected column (e.g. address).
    """
    _df = (
        df.withColumn("words", col(column))
    )
    # For each row in the csv create a new column:
    # it will contain the new value if the original
    # column contains a matching string.
    i = 0
    cols = []
    # grouping by iso2 code
    grouped_df = replace_pd.groupby('iso2').agg({'keywords': lambda x: '(?im)' + '|'.join(x)}).reset_index()
    for index, row in grouped_df.iterrows():
        key = row.keywords
        value = row.iso2
        _cr = value
        _df = _df.withColumn(_cr, size(split(col("words"), f"({key})")) - 1)
        cols.append(_cr)
        i += 1
    # Join the aux columns, removing the empty strings.
    _df = _df.withColumn("iso2", array(*[when(col(x) > 0, concat(lit(x), lit(":"), col(x))) for x in cols])).withColumn(
        "iso2", expr(r"filter(iso2, x -> x NOT rlike '^\s*$')")
    )
    _df = _df.drop("words", *cols)  # drop the aux columns.
    return _df
Run the test
This will give you the result:
replace_dict(df,'address',replace_pd).show(truncate=False)
It should be faster than the other approaches (all the transformations are narrow), but it depends on the size of the regex.csv file (since it creates a lot of sparse columns).
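The iso2 entries produced by replace_dict are strings of the form "US:2" (code plus match count). If you want the same repeated-list shape as in the other answers, one possible follow-up (my own sketch, assuming Spark 2.4+ higher-order functions) is:
from pyspark.sql.functions import expr

result = replace_dict(df, 'address', replace_pd)
# expand each "ISO2:count" entry, e.g. "US:2" -> ["US", "US"], then flatten the nested arrays
result = result.withColumn(
    "iso2",
    expr("flatten(transform(iso2, x -> array_repeat(split(x, ':')[0], cast(split(x, ':')[1] as int))))")
)
result.show(truncate=False)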
You will want to broadcast df_regex to all the nodes in the cluster, so each core can process the data in parallel:
df_regex_b = spark.sparkContext.broadcast(df_regex)
Update get_iso2 to use the broadcast variable:
def get_iso2(x, df_regex_b):
    iso2 = {}
    for j, row in df_regex_b.value.iterrows():
        regex = re.compile(row['keywords'], re.I | re.M)
        matches = re.finditer(regex, x)
        for m in matches:
            iso2[row['iso2']] = iso2.get(row['iso2'], 0) + 1
    return [key for key, value in iso2.items() for _ in range(value)]
Define the UDF with a nested function:
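The wrapper itself is not shown here, but presumably it looks something like this (a sketch reusing F.udf and T from the question; the exact form may differ):
# the lambda closes over the broadcast variable, so only the address column is passed to the UDF
get_iso2_udf = F.udf(lambda x: get_iso2(x, df_regex_b), T.ArrayType(T.StringType()))
df_new = df.withColumn('iso2', get_iso2_udf('address'))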
Let me know if this works. Here is a blog post with a more detailed discussion of this design pattern that might help, btw ;)