如何在 Athena 中选择与列表中所有项目匹配的行使用 python

问题描述

我正在使用以下代码对 AWS Athena 进行一些查询

import os
import boto
import boto3
import pyspark
import botocore
import botocore.session
from dotenv import load_dotenv
from pyspark.sql import sqlContext

# aws credemtials 
load_dotenv('/home/brenda/Desktop/env_vars/bmartin_aws.env',override = True)
AWS_ACCESS_KEY_ID = os.environ.get('ACCESS_KEY') 
AWS_SECRET_ACCESS_KEY = os.environ.get('SECRET_KEY')

spark = pyspark.sql.SparkSession \
    .builder \
    .getorCreate()

## read matrix from athena
# Set the boto library connection to Athena and S3
client = boto3.client('athena',aws_access_key_id = AWS_ACCESS_KEY_ID,aws_secret_access_key = AWS_SECRET_ACCESS_KEY,region_name = 'us-west-2' )

s3 = boto3.resource('s3',region_name = 'us-west-2')

bucket_name = 'otr-files'
#srcPath = 'path/in/bucket'
srcPath = 'prod/routes_optimization/osrm_cost_matrix/data_athena'

bucket_name = 'otr-files'
#srcPath = 'path/in/bucket'
srcPath = 'prod/routes_optimization/osrm_cost_matrix/data_athena'

######### HERE IS MY QUERY #########
response = client.start_query_execution(
            QueryString = 'SELECT * FROM osrm_cost_matrix',QueryExecutionContext = {'Database': 'kof_distance_matrix'},ResultConfiguration = {'OutputLocation': 's3://{}/{}'.format(bucket_name,srcPath),}
        )

# Get execution data
execution_data = client.get_query_execution(QueryExecutionId = response['QueryExecutionId'])

# Get execution status
status = execution_data['QueryExecution']['Status']['State']

# Get output location
data_output_location = execution_data['QueryExecution']['ResultConfiguration']['OutputLocation']

# Get file name and s3 path to file
srcFileName = '/'.join(data_output_location.split('/')[-1:])
srcFileWithPath = srcPath + srcFileName

# Transfer file to df (Pyspark)
original_matrix = spark.read.format("com.databricks.spark.csv").options(header=True).load(data_output_location)

而 original_matrix 数据帧如下所示:

+-----------+-----------+--------+--------+--------+--------+-------+------------------+----------+---------------+---------------+
|client_id_x|client_id_y|   lat_x|   lng_x|   lat_y|   lng_y|   dist|              time|      date|        geoid_x|        geoid_y|
+-----------+-----------+--------+--------+--------+--------+-------+------------------+----------+---------------+---------------+
| 0700001710| 0700001760|-23.6753|-46.6788|-23.6346|-46.6057|14210.2|            21.045|2021-04-25|8da81000890577f|8da81001b0b353f|
| 0700001710| 0700002137|-23.6753|-46.6788|-23.6309|-46.7059| 7457.9|             12.01|2021-04-25|8da81000890577f|8da810056a5673f|

代码工作正常,但是我想检索 client_id_x 与某个列表中的所有元素匹配的行。我的列表 clients_list 如下所示:

clients_list = ['B201','0700001710','0700001760','0700002137','0700002497','0700006430','0700002866','0700006457','0700006459','0700006594','0700006873','0700014578','0700008285','0700009268','0700011876','0700014507','0700014756','0700014666','0700016016','0700014806','0700014849','0700014853']

我想执行这个查询

SELECT * FROM osrm_cost_matrix WHERE client_id_x IN (clients_list)

但是不起作用。我收到以下错误

Path does not exist: s3://otr-files/prod/routes_optimization/osrm_cost_matrix/data_athena/6126c412-f5dc-40cf-b079-d442d60f876a.csv;

非常感谢您的帮助。

解决方法

检查您的 Athena 查询状态,我认为它可能会失败。如果是这样,您仍然可以获得有效的 data_output_location,但不会有任何文件。因此 spark.read 将失败并显示“路径不存在”(完全正确)。

  • 也许您忘记将 clients_list 替换为其值,例如:
QueryString=f"SELECT * FROM osrm_cost_matrix WHERE client_id_x IN ({','.join(clients_list)})"
  • 等待查询结束,可能需要一些时间。