问题描述
是否可以从AWS glue python作业执行任意sql命令(如ALTER TABLE)?我知道我可以用它来读取表中的数据,但是有没有一种方法可以执行其他特定于数据库的命令?
我需要将数据提取到目标数据库中,然后立即运行一些ALTER命令。
解决方法
因此,在进行了广泛的研究并在AWS支持下打开了一个案例之后,他们告诉我目前无法通过Python Shell或Glue pyspark工作。但是我只是尝试了一些有创意的东西,而且效果很好!这个想法是使用sparks依赖的py4j并利用标准的Java sql包。
此方法的两个巨大好处:
-
此功能的巨大好处是,您可以将数据库连接定义为Glue数据连接,并在其中保留jdbc详细信息和凭据,而无需在Glue代码中对其进行硬编码。下面的示例通过调用
glueContext.extract_jdbc_conf('your_glue_data_connection_name')
来获取在Glue中定义的jdbc url和凭据来实现这一点。 -
如果您需要在受支持的即用型Glue数据库上运行SQL命令,甚至不需要为该数据库使用/传递jdbc驱动程序-只要确保为该数据库设置了Glue连接即可并将该连接添加到您的Glue作业-Glue将上传正确的数据库驱动程序jar。
请记住,下面的代码是由驱动程序进程执行的,而Spark工作者/执行者不能执行。
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
args = getResolvedOptions(sys.argv,['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
logger = glueContext.get_logger()
job = Job(glueContext)
job.init(args['JOB_NAME'],args)
# dw-poc-dev spark test
source_jdbc_conf = glueContext.extract_jdbc_conf('glue_database_connection_name')
from py4j.java_gateway import java_import
java_import(sc._gateway.jvm,"java.sql.Connection")
java_import(sc._gateway.jvm,"java.sql.DatabaseMetaData")
java_import(sc._gateway.jvm,"java.sql.DriverManager")
java_import(sc._gateway.jvm,"java.sql.SQLException")
conn = sc._gateway.jvm.DriverManager.getConnection(source_jdbc_conf.get('url'),source_jdbc_conf.get('user'),source_jdbc_conf.get('password'))
print(conn.getMetaData().getDatabaseProductName())
# call stored procedure
cstmt = conn.prepareCall("{call dbo.sp_start_job(?)}");
cstmt.setString("job_name","testjob");
results = cstmt.execute();
conn.close()
,
这取决于。如果您将redshift用作目标,则可以选择将前操作和后操作指定为连接选项的一部分。您将可以在那指定更改动作。但是对于其余的目标类型,您可能需要使用某些python模块,例如pg8000(在Postgres情况下)和其他
,我修改了 mishkin 共享的代码,但它对我不起作用。因此,在进行了一些故障排除后,我意识到目录中的连接不起作用。所以我不得不手动修改它并稍微调整代码。现在它的工作正常,但最终还是 thorwoiung 异常,因为它无法将 java 结果转换为 python 结果。我做了一个变通方法,所以请谨慎使用。
below is my code.
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
## @params: [TempDir,JOB_NAME]
args = getResolvedOptions(sys.argv,['TempDir','JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'],args)
#source_jdbc_conf = glueContext.extract_jdbc_conf('redshift_publicschema')
from py4j.java_gateway import java_import
java_import(sc._gateway.jvm,"java.sql.SQLException")
print('Trying to connect to DB')
conn = sc._gateway.jvm.DriverManager.getConnection('jdbc:redshift://redshift-cluster-2-url:4000/databasename','myusername','mypassword')
print('Trying to connect to DB success!')
print(conn.getMetaData().getDatabaseProductName())
# call stored procedure,in this case I call sp_start_job
stmt = conn.createStatement();
#cstmt = conn.prepareCall("call dbname.schemaname.my_storedproc();");
print('Call to proc trying ')
#cstmt.setString("job_name","testjob");
try:
rs = stmt.executeQuery('call mySchemaName.my_storedproc()');
except:
print("An exception occurred but proc has run")
#results = cstmt.execute();`enter code here`
conn.close()