在PySpark SQL中并行执行读写API调用

问题描述

我需要将增量记录从MySQL中的一组表以Parquet格式加载到Amazon S3。这些表在AWS MySQL托管实例中的多个数据库/方案中通用。该代码应并行复制每个模式(具有一组公共表)中的数据。

我正在使用读取API PySpark SQL连接到MySQL实例并读取模式的每个表的数据,并使用写入API作为Parquet文件将结果数据帧写入S3。我正在为数据库中的每个表循环运行此代码,如以下代码所示:

def load_data_to_s3(databases_df):
    db_query_properties = config['mysql-query']
    auto_id_values = config['mysql-auto-id-values']
    for row in databases_df.collect():
        for table in db_query_properties.keys():
            last_recorded_id_value = auto_id_values[table]
            select_sql = "select * from {}.{} where id>{}".format(row.database_name,table,last_recorded_id_value)
            df = spark.read.format("jdbc") \
                    .option("driver",mysql_db_properties['driver']) \
                    .option("url",row.database_connection_url) \
                    .option("dbtable",select_sql) \
                    .option("user",username) \
                    .option("password",password) \
                    .load()
            s3_path = 's3a://{}/{}/{}'.format(s3_bucket,database_dir,table)
            df.write.parquet(s3_path,mode="append") 

我想知道如何将该代码扩展到在EMR集群中并行运行的多个数据库。请给我建议一个合适的方法。让我知道是否需要更多详细信息。

解决方法

我可以提出两种解决方案:

1。简单的方法

一次向您的EMR提交多个作业(每个数据库一个作业)。如果需要监控,则仅将失败日志记录写入S3或HDFS。

2。代码需要更改

您可以尝试使用线程并行化来自每个DB的数据提取。我可以显示一个示例,但是您可能需要做更多更改以适合您的用例。

示例实现:

    const logOut = () => {
          logout();
        };
        const authenticatedUser = JSON.parse(localStorage.getItem("user"));
        const Header = () => {
          if (authenticatedUser !== null) {
            return (
              <nav>
                <NavLink to="/" exact>Dashboard</NavLink>
                <a onClick={logOut} href="/login">Logout</a>
              </nav>
            );
          }
          return <Redirect to={"/login"} />;
        };
        
        export default Header;

此外,请确保使用import threading def load_data_to_s3(databases_df): db_query_properties = config['mysql-query'] auto_id_values = config['mysql-auto-id-values'] for row in databases_df.collect(): for table in db_query_properties.keys(): last_recorded_id_value = auto_id_values[table] select_sql = "select * from {}.{} where id>{}".format(row.database_name,table,last_recorded_id_value) df = spark.read.format("jdbc") \ .option("driver",mysql_db_properties['driver']) \ .option("url",row.database_connection_url) \ .option("dbtable",select_sql) \ .option("user",username) \ .option("password",password) \ .load() s3_path = 's3a://{}/{}/{}'.format(s3_bucket,database_dir,table) df.write.parquet(s3_path,mode="append") threads = [threading.Thread(target=load_data_to_s3,args=(db) for db in databases_df] for t in threads: t.start() for t in threads: t.join() 属性将调度程序更改为FAIR。这将为您的每个数据库创建一个线程。如果要控制并行运行的线程数,请相应地修改for循环。

此外,如果要从程序内部创建新作业,请将SparkSession连同参数一起传递。

,

您的list_of_databases未并行化。要进行并行处理,您应该对列表进行并行处理,并使用foreach或spark给出的方法进行并行处理。


打开EMR中的并发选项并为每个表发送EMR步骤,或者您可以使用Spark的公平调度程序,它可以在内部进行工作,同时对代码进行少量修改。

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...