问题描述
我需要将增量记录从MySQL中的一组表以Parquet格式加载到Amazon S3。这些表在AWS MySQL托管实例中的多个数据库/方案中通用。该代码应并行复制每个模式(具有一组公共表)中的数据。
我正在使用读取API PySpark SQL连接到MySQL实例并读取模式的每个表的数据,并使用写入API作为Parquet文件将结果数据帧写入S3。我正在为数据库中的每个表循环运行此代码,如以下代码所示:
def load_data_to_s3(databases_df):
db_query_properties = config['mysql-query']
auto_id_values = config['mysql-auto-id-values']
for row in databases_df.collect():
for table in db_query_properties.keys():
last_recorded_id_value = auto_id_values[table]
select_sql = "select * from {}.{} where id>{}".format(row.database_name,table,last_recorded_id_value)
df = spark.read.format("jdbc") \
.option("driver",mysql_db_properties['driver']) \
.option("url",row.database_connection_url) \
.option("dbtable",select_sql) \
.option("user",username) \
.option("password",password) \
.load()
s3_path = 's3a://{}/{}/{}'.format(s3_bucket,database_dir,table)
df.write.parquet(s3_path,mode="append")
我想知道如何将该代码扩展到在EMR集群中并行运行的多个数据库。请给我建议一个合适的方法。让我知道是否需要更多详细信息。
解决方法
我可以提出两种解决方案:
1。简单的方法
一次向您的EMR提交多个作业(每个数据库一个作业)。如果需要监控,则仅将失败日志记录写入S3或HDFS。
2。代码需要更改
您可以尝试使用线程并行化来自每个DB的数据提取。我可以显示一个示例,但是您可能需要做更多更改以适合您的用例。
示例实现:
const logOut = () => {
logout();
};
const authenticatedUser = JSON.parse(localStorage.getItem("user"));
const Header = () => {
if (authenticatedUser !== null) {
return (
<nav>
<NavLink to="/" exact>Dashboard</NavLink>
<a onClick={logOut} href="/login">Logout</a>
</nav>
);
}
return <Redirect to={"/login"} />;
};
export default Header;
此外,请确保使用import threading
def load_data_to_s3(databases_df):
db_query_properties = config['mysql-query']
auto_id_values = config['mysql-auto-id-values']
for row in databases_df.collect():
for table in db_query_properties.keys():
last_recorded_id_value = auto_id_values[table]
select_sql = "select * from {}.{} where id>{}".format(row.database_name,table,last_recorded_id_value)
df = spark.read.format("jdbc") \
.option("driver",mysql_db_properties['driver']) \
.option("url",row.database_connection_url) \
.option("dbtable",select_sql) \
.option("user",username) \
.option("password",password) \
.load()
s3_path = 's3a://{}/{}/{}'.format(s3_bucket,database_dir,table)
df.write.parquet(s3_path,mode="append")
threads = [threading.Thread(target=load_data_to_s3,args=(db) for db in databases_df]
for t in threads:
t.start()
for t in threads:
t.join()
属性将调度程序更改为FAIR。这将为您的每个数据库创建一个线程。如果要控制并行运行的线程数,请相应地修改for循环。
此外,如果要从程序内部创建新作业,请将SparkSession连同参数一起传递。
,您的list_of_databases
未并行化。要进行并行处理,您应该对列表进行并行处理,并使用foreach
或spark给出的方法进行并行处理。
打开EMR中的并发选项并为每个表发送EMR步骤,或者您可以使用Spark的公平调度程序,它可以在内部进行工作,同时对代码进行少量修改。