与熊猫一起使用多重处理来读取,修改和写入数千个CSV文件

问题描述

因此,在一个目录下有大约5000个csv文件,其中包含股票的分钟数据。每个文件均以其符号命名。像股票APL一样被命名为AAPL.csv。

我尝试对它们中的每一个进行一些清理和编辑。在这种情况下,我尝试将包含unix纪元datatime的一列转换为可读的日期和时间。我也想更改一列的标签

我尝试使用多处理来加快处理过程。但是首先尝试杀死我的Macbook。

我在VScode的jupyter笔记本中运行它。如果那很重要。

我想知道我做错了什么以及如何改进。以及如何处理python和pandas中的类似任务。

谢谢!

这是我的代码

# Define operations will be used in multiprocessing handling
def clean_up(file,fail_list):
    print('Working on {}'.format(file))
    stock = pd.read_csv('./Data/minutes_data/' + file)

    try:
        #Convert datetime columns into readable date and time column
        stock['Date'] = stock.apply(lambda row: epoch_converter.get_date_from_mill_epoch(row['datetime']),axis=1)
        stock['Time'] = stock.apply(lambda row: epoch_converter.get_time_from_mill_epoch(row['datetime']),axis=1)

        #Rename 'Unnamed: 0' column into 'Minute'
        stock.rename(columns={'Unnamed: 0':'Minute'},inplace=True)

        #Write it back to new file
        stock.to_csv('./Data/working_data/' + file)
    except:
        print('{} not successful'.format(file))
        fail_list = fail_list.append(file)
        fail_list.to_csv('./Failed_list.csv')



#Get file list to working on.
file_list = os.listdir('./Data/minutes_data/')

#prepare Failed_list
fail_list = pd.DataFrame([])
#Loop through each file
processes = []
for file in file_list:
    p = multiprocessing.Process(target=clean_up,args=(file,fail_list,))
    processes.append(p)
    p.start()

for process in processes:
    process.join()

更新:CSV_FILE_SAMPLE

,打开,高,低,关闭,音量,日期时间 0,21.9,200,1596722940000 0,20.0,19.9937,1595266500000 1,500,1595266800000 2,1094,1595267040000 3,1595268240000

最终更新:

从@furas和@jsmart组合答案,该脚本设法将5000 csv的处理时间从数小时减少到1分钟以内(在Macbook pro上为6核心i9以下)。我很高兴。你们真棒。谢谢!

最终的脚本在这里

import pandas as pd
import numpy as np
import os
import multiprocessing
import logging

logging.basicConfig(filename='./log.log',level=logging.DEBUG)

file_list = os.listdir('./Data/minutes_data/')

def cleanup(file):
    print('Working on ' + file)
    stock = pd.read_csv('./Data/minutes_data/' + file)
    
    try:
        #Convert datetime columns into readable date and time column
        stock['Date'] = pd.to_datetime(stock['datetime'],unit='ms',utc=True).dt.tz_convert('America/New_York').dt.date
        stock['Time'] = pd.to_datetime(stock['datetime'],utc=True).dt.tz_convert('America/New_York').dt.time

        #Rename 'Unnamed: 0' column into 'Minute'
        stock.rename(columns={'Unnamed: 0':'Minute'},inplace=True)

        #Write it back to new file
        stock.to_csv('./Data/working_data/' + file)
    except:
        print(file + ' Not successful')
        logging.warning(file + ' Not complete.')



pool = multiprocessing.Pool()
pool.map(cleanup,file_list)

解决方法

使用Process循环可同时创建5000个进程

您可以使用Pool控制同时运行多少个进程-它会自动释放下一个文件的进程。

它也可以使用return将失败文件的名称发送到主进程,并且可以保存一次文件。在许多进程中使用同一文件可能会使该文件中的数据出错。此外,进程不共享变量,每个进程将拥有自己的空DataFrame,以后将仅保存自己的失败文件-因此它将删除先前的内容。

def clean_up(file):
    # ... code ...
    
        return None  # if OK
    except:
        return file  # if failed
    
    
# --- main ---

# get file list to working on.
file_list = sorted(os.listdir('./Data/minutes_data/'))

with multiprocessing.Pool(10) as p:
    failed_files = p.map(clean_up,file_list)

# remove None from names
failed_files = filter(None,failed_files)

# save all
df = pd.DataFrame(failed_files)
df.to_csv('./failed_list.csv')

还有multiprocessing.pool.ThreadPool使用threads而不是processes

模块concurrent.futures还具有ThreadPoolExecutorProcessPoolExecutor

您也可以尝试使用外部模块来完成此操作-但我不认为这可能有用。

,

原始帖子询问“ ...如何在python和pandas中处理类似的任务。”

  • 替换.apply(...,axis=1)可以使吞吐量提高100倍或更高。
  • 下面是一个包含10_000行数据的示例:
%%timeit
df['date'] = df.apply(lambda x: pd.to_datetime(x['timestamp'],unit='ms'),axis=1)
792 ms ± 26.7 ms per loop (mean ± std. dev. of 7 runs,1 loop each)

重写为:

%%timeit
df['date'] = pd.to_datetime(df['date'],unit='ms')
4.88 ms ± 38.6 µs per loop (mean ± std. dev. of 7 runs,100 loops each)

样本数据:

print(df['timestamp'].head())
0    1586863008214
1    1286654914895
2    1436424291218
3    1423512988135
4    1413205308057
Name: timestamp,dtype: int64