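Problem description
I am using Python to read a CSV with pandas, fix some fields, and then write the data row by row to a table in SQL Server. Bulk import is disabled on the server, and because there will eventually be dozens of these files, I want to automate downloading and ingesting them. I could accept this taking a few minutes, but it takes HOURS to complete.
I know that if bulk import were enabled I could upload the data in seconds, but that may not be an option.
The problem is that each run in Python can take 1 to 3 hours, which is unacceptable. I would like to know whether there is a faster way to do this upload. Is there something I can do to the table to make the import faster, or a different way of coding it?
Here is an example of the kind of code I am using: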
def ingest_glief_reporting_exceptions_csv():
    global conn
    global cursor
    filename = r"20200824-0800-gleif-goldencopy-repex-golden-copy.csv"
    # filename = r"repex_1K.csv"
    full_filename = os.path.join(raw_data_dir, filename)

    sql_str = "exec dbo.util_get_gleif_last_reporting_exception"
    cursor.execute(sql_str)
    last_lei = ''
    for result in cursor.fetchall():
        last_lei = result[0]

    # "repex" is short for "reporting exceptions", shorten the headers
    repex_headers = [
        'LEI', 'ExceptionCategory',
        'ExceptionReason1', 'ExceptionReason2', 'ExceptionReason3',
        'ExceptionReason4', 'ExceptionReason5',
        'ExceptionReference1', 'ExceptionReference2', 'ExceptionReference3',
        'ExceptionReference4', 'ExceptionReference5'
    ]

    df = pd.read_csv(full_filename, header=0, quotechar='"')

    # Change to the column headers generated in VBA
    df.columns = repex_headers

    for colname in df.columns:
        df[colname] = df[colname].astype(str)
        df[colname] = df[colname].replace({'nan': ''})

    place_holder = '?,?'
    for i in range(1, len(repex_headers)):
        place_holder += ',?'

    sql_str = "exec save_gleif_reporting_exception " + place_holder

    row_count = 0
    row = dict()
    do_not_upload = True
    if last_lei == '':
        do_not_upload = False  # There was no last uploaded record, so we can start now

    for index, row in df.iterrows():
        row_count += 1
        if do_not_upload:
            if row['LEI'] == last_lei:
                do_not_upload = False
                continue
            else:
                continue

        values = (
            row['LEI'], row['ExceptionCategory'],
            row['ExceptionReason1'], row['ExceptionReason2'], row['ExceptionReason3'],
            row['ExceptionReason4'], row['ExceptionReason5'],
            row['ExceptionReference1'], row['ExceptionReference2'], row['ExceptionReference3'],
            row['ExceptionReference4'], row['ExceptionReference5'],
            filename
        )

        if index % 1000 == 0:
            print("Imported %s rows" % (index))

        # print(values)
        # print("processing row ", row_count)
        # return Key is the unique ID the database generated as it inserted this row of data.
        error_sql_str = "exec log_message ?,?,?"
        connection_failures = 0
        connection_failing = True
        while connection_failures < 3 and connection_failing:
            try:
                return_key = cursor.execute(sql_str, values).fetchval()
            except pyodbc.OperationalError as e:
                connection_failures += 1
                connection_failing = True
                print("Connection issue. connection failures = ", connection_failures)
                time.sleep(30)  # wait 30 seconds and go to the top of the loop to try again.
                continue
            except pyodbc.ProgrammingError as e:
                print("Bad field ", values)
                error_values = (
                    'ERROR', __file__, filename, 'gleif_reporting_exceptions',
                    row['LEI'], '', str(e)
                )
                return_key = cursor.execute(error_sql_str, error_values).fetchval()
                connection_failures = 0
            connection_failures = 0
            connection_failing = False

        if connection_failures >= 3:
            print("Unable to reconnect after 3 tries")
            exit(1)

    conn.close()
    return
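The resume logic in the loop above (skip every row up to and including the last uploaded LEI, then upload the rest) can be separated from the upload itself, which makes it easier to feed the remaining rows to a batched insert later. The following is only a sketch: `rows_after` is a hypothetical helper name, and it assumes each row is a tuple whose first element is the LEI.

```python
from itertools import dropwhile, islice


def rows_after(rows, last_lei, key=0):
    """Yield the rows that come after the row whose LEI equals last_lei.

    Hypothetical helper mirroring the do_not_upload flag in the question:
    - an empty last_lei means nothing was uploaded yet, so yield everything;
    - otherwise skip rows up to and including the matching one;
    - if last_lei never appears, nothing is yielded (same as the original loop).
    """
    if last_lei == '':
        yield from rows
        return
    # dropwhile stops dropping at the matching row, so that row is still
    # the first item of `remaining`; islice(..., 1, None) skips it as well.
    remaining = dropwhile(lambda r: r[key] != last_lei, rows)
    yield from islice(remaining, 1, None)
```

Because this is a generator, it works on any iterable of tuples and never holds more than one row in memory at a time.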
This is how I open the database connection:
def init_connection(server_name, db_name):
    """
    Connect to SQL Server database
    :param server_name:
    :param db_name:
    :return:
    """
    pyodbc.pooling = False
    try:
        conn = pyodbc.connect(
            r'DRIVER={ODBC Driver 17 for SQL Server};SERVER=' + server_name
            + ';Database=' + db_name + ';Trusted_Connection=yes;',
            timeout=5, autocommit=True)
    except Exception as e:
        print("Unable to connect to database [" + db_name + '] and server [' + server_name + ']')
        print(e)
        exit(1)

    cursor = conn.cursor()
    return [conn, cursor]
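One detail of the connection above: with `autocommit=True`, every executed statement is committed as its own transaction, so a file with a million rows means a million commits. Whether that is the dominant cost here is not established by the question (the per-row network round trip is probably also significant), but committing once per batch is a cheap thing to try. The sketch below shows the pattern; `insert_with_batched_commits` is a hypothetical helper, and the demonstration uses the standard library's `sqlite3` purely as a stand-in because it shares the `?` placeholder style. With pyodbc, the same function would assume a connection opened with `autocommit=False`.

```python
import sqlite3


def insert_with_batched_commits(conn, sql, rows, batch_size=1000):
    """Execute `sql` once per row, but commit only once per `batch_size` rows.

    Hypothetical helper: `conn` is any DB-API connection that is NOT in
    autocommit mode, `rows` is an iterable of parameter tuples.
    Returns the number of rows executed.
    """
    cursor = conn.cursor()
    pending = 0
    total = 0
    for values in rows:
        cursor.execute(sql, values)
        pending += 1
        total += 1
        if pending >= batch_size:
            conn.commit()
            pending = 0
    if pending:
        conn.commit()  # commit the final partial batch
    return total


# Stand-in demonstration against an in-memory SQLite database.
demo_conn = sqlite3.connect(":memory:")
demo_conn.execute("CREATE TABLE demo (lei TEXT, category TEXT)")
demo_rows = [("LEI%05d" % i, "CAT") for i in range(2500)]
inserted = insert_with_batched_commits(
    demo_conn, "INSERT INTO demo VALUES (?, ?)", demo_rows, batch_size=1000)
demo_count = demo_conn.execute("SELECT COUNT(*) FROM demo").fetchone()[0]
```

The trade-off is that a failure in the middle of a batch rolls back the uncommitted rows of that batch, so the restart logic based on the last uploaded LEI would still be needed.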
The table is defined as follows:
CREATE TABLE [dbo].[gleif_exceptions](
    [id] [bigint] IDENTITY(1,1) NOT NULL,
    [ida_last_update_date] [datetime] NULL,
    [ida_last_update_source_file] [nvarchar](500) NULL,
    [LEI] [nvarchar](500) NULL,
    [ExceptionCategory] [nvarchar](500) NULL,
    [ExceptionReason1] [nvarchar](500) NULL,
    [ExceptionReason2] [nvarchar](500) NULL,
    [ExceptionReason3] [nvarchar](500) NULL,
    [ExceptionReason4] [nvarchar](500) NULL,
    [ExceptionReason5] [nvarchar](500) NULL,
    [ExceptionReference1] [nvarchar](500) NULL,
    [ExceptionReference2] [nvarchar](500) NULL,
    [ExceptionReference3] [nvarchar](500) NULL,
    [ExceptionReference4] [nvarchar](500) NULL,
    [ExceptionReference5] [nvarchar](500) NULL
) ON [PRIMARY]
GO
Here is some sample data:
LEI,Exception.Category,Exception.Reason.1,Exception.Reason.2,Exception.Reason.3,Exception.Reason.4,Exception.Reason.5,Exception.Reference.1,Exception.Reference.2,Exception.Reference.3,Exception.Reference.4,Exception.Reference.5
004L5FPTUREIWK9T2N63,DIRECT_ACCOUNTING_CONSOLIDATION_PARENT,NON_CONSOLIDATING,00EHHQ2ZHDCFXJCPCL46,
And here is the corresponding stored procedure that I call to store each record in the table:
ALTER PROCEDURE [dbo].[save_gleif_reporting_exception]
    @LEI [nvarchar] (500) = NULL,
    @ExceptionCategory [nvarchar] (500) = NULL,
    @ExceptionReason1 [nvarchar] (500) = NULL,
    @ExceptionReason2 [nvarchar] (500) = NULL,
    @ExceptionReason3 [nvarchar] (500) = NULL,
    @ExceptionReason4 [nvarchar] (500) = NULL,
    @ExceptionReason5 [nvarchar] (500) = NULL,
    @ExceptionReference1 [nvarchar] (500) = NULL,
    @ExceptionReference2 [nvarchar] (500) = NULL,
    @ExceptionReference3 [nvarchar] (500) = NULL,
    @ExceptionReference4 [nvarchar] (500) = NULL,
    @ExceptionReference5 [nvarchar] (500) = NULL,
    @ida_last_update_source_file [nvarchar] (500) NULL
AS
BEGIN
    -- SET NOCOUNT ON added to prevent extra result sets from
    -- interfering with SELECT statements.
    SET NOCOUNT ON;

    -- Insert statements for procedure here
    INSERT INTO dbo.gleif_reporting_exceptions(
        [LEI],[ExceptionCategory],
        [ExceptionReason1],[ExceptionReason2],[ExceptionReason3],[ExceptionReason4],[ExceptionReason5],
        [ExceptionReference1],[ExceptionReference2],[ExceptionReference3],[ExceptionReference4],[ExceptionReference5],
        [ida_last_update_date],[ida_last_update_source_file]
    )
    VALUES (
        @LEI,@ExceptionCategory,
        @ExceptionReason1,@ExceptionReason2,@ExceptionReason3,@ExceptionReason4,@ExceptionReason5,
        @ExceptionReference1,@ExceptionReference2,@ExceptionReference3,@ExceptionReference4,@ExceptionReference5,
        GETDATE(),@ida_last_update_source_file
    )

    SELECT @@IDENTITY
END
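Note 1: Although I declare the strings as nvarchar(500), most of them are nowhere near that long. I don't think that matters. I tried shorter string definitions, and the routine still took just as long to run.
Note 2: This is only one of 7 examples so far. The smallest tables have a few tens of thousands of rows, and the largest have several million. The number of columns varies from 7 to about 230.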
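For row counts in the range described above, one approach that is often suggested when server-side bulk import is unavailable is pyodbc's `fast_executemany` cursor attribute combined with `cursor.executemany`, which sends each batch of parameter rows to the server together instead of making one round trip per row. The sketch below rests on several assumptions that the question does not confirm: the account may INSERT directly into `dbo.gleif_reporting_exceptions` rather than going through the stored procedure, the generated identity values are not needed by the caller, and the connection was opened with `autocommit=False`. The names `batches` and `bulk_insert` are hypothetical.

```python
from itertools import islice

# Column names copied from the question. Using a direct INSERT instead of
# the stored procedure is an assumption of this sketch.
REPEX_HEADERS = [
    'LEI', 'ExceptionCategory',
    'ExceptionReason1', 'ExceptionReason2', 'ExceptionReason3',
    'ExceptionReason4', 'ExceptionReason5',
    'ExceptionReference1', 'ExceptionReference2', 'ExceptionReference3',
    'ExceptionReference4', 'ExceptionReference5',
]
COLUMNS = REPEX_HEADERS + ['ida_last_update_source_file']

# One "?" per supplied column; the update date is filled in by the server.
INSERT_SQL = (
    "INSERT INTO dbo.gleif_reporting_exceptions ("
    + ", ".join("[%s]" % c for c in COLUMNS)
    + ", [ida_last_update_date]) VALUES ("
    + ", ".join("?" for _ in COLUMNS)
    + ", GETDATE())"
)


def batches(rows, size):
    """Yield lists of at most `size` items from any iterable."""
    it = iter(rows)
    while True:
        batch = list(islice(it, size))
        if not batch:
            return
        yield batch


def bulk_insert(conn, cursor, rows, batch_size=10000):
    """Insert parameter tuples in batches; returns the number of rows sent.

    `conn` and `cursor` are expected to be a pyodbc connection and cursor.
    """
    cursor.fast_executemany = True  # pyodbc: bind each batch as a parameter array
    total = 0
    for batch in batches(rows, batch_size):
        cursor.executemany(INSERT_SQL, batch)
        conn.commit()  # one commit per batch rather than per row
        total += len(batch)
    return total
```

With the DataFrame from the question, the rows could be produced without `iterrows` by something like `(t + (filename,) for t in df.itertuples(index=False, name=None))`. Note that this approach gives up the per-row error logging of the original loop: a bad value causes the whole batch to fail, so a fallback that retries a failed batch row by row may still be wanted.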