问题描述
import os
import time

import boto
import boto3
import botocore
import botocore.session
import pyspark
from dotenv import load_dotenv

# AWS credentials loaded from a local env file.
load_dotenv('/home/brenda/Desktop/env_vars/bmartin_aws.env', override=True)
AWS_ACCESS_KEY_ID = os.environ.get('ACCESS_KEY')
AWS_SECRET_ACCESS_KEY = os.environ.get('SECRET_KEY')

# Spark session: entry point for loading the Athena result CSV.
# FIX: was `.getorCreate()`, which raises AttributeError — the builder
# method is `getOrCreate()`.
spark = pyspark.sql.SparkSession \
    .builder \
    .getOrCreate()

## read matrix from athena
# Boto3 handles for Athena (query execution) and S3 (result files).
client = boto3.client(
    'athena',
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
    region_name='us-west-2',
)
s3 = boto3.resource('s3', region_name='us-west-2')

bucket_name = 'otr-files'
# Prefix inside the bucket where Athena writes its query results.
srcPath = 'prod/routes_optimization/osrm_cost_matrix/data_athena'

######### HERE IS MY QUERY #########
response = client.start_query_execution(
    QueryString='SELECT * FROM osrm_cost_matrix',
    QueryExecutionContext={'Database': 'kof_distance_matrix'},
    ResultConfiguration={'OutputLocation': 's3://{}/{}'.format(bucket_name, srcPath)},
)

# FIX: poll until the query reaches a terminal state before touching the
# output. Athena returns a valid OutputLocation immediately, but the CSV
# only exists once the state is SUCCEEDED — reading earlier (or after a
# FAILED run) is what produces Spark's "Path does not exist" error.
while True:
    execution_data = client.get_query_execution(
        QueryExecutionId=response['QueryExecutionId']
    )
    status = execution_data['QueryExecution']['Status']['State']
    if status in ('SUCCEEDED', 'FAILED', 'CANCELLED'):
        break
    time.sleep(1)
if status != 'SUCCEEDED':
    raise RuntimeError('Athena query did not succeed (state: {})'.format(status))

# Location of the result CSV written by Athena.
data_output_location = execution_data['QueryExecution']['ResultConfiguration']['OutputLocation']
# Get file name and s3 path to file
srcFileName = '/'.join(data_output_location.split('/')[-1:])
srcFileWithPath = srcPath + srcFileName
# Transfer file to df (Pyspark)
original_matrix = spark.read.format("com.databricks.spark.csv").options(header=True).load(data_output_location)
而 original_matrix 数据帧如下所示:
+-----------+-----------+--------+--------+--------+--------+-------+------------------+----------+---------------+---------------+
|client_id_x|client_id_y| lat_x| lng_x| lat_y| lng_y| dist| time| date| geoid_x| geoid_y|
+-----------+-----------+--------+--------+--------+--------+-------+------------------+----------+---------------+---------------+
| 0700001710| 0700001760|-23.6753|-46.6788|-23.6346|-46.6057|14210.2| 21.045|2021-04-25|8da81000890577f|8da81001b0b353f|
| 0700001710| 0700002137|-23.6753|-46.6788|-23.6309|-46.7059| 7457.9| 12.01|2021-04-25|8da81000890577f|8da810056a5673f|
该代码工作正常,但是我想检索 client_id_x 与某个列表中的所有元素匹配的行。我的列表 clients_list
如下所示:
clients_list = ['B201','0700001710','0700001760','0700002137','0700002497','0700006430','0700002866','0700006457','0700006459','0700006594','0700006873','0700014578','0700008285','0700009268','0700011876','0700014507','0700014756','0700014666','0700016016','0700014806','0700014849','0700014853']
我想执行这个查询:
SELECT * FROM osrm_cost_matrix WHERE client_id_x IN (clients_list)
但是不起作用。我收到以下错误:
Path does not exist: s3://otr-files/prod/routes_optimization/osrm_cost_matrix/data_athena/6126c412-f5dc-40cf-b079-d442d60f876a.csv;
非常感谢您的帮助。
解决方法
检查您的 Athena 查询状态,我认为它可能会失败。如果是这样,您仍然可以获得有效的 data_output_location
,但不会有任何文件。因此 spark.read
将失败并显示“路径不存在”(完全正确)。
- 也许您忘记将
clients_list
替换为其值,例如:
QueryString=f"SELECT * FROM osrm_cost_matrix WHERE client_id_x IN ({','.join(f"'{c}'" for c in clients_list)})"
（注意：client_id 是字符串（例如 'B201'、带前导零的编号），因此每个值都必须加单引号，否则生成的 SQL 无效。）
- 等待查询结束,可能需要一些时间。