Optimization challenge for a PySpark UDF with a dictionary of regexes (Scala?)

Problem description

I am trying to optimize the code below (a PySpark UDF).

It gives me the desired result (based on my data set), but it is too slow on very large datasets (roughly 180M rows). The results (accuracy) are better than those of the available Python modules (e.g. geotext, hdx-python-country), so I do not need another module.

DataFrame:

df = spark.createDataFrame([
  ["3030 Whispering Pines Circle,Prosper Texas,US","John"],["Kalverstraat Amsterdam","Mary"],["Kalverstraat Amsterdam,Netherlands","Lex"] 
]).toDF("address","name")

regex.csv

iso2;keywords
US;\bArizona\b
US;\bTexas\b
US;\bFlorida\b
US;\bChicago\b
US;\bAmsterdam\b
US;\bProsper\b
US;\bUS$
CA;\bAlberta\b
CA;\bNova Scotia\b
CA;\bNova Scotia\b
CA;\bWhitehorse\b
CA;\bCA$
NL;\bAmsterdam\b
NL;\bNetherlands\b
NL;\bNL$

......<many,many more>

From regex.csv I create a pandas DataFrame, group it by iso2, and join the keywords (e.g. \bArizona\b|\bTexas\b|\bFlorida\b|\bUS$).

df = pd.read_csv('regex.csv', sep=';')
df_regex = df.groupby('iso2').agg({'keywords': '|'.join}).reset_index()
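
For the sample rows above, this yields one pipe-joined pattern per country, for example (illustrative, shown here for the NL rows):

df_regex.loc[df_regex.iso2 == 'NL', 'keywords'].iloc[0]
# '\bAmsterdam\b|\bNetherlands\b|\bNL$'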

The function:

def get_iso2(x):

    iso2 = {}

    # count regex matches per country pattern
    for j, row in df_regex.iterrows():

        regex = re.compile(row['keywords'], re.I | re.M)
        matches = re.finditer(regex, x)

        for m in matches:
            iso2[row['iso2']] = iso2.get(row['iso2'], 0) + 1

    # repeat each iso2 code once per match in the output list
    return [key for key, value in iso2.items() for _ in range(value)]

PySpark UDF:

get_iso2_udf = F.udf(get_iso2,T.ArrayType(T.StringType()))

Create the new column:

df_new = df.withColumn('iso2', get_iso2_udf('address'))

Expected sample output:

[US,US,NL]
[CA]
[BE,BE,AU]

Some places occur in more than one country (the input is an address column containing city, province/state, country, ...).

Examples:

3030 Whispering Pines Circle,Prosper Texas,US -> [US,US]
Kalverstraat Amsterdam -> [US,NL]
Kalverstraat Amsterdam,Netherlands -> [US,NL,NL]

Maybe a Scala UDF could be used within PySpark, but I do not know how.

Any suggestions to optimize this are highly appreciated!

Solution

IIUC, you can try the following steps without using a UDF:

from pyspark.sql.functions import expr,first,collect_list,broadcast,monotonically_increasing_id,flatten
import pandas as pd

df = spark.createDataFrame([
  ["3030 Whispering Pines Circle,Prosper Texas,US","John"],["Kalverstraat Amsterdam","Mary"],["Kalverstraat Amsterdam,Netherlands","Lex"],["xvcv","ddd"]
]).toDF("address","name")

Step 1: convert df_regex into a Spark DataFrame df1, and add a unique id to df.

df_regex = pd.read_csv("file:///path/to/regex.csv",sep=";")

# adjust keywords to uppercase, except characters preceded by a backslash (escape sequences such as \b):
df_regex["keywords"] = df_regex["keywords"].str.replace(r'(\\.|^)([^\\]*)', lambda m: m.group(1) + m.group(2).upper(), regex=True)

# create regex patterns:
df_regex = df_regex.groupby('iso2').agg({'keywords':lambda x: '(?m)' + '|'.join(x)}).reset_index()

df1 = spark.createDataFrame(df_regex)
df1.show(truncate=False)
+----+---------------------------------------------------------------------------------+
|iso2|keywords                                                                         |
+----+---------------------------------------------------------------------------------+
|CA  |(?m)\bALBERTA\b|\bNOVA SCOTIA\b|\bWHITEHORSE\b|\bCA$                             |
|NL  |(?m)\bAMSTERDAM\b|\bNETHERLANDS\b|\bNL$                                          |
|US  |(?m)\bARIZONA\b|\bTEXAS\b|\bFLORIDA\b|\bCHICAGO\b|\bAMSTERDAM\b|\bPROSPER\b|\bUS$|
+----+---------------------------------------------------------------------------------+

df = df.withColumn('id',monotonically_increasing_id())
df.show(truncate=False)
+-----------------------------------------------+----+---+
|address                                        |name|id |
+-----------------------------------------------+----+---+
|3030 Whispering Pines Circle,Prosper Texas,US  |John|0  |
|Kalverstraat Amsterdam                         |Mary|1  |
|Kalverstraat Amsterdam,Netherlands            |Lex |2  |
|xvcv                                           |ddd |3  |
+-----------------------------------------------+----+---+

Step 2: do a left join of df with df1 using rlike:

df2 = df.alias('d1').join(broadcast(df1.alias('d2')),expr("upper(d1.address) rlike d2.keywords"),"left")
df2.show()
+--------------------+----+---+----+--------------------+
|             address|name| id|iso2|            keywords|
+--------------------+----+---+----+--------------------+
|3030 Whispering P...|John|  0|  US|(?m)\bARIZONA\b|\...|
|Kalverstraat Amst...|Mary|  1|  NL|(?m)\bAMSTERDAM\b...|
|Kalverstraat Amst...|Mary|  1|  US|(?m)\bARIZONA\b|\...|
|Kalverstraat Amst...| Lex|  2|  NL|(?m)\bAMSTERDAM\b...|
|Kalverstraat Amst...| Lex|  2|  US|(?m)\bARIZONA\b|\...|
|                xvcv| ddd|  3|null|                null|
+--------------------+----+---+----+--------------------+

Step 3: count the number of matches of d2.keywords in d1.address by splitting d1.address on d2.keywords and subtracting 1 from the size of the resulting array (for the unmatched row, the null pattern yields size -1 and thus -2):

df3 = df2.withColumn('num_matches',expr("size(split(upper(d1.address),d2.keywords))-1"))
+--------------------+----+---+----+--------------------+-----------+
|             address|name| id|iso2|            keywords|num_matches|
+--------------------+----+---+----+--------------------+-----------+
|3030 Whispering P...|John|  0|  US|(?m)\bARIZONA\b|\...|          3|
|Kalverstraat Amst...|Mary|  1|  NL|(?m)\bAMSTERDAM\b...|          1|
|Kalverstraat Amst...|Mary|  1|  US|(?m)\bARIZONA\b|\...|          1|
|Kalverstraat Amst...| Lex|  2|  NL|(?m)\bAMSTERDAM\b...|          2|
|Kalverstraat Amst...| Lex|  2|  US|(?m)\bARIZONA\b|\...|          1|
|                xvcv| ddd|  3|null|                null|         -2|
+--------------------+----+---+----+--------------------+-----------+

Step 4: use array_repeat to repeat the value of iso2 num_matches times (requires Spark 2.4+; a non-positive count such as the -2 above yields an empty array):

df4 = df3.withColumn("iso2",expr("array_repeat(iso2,num_matches)"))
+--------------------+----+---+------------+--------------------+-----------+
|             address|name| id|        iso2|            keywords|num_matches|
+--------------------+----+---+------------+--------------------+-----------+
|3030 Whispering P...|John|  0|[US, US, US]|(?m)\bARIZONA\b|\...|          3|
|Kalverstraat Amst...|Mary|  1|        [NL]|(?m)\bAMSTERDAM\b...|          1|
|Kalverstraat Amst...|Mary|  1|        [US]|(?m)\bARIZONA\b|\...|          1|
|Kalverstraat Amst...| Lex|  2|    [NL, NL]|(?m)\bAMSTERDAM\b...|          2|
|Kalverstraat Amst...| Lex|  2|        [US]|(?m)\bARIZONA\b|\...|          1|
|                xvcv| ddd|  3|          []|                null|         -2|
+--------------------+----+---+------------+--------------------+-----------+

Step 5: group by id and aggregate:

df_new = df4 \
    .groupby('id') \
    .agg(
      first('address').alias('address'),
      first('name').alias('name'),
      flatten(collect_list('iso2')).alias('countries')
    )
+---+--------------------+----+------------+
| id|             address|name|   countries|
+---+--------------------+----+------------+
|  0|3030 Whispering P...|John|[US, US, US]|
|  1|Kalverstraat Amst...|Mary|    [NL, US]|
|  3|                xvcv| ddd|          []|
|  2|Kalverstraat Amst...| Lex|[NL, NL, US]|
+---+--------------------+----+------------+

Alternatively, a pandas UDF can also be used for step 3:

from pyspark.sql.functions import pandas_udf,PandasUDFType
from pandas import Series
import re

@pandas_udf("int",PandasUDFType.SCALAR)
def get_num_matches(addr,ptn):
    return Series([ 0 if p is None else len(re.findall(p,s)) for p,s in zip(ptn,addr) ])

df3 = df2.withColumn("num_matches",get_num_matches(expr('upper(address)'),'keywords'))
+--------------------+----+---+----+--------------------+-----------+
|             address|name| id|iso2|            keywords|num_matches|
+--------------------+----+---+----+--------------------+-----------+
|3030 Whispering P...|John|  0|  US|(?m)\bARIZONA\b|\...|          3|
|Kalverstraat Amst...|Mary|  1|  NL|(?m)\bAMSTERDAM\b...|          1|
|Kalverstraat Amst...|Mary|  1|  US|(?m)\bARIZONA\b|\...|          1|
|Kalverstraat Amst...| Lex|  2|  NL|(?m)\bAMSTERDAM\b...|          2|
|Kalverstraat Amst...| Lex|  2|  US|(?m)\bARIZONA\b|\...|          1|
|                xvcv| ddd|  3|null|                null|          0|
+--------------------+----+---+----+--------------------+-----------+

Notes:

  1. Since case-insensitive pattern matching is expensive, we convert all keyword characters to uppercase, except for anchors or escaped characters such as \b, \B, \A, \z (see the short illustration after these notes).
  2. Just a reminder: the patterns used in rlike and regexp_replace are Java-based, while the pattern in the pandas_udf is Python-based, so there might be slight differences when setting up the patterns in regex.csv.
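
A small, self-contained illustration of note 1, using the adjusted replace pattern from step 1: every run of plain characters is uppercased while escape sequences such as \b stay untouched.

import re

pat = re.compile(r'(\\.|^)([^\\]*)')
print(pat.sub(lambda m: m.group(1) + m.group(2).upper(), r'\bNova Scotia\b'))
# \bNOVA SCOTIA\b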

Method 2: using pandas_udf

The above method might be slow because the join and groupby trigger a data shuffle. Here is one more option, just for your testing:

df_regex = pd.read_csv("file:///path/to/regex.csv",sep=";")

df_regex["keywords"] = df_regex["keywords"].str.replace(r'(^|\\.)([^\\]*)',lambda m: m.group(1) + m.group(2).upper())

df_ptn = spark.sparkContext.broadcast(
    df_regex.groupby('iso2').agg({'keywords':lambda x: '(?m)' + '|'.join(x)})["keywords"].to_dict()
)
df_ptn.value
#{'CA': '(?m)\\bALBERTA\\b|\\bNOVA SCOTIA\\b|\\bNOVA SCOTIA\\b|\\bWHITEHORSE\\b|\\bCA$',
# 'NL': '(?m)\\bAMSTERDAM\\b|\\bNETHERLANDS\\b|\\bNL$',
# 'US': '(?m)\\bARIZONA\\b|\\bTEXAS\\b|\\bFLORIDA\\b|\\bCHICAGO\\b|\\bAMSTERDAM\\b|\\bPROSPER\\b|\\bUS$'}

# REF: https://stackoverflow.com/questions/952914/how-to-make-a-flat-list-out-of-list-of-lists
from operator import iconcat
from functools import reduce
from pandas import Series
from pyspark.sql.functions import pandas_udf, PandasUDFType, flatten
import re

def __get_iso2(addr, ptn):
    # for each address, repeat each iso2 code once per regex match and concatenate the pieces
    return Series([ reduce(iconcat, [[k]*len(re.findall(v, s)) for k, v in ptn.value.items()]) for s in addr ])

get_iso2 = pandas_udf(lambda x:__get_iso2(x,df_ptn),"array<string>",PandasUDFType.SCALAR)

df.withColumn('iso2',get_iso2(expr("upper(address)"))).show()
+--------------------+----+---+------------+
|             address|name| id|        iso2|
+--------------------+----+---+------------+
|3030 Whispering P...|John|  0|[US, US, US]|
|Kalverstraat Amst...|Mary|  1|    [NL, US]|
|Kalverstraat Amst...| Lex|  2|[NL, NL, US]|
|                xvcv| ddd|  3|          []|
+--------------------+----+---+------------+

Or return an array of arrays from the pandas_udf (without reduce and iconcat) and then let Spark flatten it:

def __get_iso2_2(addr, ptn):
    return Series([ [[k]*len(re.findall(v, s)) for k, v in ptn.value.items()] for s in addr ])

get_iso2_2 = pandas_udf(lambda x: __get_iso2_2(x, df_ptn), "array<array<string>>", PandasUDFType.SCALAR)

df.withColumn('iso2', flatten(get_iso2_2(expr("upper(address)")))).show()

Update: to find unique countries, do the following:

def __get_iso2_3(addr,ptn):
  return Series([ [k for k,v in ptn.value.items() if re.search(v,s)] for s in addr ])

get_iso2_3 = pandas_udf(lambda x: __get_iso2_3(x, df_ptn), "array<string>", PandasUDFType.SCALAR)

df.withColumn('iso2', get_iso2_3(expr("upper(address)"))).show()
+--------------------+----+--------+
|             address|name|    iso2|
+--------------------+----+--------+
|3030 Whispering P...|John|    [US]|
|Kalverstraat Amst...|Mary|[NL, US]|
|Kalverstraat Amst...| Lex|[NL, US]|
|                xvcv| ddd|      []|
+--------------------+----+--------+
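
As a side note (not part of the original answer): on Spark 2.4+, the duplicates in the earlier non-unique result could also be removed with the built-in array_distinct function, for example:

from pyspark.sql.functions import array_distinct

df_unique = df_new.withColumn('countries', array_distinct('countries'))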

Method 3: using a list comprehension:

Similar to @CronosNull's method: if the list from regex.csv is manageable, you can handle this with a list comprehension:

from pyspark.sql.functions import size,split,upper,col,array,expr,flatten

df_regex = pd.read_csv("file:///path/to/regex.csv",sep=";")
df_regex["keywords"] = df_regex["keywords"].str.replace(r'(^|\\.)([^\\]*)',lambda m: m.group(1) + m.group(2).upper())
df_ptn = df_regex.groupby('iso2').agg({'keywords':lambda x: '(?m)' + '|'.join(x)})["keywords"].to_dict()

df1 = df.select("*",*[ (size(split(upper(col('address')),v))-1).alias(k) for k,v in df_ptn.items()])

df1.select(*df.columns,flatten(array(*[ expr("array_repeat('{0}',`{0}`)".format(c)) for c in df_ptn.keys() ])).alias('iso2')).show()
+--------------------+----+---+------------+
|             address|name| id|        iso2|
+--------------------+----+---+------------+
|3030 Whispering P...|John|  0|[US, US, US]|
|Kalverstraat Amst...|Mary|  1|    [NL, US]|
|Kalverstraat Amst...| Lex|  2|[NL, NL, US]|
|                xvcv| ddd|  3|          []|
+--------------------+----+---+------------+

Note: edited based on the comments

I like @jxc's approach. I took a slightly different route, still without using a UDF, and without needing to broadcast the regex dataset (you only use it in the driver).

Setting the scene

    <meta
      http-equiv="Content-Security-Policy"
      content="frame-src <%= VUE_APP_IDPURL %>;"
    />


import re
from io import StringIO
from pyspark.sql.functions import (
    split, regexp_replace, regexp_extract, size, concat, lit, when, array_repeat, array_join,
    col, array, expr,
)
from pyspark.sql import DataFrame
import pandas as pd

df = spark.createDataFrame([
  ["3030 Whispering Pines Circle,Prosper Texas,US","John"],
  ["Kalverstraat Amsterdam","Mary"],
  ["Kalverstraat Amsterdam,Netherlands","Lex"]
]).toDF("address","name")

sample_data = r"""iso2;keywords
US;\bArizona\b
US:\bTexas\b
US:\bFlorida\b
US;\bChicago\b
US:\bAmsterdam\b
US;\bProsper\b
US;\bUS$
CA:\bAlberta\b
CA:\bNova Scotia\b
CA:\bNova Scotia\b
CA;\bWhitehorse\b
CA;\bCA$
NL:\bAmsterdam\b
NL:\bNetherlands\b
NL;\bNL$"""
replace_pd = pd.read_csv(StringIO(sample_data),delimiter='[;:]',engine='python')
#Resample to have a similar number of rows
replace_pd = replace_pd.append([replace_pd]*10000)
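
Side note (not part of the original answer): DataFrame.append was removed in pandas 2.0, so on a recent pandas the resampling line above can be rewritten with pd.concat, for example:

# roughly equivalent to appending 10000 copies of the original frame
replace_pd = pd.concat([replace_pd] * 10001, ignore_index=True)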

Create a new column for each row of the regex dictionary

def replace_dict(df: DataFrame, column: str, replace_pd: pd.DataFrame) -> DataFrame:
    """
    Returns a dataframe with the required transformations
    to get a list of iso2 codes, and their number of repeats, based on the selected column (e.g. address).
    """
    _df = (
        df.withColumn("words", col(column))
    )
    # For each row of the (grouped) csv create a new column;
    # it will contain the match count if the original
    # column contains a matching string.
    i = 0
    cols = []
    # grouping by iso2 code
    grouped_df = replace_pd.groupby('iso2').agg({'keywords': lambda x: '(?im)' + '|'.join(x)}).reset_index()
    for index, row in grouped_df.iterrows():
        key = row.keywords
        value = row.iso2
        _cr = value
        _df = _df.withColumn(_cr, size(split(col("words"), f"({key})")) - 1)
        cols.append(_cr)
        i += 1
    # Join the aux columns, removing the null entries (no match).
    _df = _df.withColumn("iso2", array(*[when(col(x) > 0, concat(lit(x), lit(":"), col(x))) for x in cols])).withColumn(
        "iso2", expr(r"filter(iso2, x -> x NOT rlike '^\s*$')")
    )
    _df = _df.drop("words", *cols)  # drop the aux columns
    return _df

Run the test. This will give you the result:

replace_dict(df,'address',replace_pd).show(truncate=False)

It should be faster than the other approaches (all of the transformations are narrow), but it depends on the size of the regex.csv file (since it creates a lot of sparse columns).


You will want to broadcast df_regex to all the nodes in the cluster, so each core can process the data in parallel:

df_regex_b = spark.sparkContext.broadcast(df_regex)

Update get_iso2 to use the broadcast variable:

def get_iso2(x, df_regex_b):

    iso2 = {}

    for j, row in df_regex_b.value.iterrows():

        regex = re.compile(row['keywords'], re.I | re.M)
        matches = re.finditer(regex, x)

        for m in matches:
            iso2[row['iso2']] = iso2.get(row['iso2'], 0) + 1

    return [key for key, value in iso2.items() for _ in range(value)]
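
Define the UDF with a nested function. The wrapper itself is not shown above, so the snippet below is only a minimal sketch of that pattern (the wrapper name get_iso2_udf and the return type are assumptions; F and T are the aliases used in the question):

def get_iso2_udf(df_regex_broadcasted):
    # the inner function closes over the broadcast variable, so only a
    # reference to it is serialized with the UDF, not the regex DataFrame itself
    def f(x):
        return get_iso2(x, df_regex_broadcasted)
    return F.udf(f, T.ArrayType(T.StringType()))

df_new = df.withColumn('iso2', get_iso2_udf(df_regex_b)('address'))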

Let me know if this works. Here is a blog post that contains a more detailed discussion of this design pattern, which may help. Nice question, by the way ;)