Using wait with concurrent.futures to append to a DataFrame raises a KeyError about 90% of the time

Problem

I frequently get a KeyError from futures[x].result(), and I'm not sure what I'm doing wrong.

Maybe I'm not using wait correctly?

from concurrent.futures import wait,ALL_COMPLETED

import concurrent.futures
import datetime
from datetime import timedelta
import yfinance as yf
import pandas as pd

pool = concurrent.futures.ThreadPoolExecutor(8)

end = datetime.date.today()
start = end - timedelta(weeks=104)

stocks = ['GOOG','CSCO']


def dl(stock):
    return yf.download(stock,start=start,end=end).iloc[:,:5].dropna(axis=0,how='any')


futures = [pool.submit(dl,args) for args in stocks]
wait(futures,return_when=ALL_COMPLETED)

stocks_data = pd.DataFrame()
for x in range(0,len(stocks)):
    prices = pd.DataFrame(futures[x].result())
    prices['Symbol'] = stocks[x]
    stocks_data = pd.concat([stocks_data,prices])

print(stocks_data)

Here is the stack trace I get:

KeyError                                  Traceback (most recent call last)
<ipython-input-160-3da302790b49> in <module>
     24 stocks_data = pd.DataFrame()
     25 for x in range(0,len(stocks)):
---> 26     prices = pd.DataFrame(futures[x].result())
     27     prices['Symbol'] = stocks[x]
     28     stocks_data = pd.concat([stocks_data,prices])

/usr/lib/python3.6/concurrent/futures/_base.py in result(self,timeout)
    423                 raise CancelledError()
    424             elif self._state == FINISHED:
--> 425                 return self.__get_result()
    426 
    427             self._condition.wait(timeout)

/usr/lib/python3.6/concurrent/futures/_base.py in __get_result(self)
    382     def __get_result(self):
    383         if self._exception:
--> 384             raise self._exception
    385         else:
    386             return self._result

/usr/lib/python3.6/concurrent/futures/thread.py in run(self)
     54 
     55         try:
---> 56             result = self.fn(*self.args,**self.kwargs)
     57         except BaseException as exc:
     58             self.future.set_exception(exc)

<ipython-input-160-3da302790b49> in dl(stock)
     16 
     17 def dl(stock):
---> 18     return yf.download(stock,start=start,end=end).iloc[:,:5].dropna(axis=0,how='any')
     19 
     20 

/usr/local/lib/python3.6/dist-packages/yfinance/multi.py in download(tickers,start,end,actions,threads,group_by,auto_adjust,back_adjust,progress,period,interval,prepost,proxy,rounding,**kwargs)
    117 
    118     if len(tickers) == 1:
--> 119         return shared._DFS[tickers[0]]
    120 
    121     try:

KeyError: 'CSCO'

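Solution

It looks like threads are the problem here (I read somewhere that I should use processes instead of threads). The KeyError is raised inside yfinance at shared._DFS[tickers[0]], which suggests that yf.download keeps its results in module-level state shared by every thread in the process, so two concurrent calls can interfere with each other. Worker processes each get their own copy of that state. The version below swaps ThreadPoolExecutor for ProcessPoolExecutor.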
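
To illustrate why threads might fail here, this is a minimal sketch of the suspected race. It assumes yf.download resets a module-level dict on every call and reads its own result back from it at the end; that is my reading of the shared._DFS line in the traceback, not something I have confirmed in the yfinance source. The names _DFS, fake_download, first_call and second_call are hypothetical stand-ins, and the two events only force the unlucky interleaving so it happens every time instead of 90% of the time.

```python
import threading
from concurrent.futures import ThreadPoolExecutor, wait

_DFS = {}  # hypothetical stand-in for module-level state such as shared._DFS


def fake_download(ticker, pause=lambda: None):
    """Assumed behavior: reset the shared dict, store a result, read it back."""
    global _DFS
    _DFS = {}                              # every call starts from a fresh shared dict
    _DFS[ticker] = f"prices for {ticker}"
    pause()                                # window in which another thread may run
    return _DFS[ticker]                    # KeyError if another call reset the dict


first_stored = threading.Event()
second_done = threading.Event()


def first_call():
    def pause():
        first_stored.set()                 # let the second call start now
        second_done.wait(timeout=5)        # resume only after it has finished
    return fake_download("GOOG", pause)


def second_call():
    first_stored.wait(timeout=5)
    try:
        return fake_download("CSCO")       # replaces the dict that held GOOG
    finally:
        second_done.set()


with ThreadPoolExecutor(2) as pool:
    futures = [pool.submit(first_call), pool.submit(second_call)]
    wait(futures)

# Record which call failed, without re-raising the exception here.
errors = [type(f.exception()).__name__ if f.exception() else None for f in futures]
print(errors)
```

With separate processes, each worker would have its own _DFS, so one call could not wipe out the other's entry.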

import concurrent.futures
import datetime

import pandas as pd
import yfinance as yf

end = datetime.date.today()
start = end - datetime.timedelta(weeks=104)

stocks = ['GOOG', 'CSCO']


def dl(stock):
    # Download two years of prices, keep the first five columns, drop incomplete rows.
    return yf.download(stock, start=start, end=end).iloc[:, :5].dropna(axis=0, how='any')


# The guard is needed on platforms where worker processes re-import this module.
if __name__ == '__main__':
    # Processes instead of threads: each worker has its own copy of yfinance's state.
    with concurrent.futures.ProcessPoolExecutor() as pool:
        futures = [pool.submit(dl, stock) for stock in stocks]
        concurrent.futures.wait(futures, return_when=concurrent.futures.ALL_COMPLETED)

    frames = []
    for stock, future in zip(stocks, futures):
        prices = future.result()
        prices['Symbol'] = stock
        frames.append(prices)

    stocks_data = pd.concat(frames)
    print(stocks_data)
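
As for the original question about wait: the wait call itself looks fine. futures[x].result() re-raises whatever exception the worker raised (the `raise self._exception` frame in the traceback), which is why the KeyError shows up on that line. Here is a rough sketch of collecting results per symbol so that one failed download does not abort the whole loop. fetch and the 'BAD' symbol are hypothetical stand-ins for dl and a failing ticker; nothing here calls yfinance.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def fetch(symbol):
    # Hypothetical stand-in for dl(): fails for one symbol to mimic the KeyError.
    if symbol == "BAD":
        raise KeyError(symbol)
    return [(symbol, 1.0)]


symbols = ["GOOG", "CSCO", "BAD"]
results, failures = {}, {}

with ThreadPoolExecutor(max_workers=3) as pool:
    # Map each future back to its symbol so results can be labeled in any order.
    future_to_symbol = {pool.submit(fetch, s): s for s in symbols}
    for future in as_completed(future_to_symbol):
        symbol = future_to_symbol[future]
        try:
            results[symbol] = future.result()   # re-raises the worker's exception
        except KeyError as exc:
            failures[symbol] = exc               # keep going, remember what failed

print(sorted(results), list(failures))
```

Mapping futures to symbols also avoids relying on futures[x] and stocks[x] staying in the same order.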