问题描述
因此,我在Java中实现了单独的链式哈希的实现:https://github.com/Big-data-analytics-project/Static-hashing-closed/blob/main/Static%20hashing%20closed
下一步是使用spark来实现它,我尝试阅读教程,但仍然迷失了方向。我该怎么办?
解决方法
一种可能性是从您的哈希实现中创建一个jar,然后将其作为UDF在Spark应用程序中注册,如下所示:
spark.udf.registerJavaFunction("udf_hash","function_name_inside_jar",<returnType e.g: StringType()>)
之后,您可以通过SQL表达式使用它,如下所示:
df = df.withColumn("hashed_column",expr("udf_hash({})".format("column")))
有用的链接:
Register UDF to SqlContext from Scala to use in PySpark
Spark: How to map Python with Scala or Java User Defined Functions?
重要的是,您必须使用--jars在spark-submit中定义您的jar
,您可以使用下面的UDF来实现:
#1.define hash id calculation UDF
def calculate_hashidUDF = udf((uid: String) => {
val md = java.security.MessageDigest.getInstance("SHA-1")
new BigInteger( DatatypeConverter.printHexBinary(md.digest(uid.getBytes)),16).mod(BigInteger.valueOf(10000))
})
#2.register hash id calculation UDF as spark sql function
spark.udf.register("hashid",calculate_hashidUDF)
对于直接哈希值,请在上面的def中使用md
,此函数将如何返回从1到10000的值
一旦您注册为spark udf,就可以在hashid
中使用spark.sql
。