问题描述
我正在尝试从 Binance API 检索加密货币历史数据并将其存储到我的 postgresql 数据库中。 这是我的脚本的样子:
import config
from binance.client import Client
import psycopg2
import psycopg2.extras
from tqdm import tqdm
import concurrent.futures
client = Client(config.BINANCE_API_KEY,config.BINANCE_API_SECRET)
connection = psycopg2.connect(
host=config.DB_HOST,database=config.DB_NAME,user=config.DB_USER,password=config.DB_PASSWORD)
cursor = connection.cursor(cursor_factory=psycopg2.extras.DictCursor)
cursor.execute(
"SELECT concat(symbol,quote_asset) as ticker FROM currency")
tickers = cursor.fetchall()
coin_data = []
for ticker in tickers:
coin_data.append(ticker)
def unixtimetodatetime(unixtime):
return unixtime.format("'YYYY-MM-DD HH:mm:ss'")
def retrieve_klines(ticker_symbol):
minute_prices = []
for ticker in tqdm(coin_data):
klines = client.get_historical_klines(
f"{coin_data.get(ticker)}",Client.KLINE_INTERVAL_1MINUTE,"4 May 2018","21 May,2021")
for coin in klines:
coin[0] = unixtimetodatetime(coin[0])
coin[6] = ticker[1]
minute_prices.append(coin[0:7])
for row in minute_prices:
sql_insert = f"""INSERT INTO currency_price (dt,open,high,low,close,volume,currency_id) VALUES ({row[0]},{row[1]},{row[2]},{row[3]},{row[4]},{row[5]},(SELECT id FROM currency WHERE symbol = '{row[6]}')) ON CONFLICT (dt,currency_id) DO nothing"""
cursor.execute(sql_insert)
connection.commit()
with concurrent.futures.ThreadPoolExecutor() as executor:
executor.map(retrieve_klines,coin_data)
现在的问题是,当使用 ThreadPoolExecutor 时,脚本会在所有线程启动后立即完成,而没有实际检索任何数据。 我已经阅读了 concurrent.futures 文档,它指出,如果我将上下文管理器与执行程序一起使用,则不需要指明“等待”参数。 在数据被检索并存储到我的数据库之前,保持程序运行的解决方案是什么?
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)