Encrypting and decrypting inside a Spark UDF using a JKS file

Problem description

I need to encrypt and decrypt data inside a Spark UDF using a JKS keystore file. When I run my application with spark-shell I get the following error:

Caused by: java.net.MalformedURLException: no protocol: CDA_KEYSTORE_PT140.jks

I know that Spark treats UDFs as black boxes, so a file referenced inside one is not resolved as an HDFS file. I therefore tried to ship a copy of the file to each executor's local working directory with:

/usr/hdp/current/spark2-client/bin/spark-shell --files CDA_KEYSTORE_PT140.jks
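
As I understand it, a file shipped with --files lands in each executor's working directory, and its executor-local path can be looked up with org.apache.spark.SparkFiles.get. A minimal sketch of that lookup (note that it returns a plain filesystem path, not a URL):

import org.apache.spark.SparkFiles

// Absolute executor-local path of a file distributed via --files
// (or SparkContext.addFile), e.g. /tmp/spark-.../userFiles-.../CDA_KEYSTORE_PT140.jks
val localKeystorePath: String = SparkFiles.get("CDA_KEYSTORE_PT140.jks")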

My UDF looks like this:

def impl2(col1: String): String = {
    val pilotCrypto = new PilotCryptoImpl
    pilotCrypto.setKey1("sensitive data")
    pilotCrypto.setKey2("sensitive data")
    pilotCrypto.setKey3("sensitive data")
    pilotCrypto.init()
    EncryptionUtil.setCrypto(pilotCrypto)
    val psg = new IvParamSpecGenerator(true)
    val crypto = new JceCryptoImpl
    crypto.setKeystoreURL("CDA_KEYSTORE_PT140.jks")
    crypto.setKeystoreType("JCEKS")
    // ... (full listing below)

My full code is below. I run it with the spark-shell command; the code lives in a .scala file.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StringType
// JceCryptoImpl's package is visible in the stack trace below; the other vendor
// classes (PilotCryptoImpl, IvParamSpecGenerator, EncryptionUtil) are assumed
// to come from the same com.telus.framework.crypto library.
import com.telus.framework.crypto.impl.jce.JceCryptoImpl

def createDataframeFromsql(hc: SparkSession, sql: String): DataFrame = {
    // Thin wrapper around SparkSession.sql
    hc.sql(sql)
  }
  
def impl2(col1: String): String = {
    // Bootstrap crypto, used only to decrypt the hex-encoded
    // key alias and key password below.
    val pilotCrypto = new PilotCryptoImpl
    pilotCrypto.setKey1("EbT5a8Fuq")
    pilotCrypto.setKey2("aYt2gv6R")
    pilotCrypto.setKey3("9bFp3Gz4k")
    pilotCrypto.init()
    EncryptionUtil.setCrypto(pilotCrypto)

    // The actual cipher, backed by the JCEKS keystore.
    val psg = new IvParamSpecGenerator(true)
    val crypto = new JceCryptoImpl
    crypto.setKeystoreURL("CDA_KEYSTORE_PT140.jks")  // <- the line that fails
    crypto.setKeystoreType("JCEKS")
    crypto.setKeyAlias(EncryptionUtil.decryptHex("sensitive data"))
    crypto.setKeyPassword(EncryptionUtil.decryptHex("sensitive data"))
    crypto.setCipherTransformation("AES/CBC/PKCS5Padding")
    crypto.setAlgorithmParamSpecGenerator(psg)
    crypto.setEncodeBase64(true)
    crypto.init()

    // Base64 decoding is handled by the library (setEncodeBase64(true)).
    val encryptedBytes1 = col1.getBytes
    new String(crypto.decrypt(encryptedBytes1))
  }
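
(Aside: written this way, the whole keystore/cipher setup runs once per row. One common pattern is to hold the initialized cipher in a lazy singleton so each executor JVM builds it only once; a sketch under that assumption, with the hypothetical names CryptoHolder and impl2Cached and the same vendor classes as above:)

object CryptoHolder {
  // Built at most once per executor JVM, on first use.
  lazy val crypto: JceCryptoImpl = {
    val c = new JceCryptoImpl
    c.setKeystoreURL("CDA_KEYSTORE_PT140.jks") // NB: still has the no-protocol problem
    c.setKeystoreType("JCEKS")
    // ... remaining setters exactly as in impl2 ...
    c.init()
    c
  }
}

def impl2Cached(col1: String): String =
  new String(CryptoHolder.crypto.decrypt(col1.getBytes))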
  
  
  

  def processdata(): Unit = {
    try {
      val hc = SparkSession.builder
        .appName("HivetoSpark")
        .config("spark.sql.warehouse.dir", "namenode/apps/hive/warehouse")
        .enableHiveSupport()
        .getOrCreate()
      import hc.implicits._
      hc.sql("""set hive.exec.dynamic.partition=true""")
      hc.sql("""set hive.exec.dynamic.partition.mode=nonstrict""")
      hc.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
      hc.sql("""set hive.merge.tezfiles=true""")
      hc.sql("""set hive.merge.smallfiles.avgsize=256000000""")
      hc.sql("""set hive.merge.size.per.task=256000000""")
      hc.sql("""set hive.merge.sparkfiles=true""")
      val start = System.currentTimeMillis()

      val impl2udf = udf(impl2 _)
      val query1 = s"""select * from table"""
      val fraud_trn_df1 = createDataframeFromsql(hc, query1)
      fraud_trn_df1.show(5, false)

      val fraud_trn_df2 = fraud_trn_df1.withColumn("FT_PRIM_NUM_AMT_decr", impl2udf(col("ft_prim_num_amt")))
      val fraud_trn_df3 = fraud_trn_df2.withColumn("FT_SECONDARY_NUMBER_AMOUNT_decr", impl2udf(col("ft_secndy_num_amt")))
      fraud_trn_df3.show(5, false)

      val end = System.currentTimeMillis()
      val runTimeInSec = (end - start) / 1000.0
      println(s"runTimeInSec: ${runTimeInSec}sec")
    } catch {
      // The original snippet ends without a catch; added so the block compiles.
      case e: Exception => e.printStackTrace()
    }
  }


 

Finally, I call processdata().
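
For reference, one way to both ship the keystore and load the script is spark-shell's -i option, which preloads a Scala file into the session (myscript.scala is a placeholder name):

/usr/hdp/current/spark2-client/bin/spark-shell --files CDA_KEYSTORE_PT140.jks -i myscript.scala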

The stack trace is as follows:

Caused by: org.apache.spark.SparkException: Failed to execute user defined function($anonfun$1: (string) => string)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply_133$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:232)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:99)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.MalformedURLException: no protocol: CDA_KEYSTORE_PT140.jks
        at java.net.URL.<init>(URL.java:593)
        at java.net.URL.<init>(URL.java:490)
        at java.net.URL.<init>(URL.java:439)
        at com.telus.framework.crypto.impl.jce.JceCryptoImpl.init(JceCryptoImpl.java:78)
        at $line28.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.impl2(<console>:56)
        at $line29.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:59)
        at $line29.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:59)

Solution

The URL is missing a protocol (as the error message says).

It should be written in a format like the following, e.g. an HDFS URL (path shown as a placeholder):

hdfs:///<path-to-keystore>/CDA_KEYSTORE_PT140.jks

If you must run with a local file, use:

file:///<path-to-keystore>/CDA_KEYSTORE_PT140.jks

With Spark I prefer HDFS files, since they are equally accessible to all workers. Otherwise, with a local file, you have to copy it to every worker node.
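
Putting this together with the --files approach from the question: inside the UDF you can resolve the executor-local copy with SparkFiles.get and prefix it with file:// so that java.net.URL gets its protocol. A sketch of just the changed lines (assuming JceCryptoImpl passes the string straight to java.net.URL, as the stack trace suggests):

import org.apache.spark.SparkFiles

// ... same setup as in impl2 ...
val crypto = new JceCryptoImpl
// SparkFiles.get returns an absolute path ("/..."), so prepending "file://"
// yields a well-formed file:///... URL for the executor-local keystore copy.
crypto.setKeystoreURL("file://" + SparkFiles.get("CDA_KEYSTORE_PT140.jks"))
crypto.setKeystoreType("JCEKS")
// ... rest unchanged ...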