如何在S3 Parquet中写入JSON文件

问题描述

    import json
    import requests
    import datetime
    import boto3
    import parquet
    import pyarrow
    import pandas as pd
    from pandas import DataFrame
         
    noaa_codes = [
        'KAST','KBDN','KCVO','KEUG','KHIO','KHRI','KMMV','KONP','KPDX','KRDM','KSLE','KSPB','KTMK','KTTD','KUAO'
        ]
     
    urls = [f"https://api.weather.gov/stations/{x}/observations/latest" for x in noaa_codes]
    
    
    s3_bucket="XXXXXX"
    s3_prefix = "XXXXX/parquetfiles"
    s3 = boto3.resource("s3")
    
    def get_datetime():
        dt = datetime.datetime.Now()
        return dt.strftime("%Y%m%d"),dt.strftime("%H:%M:%s")
              
    def reshape(r):
        props = r["properties"]
        res = {    
            "stn": props["station"].split("/")[-1],"temp": props["temperature"]["value"],"dewp": props["dewpoint"]["value"],"slp": props["seaLevelPressure"]["value"],"stp": props["barometricPressure"]["value"],"visib": props["visibility"]["value"],"wdsp": props["windSpeed"]["value"],"gust": props["windGust"]["value"],"max": props["maxTemperatureLast24Hours"]["value"],"min": props["minTemperatureLast24Hours"]["value"],"prcp": props["precipitationLast6Hours"]["value"]
        }
        return res
               
    def lambda_handler(event,context):
                           
        responses = []
        for url in urls:
            r = requests.get(url)
            responses.append(reshape(r.json()))
        
        datestr,timestr = get_datetime()
        fname = f"noaa_hourly_measurements_{timestr}"    
        file_prefix = "/".join([s3_prefix,datestr,fname])
        s3_obj = s3.Object(s3_bucket,file_prefix)`enter code here`
        serialized = []
        for r in responses:
            serialized.append(json.dumps(r))
        jsonlines_doc = "\n".join(serialized)
        df= pd.read_json(jsonlines_doc,lines=True)
        df.to_parquet(s3_obj,engine='auto',compression='snappy',index=None)
        print("created")

无法在aws s3中创建实木复合地板文件,但可以在本地创建。提出一种更好的方法。当我运行代码时,我可以在s3中创建一个json文件,但是当我尝试创建镶木文件时出现以下错误errorMessage“:”无效的文件路径或缓冲区对象类型:“,” errorType“:” ValueError“,” stackTrace“:[[”“ /var/task/lambda_function.py",80,"lambda_handler","df.to_parquet(location,engine ='auto',compression ='snappy',index =无)“

解决方法

确保您的s3_object是s3 url字符串。它必须看起来像这样

"s3://my_bucket/path/to/data_folder/my-file.parquet"

此外,不建议使用熊猫编写数据帧作为S3的拼花地板。对于python 3.6 +,AWS有一个名为aws-data-wrangler的库,可帮助实现Pandas / S3 / Parquet之间的集成

安装do;

pip install awswrangler

要将df写到s3,请这样做;

import awswrangler as wr
wr.s3.to_parquet(df=df,path="s3://my_bucket/path/to/data_folder/my-file.parquet")