问题描述
因此,在一个目录下有大约5000个csv文件,其中包含股票的分钟数据。每个文件均以其符号命名。像股票APL一样被命名为AAPL.csv。
我尝试对它们中的每一个进行一些清理和编辑。在这种情况下,我尝试将包含unix纪元datatime的一列转换为可读的日期和时间。我也想更改一列的标签。
我尝试使用多处理来加快处理过程。但是首先尝试杀死我的Macbook。
我在VScode的jupyter笔记本中运行它。如果那很重要。
我想知道我做错了什么以及如何改进。以及如何处理python和pandas中的类似任务。
谢谢!
这是我的代码。
# Define operations will be used in multiprocessing handling
def clean_up(file,fail_list):
print('Working on {}'.format(file))
stock = pd.read_csv('./Data/minutes_data/' + file)
try:
#Convert datetime columns into readable date and time column
stock['Date'] = stock.apply(lambda row: epoch_converter.get_date_from_mill_epoch(row['datetime']),axis=1)
stock['Time'] = stock.apply(lambda row: epoch_converter.get_time_from_mill_epoch(row['datetime']),axis=1)
#Rename 'Unnamed: 0' column into 'Minute'
stock.rename(columns={'Unnamed: 0':'Minute'},inplace=True)
#Write it back to new file
stock.to_csv('./Data/working_data/' + file)
except:
print('{} not successful'.format(file))
fail_list = fail_list.append(file)
fail_list.to_csv('./Failed_list.csv')
#Get file list to working on.
file_list = os.listdir('./Data/minutes_data/')
#prepare Failed_list
fail_list = pd.DataFrame([])
#Loop through each file
processes = []
for file in file_list:
p = multiprocessing.Process(target=clean_up,args=(file,fail_list,))
processes.append(p)
p.start()
for process in processes:
process.join()
更新:CSV_FILE_SAMPLE
,打开,高,低,关闭,音量,日期时间 0,21.9,200,1596722940000 0,20.0,19.9937,1595266500000 1,500,1595266800000 2,1094,1595267040000 3,1595268240000
最终更新:
从@furas和@jsmart组合答案,该脚本设法将5000 csv的处理时间从数小时减少到1分钟以内(在Macbook pro上为6核心i9以下)。我很高兴。你们真棒。谢谢!
最终的脚本在这里:
import pandas as pd
import numpy as np
import os
import multiprocessing
import logging
logging.basicConfig(filename='./log.log',level=logging.DEBUG)
file_list = os.listdir('./Data/minutes_data/')
def cleanup(file):
print('Working on ' + file)
stock = pd.read_csv('./Data/minutes_data/' + file)
try:
#Convert datetime columns into readable date and time column
stock['Date'] = pd.to_datetime(stock['datetime'],unit='ms',utc=True).dt.tz_convert('America/New_York').dt.date
stock['Time'] = pd.to_datetime(stock['datetime'],utc=True).dt.tz_convert('America/New_York').dt.time
#Rename 'Unnamed: 0' column into 'Minute'
stock.rename(columns={'Unnamed: 0':'Minute'},inplace=True)
#Write it back to new file
stock.to_csv('./Data/working_data/' + file)
except:
print(file + ' Not successful')
logging.warning(file + ' Not complete.')
pool = multiprocessing.Pool()
pool.map(cleanup,file_list)
解决方法
使用Process
循环可同时创建5000个进程
您可以使用Pool
控制同时运行多少个进程-它会自动释放下一个文件的进程。
它也可以使用return
将失败文件的名称发送到主进程,并且可以保存一次文件。在许多进程中使用同一文件可能会使该文件中的数据出错。此外,进程不共享变量,每个进程将拥有自己的空DataFrame,以后将仅保存自己的失败文件-因此它将删除先前的内容。
def clean_up(file):
# ... code ...
return None # if OK
except:
return file # if failed
# --- main ---
# get file list to working on.
file_list = sorted(os.listdir('./Data/minutes_data/'))
with multiprocessing.Pool(10) as p:
failed_files = p.map(clean_up,file_list)
# remove None from names
failed_files = filter(None,failed_files)
# save all
df = pd.DataFrame(failed_files)
df.to_csv('./failed_list.csv')
还有multiprocessing.pool.ThreadPool
使用threads
而不是processes
。
模块concurrent.futures还具有ThreadPoolExecutor
和ProcessPoolExecutor
您也可以尝试使用外部模块来完成此操作-但我不认为这可能有用。
,原始帖子询问“ ...如何在python和pandas中处理类似的任务。”
- 替换
.apply(...,axis=1)
可以使吞吐量提高100倍或更高。 - 下面是一个包含10_000行数据的示例:
%%timeit
df['date'] = df.apply(lambda x: pd.to_datetime(x['timestamp'],unit='ms'),axis=1)
792 ms ± 26.7 ms per loop (mean ± std. dev. of 7 runs,1 loop each)
重写为:
%%timeit
df['date'] = pd.to_datetime(df['date'],unit='ms')
4.88 ms ± 38.6 µs per loop (mean ± std. dev. of 7 runs,100 loops each)
样本数据:
print(df['timestamp'].head())
0 1586863008214
1 1286654914895
2 1436424291218
3 1423512988135
4 1413205308057
Name: timestamp,dtype: int64