问题描述
我正在尝试通过在 Azure Databricks 上启动 Python 脚本以增量方式将 Salesforce 数据加载到 Azure sql 数据库。
由于我无法在 Azure Databricks 中安装 Devart ODBC,我尝试使用 simple_salesforce 从 salesforce 获取数据:
import pandas as pd
import pyodbc
from simple_salesforce import Salesforce,SalesforceLogin,SFType
from sqlalchemy.types import Integer,Text,String,DateTime
from sqlalchemy import create_engine
import urllib
sf = Salesforce(password = password,username=username,security_token=jeton)
rep_qr = "SELECT SOMETHING FROM Account WHERE CONDITION"
soql = prep_qr.format(','.join(field_names))
results = sf.query_all(soql)['records']
我得到以下结果(示例):
[OrderedDict([('attributes',OrderedDict([('type','Account'),('url','/services/data/v42.0/sobjects/Account/0014K000009aoU3QAI')])),('Id',XY1),(Name,Y),(Date,2020-11-24T09:16:17.000+0000)])]
然后我将输出转换为熊猫数据框:
results = pd.DataFrame(sf.query_all(soql)['records'])
results.drop(columns=['attributes'],inplace=True) #to keep only the columns
我得到了这样的东西(只是一个例子):
Id | 名称 | 日期 |
---|---|---|
XY1 | Y | 2020-11-24T09:16:17.000+0000 |
为了将这些数据摄取到 Azure sql 数据库中,我使用“sqlalchemy”将 Dataframe 转换为 sql,然后 pyodbc 将负责插入到目标(Azure sql 数据库)的部分,如下所示:>
df = pd.DataFrame(results)
df.reset_index(drop=True,inplace=True) #just to remove the index from dataframe
#Creating the engine from and pyodbc which is connected to Azure sql Database:
params = urllib.parse.quote_plus \
(r'DRIVER={ODBC Driver 17 for sql Server};SERVER=' + server + ';DATABASE=' + database + ';UID=' + username + ';PWD=' + password)
conn_str = 'mssql+pyodbc:///?odbc_connect={}'.format(params)
engine_azure = create_engine(conn_str,echo=True)
df.to_sql('account',engine_azure,if_exists='append',index=False)
但我收到以下错误:
sqlalchemy.exc.DataError: (pyodbc.DataError) ('22007','[22007] [Microsoft][ODBC Driver 17 for sql Server][sql Server]Conversion Failed when converting date and/or time from character string. (241) (sqlExecDirectW)')
我认为问题在于库 simple_salesforce 以这种格式显示日期/时间:
2020-11-24T09:16:17.000+0000
2020-11-24T09:16:17.000
这里的问题是我正在动态加载表(我什至不知道我正在加载的表和列)我无法转换这些数据类型的原因,我需要一种方法来传递数据类型自动转换为pyodbc。
你有什么可以推荐的吗?
谢谢,
解决方法
如果日期/时间值始终以 2020-11-24T11:22:33.000+0000
形式的字符串形式返回,那么您可以使用 pandas 的 .apply()
方法将字符串转换为 SQL Server 将使用的 2020-11-24 11:22:33.000
格式接受:
df = pd.DataFrame(
[
(1,"2020-11-24T11:22:33.000+0000"),(2,None),(3,"2020-11-24T12:13:14.000+0000"),],columns=["id","dtm"],)
print(df)
"""console output:
id dtm
0 1 2020-11-24T11:22:33.000+0000
1 2 None
2 3 2020-11-24T12:13:14.000+0000
"""
df["dtm"] = df["dtm"].apply(lambda x: x[:23].replace("T"," ") if x else None)
print(df)
"""console output:
id dtm
0 1 2020-11-24 11:22:33.000
1 2 None
2 3 2020-11-24 12:13:14.000
"""
df.to_sql(
table_name,engine,index=False,if_exists="append",)
with engine.begin() as conn:
pprint(conn.execute(sa.text(f"SELECT * FROM {table_name}")).fetchall())
"""console output:
[(1,datetime.datetime(2020,11,24,22,33)),12,13,14))]
"""