Unable to fetch data from Presto SQL (Trino) using PySpark

Problem description

I have a PySpark job running on AWS Glue. The code works fine when I run it from my local machine, but when I run the same code from AWS Glue I cannot fetch the data. My code and the error message are below. As you can see from the output, the schema is retrieved successfully; the error occurs only once the job tries to fetch the actual rows.

Update: the problem occurs because the worker nodes cannot access the keystore, which exists only on the master node. Can anyone help with how to copy the file to the worker nodes, or how to make it accessible to them?

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import boto3
import ssl
from pyspark.sql import SQLContext
from pyspark.sql.types import StructType, StructField, StringType


## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
sqlCont = SQLContext(spark)
job.init(args['JOB_NAME'], args)


query = "SELECT * FROM test.employee where id='1001'"

s3_client = boto3.client('s3')

s3_client.download_file('bucket1','rootca_ca.jks','/tmp/rootca_ca.jks')
  

conparms_r = glueContext.extract_jdbc_conf("presto_test",catalog_id = None)
 
source_df = sqlCont.read.format("jdbc") \
    .option("driver", "io.prestosql.jdbc.PrestoDriver") \
    .option("url", "jdbc:presto://test-db.net:18000/hive") \
    .option("query", query) \
    .option("user", conparms_r['user']) \
    .option("password", conparms_r['password']) \
    .option("SSL", True) \
    .option("SSLKeyStorePath", "/tmp/rootca_ca.jks") \
    .option("SSLKeyStorePassword", "test12") \
    .load()



print("************************************source_df SUCCESSFULLY CREATED !!!!!!!!!!!!!!!!!*****************************************")

source_df.printSchema()
   
source_df.show(5)

Output

************************************source_df SUCCESSFULLY CREATED !!!!!!!!!!!!!!!!!*****************************************
root
 |-- lineage_key: long (nullable = true)
 |-- agreement_id: string (nullable = true)
 |-- termination_date: timestamp (nullable = true)

Traceback (most recent call last):
  File "/tmp/pytest", line 51, in <module>
    source_df.show(5)
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 378, in show
    print(self._jdf.showString(n, 20, vertical))
  File "/opt/amazon/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/opt/amazon/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o91.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, 100.64.188.253, executor 1): java.sql.SQLException: Error setting up SSL: /tmp/rootca_ca.jks (No such file or directory)
    at io.prestosql.jdbc.PrestoDriverUri.setupClient(PrestoDriverUri.java:235)
    at io.prestosql.jdbc.PrestoDriver.connect(PrestoDriver.java:88)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:63)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:54)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute(JDBCRDD.scala:272)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: io.prestosql.jdbc.$internal.client.ClientException: Error setting up SSL: /tmp/rootca_ca.jks (No such file or directory)
    at io.prestosql.jdbc.$internal.client.OkHttpUtil.setupSsl(OkHttpUtil.java:241)
    at io.prestosql.jdbc.PrestoDriverUri.setupClient(PrestoDriverUri.java:203)
    ... 23 more

Solution

I was able to resolve this by passing the certificate location to the job via the --extra-files job parameter, and referencing it in the code with a relative path, like this:

source_df = sqlCont.read.format("jdbc") \
    .option("driver", "io.prestosql.jdbc.PrestoDriver") \
    .option("url", "jdbc:presto://test-db.net:18000/hive") \
    .option("query", query) \
    .option("user", conparms_r['user']) \
    .option("password", conparms_r['password']) \
    .option("SSL", True) \
    .option("SSLKeyStorePath", "./rootca_ca.jks") \
    .option("SSLKeyStorePassword", "test12") \
    .load()
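As an illustration of how the parameter can be supplied (the job name here is a placeholder; the bucket path assumes the keystore lives at the same S3 location used earlier in the script), --extra-files can be set when starting the job from the AWS CLI. Glue copies each listed S3 object into the working directory of every node before the script runs, which is why the relative path ./rootca_ca.jks resolves on the workers:

```shell
# Hypothetical job name; substitute your own. --extra-files copies the
# listed S3 objects into the working directory of the driver and every
# executor, so the keystore no longer needs a boto3 download to /tmp.
aws glue start-job-run \
    --job-name presto_test_job \
    --arguments '{"--extra-files":"s3://bucket1/rootca_ca.jks"}'
```

The same key/value pair can also be added under "Job parameters" in the Glue console, which makes it part of the job definition rather than a per-run argument.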
