Writing the final Dataset output from Spark Java to S3

Problem description

I can't find the right way to write data from a Spark Dataset to S3. Do I need to add more configuration? Do I have to put the AWS credentials in my code, or will they be picked up from the local .aws/ config files?

Please advise.

import java.util.Properties;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SparkSqlMysql {

    private static final org.apache.log4j.Logger LOGGER = org.apache.log4j.Logger.getLogger(SparkSqlMysql.class);

    private static final SparkSession sparkSession = SparkSession.builder().master("local[*]").appName("Spark2JdbcDs")
            .getOrCreate();

    public static void main(String[] args) {
        // JDBC connection properties
        final Properties connectionProperties = new Properties();
        connectionProperties.put("user", "root");
        connectionProperties.put("password", "password");
        connectionProperties.put("driver", "com.mysql.jdbc.Driver");

        // Subqueries pushed down to MySQL as table expressions
        final String dbTable = "(select * from Fielding) t";
        final String dbTable1 = "(select * from Salaries) m";
        final String dbTable2 = "(select * from pitching) n";

        // Load each MySQL query result as a Dataset
        Dataset<Row> jdbcDF2 = sparkSession.read().jdbc("jdbc:mysql://localhost:3306/lahman2016", dbTable, connectionProperties);
        Dataset<Row> jdbcDF3 = sparkSession.read().jdbc("jdbc:mysql://localhost:3306/lahman2016", dbTable1, connectionProperties);
        Dataset<Row> jdbcDF4 = sparkSession.read().jdbc("jdbc:mysql://localhost:3306/lahman2016", dbTable2, connectionProperties);

        // Register the Datasets as temporary views for Spark SQL
        jdbcDF2.createOrReplaceTempView("Fielding");
        jdbcDF3.createOrReplaceTempView("Salaries");
        jdbcDF4.createOrReplaceTempView("pitching");

        // Average salary per year for players with fielding records
        Dataset<Row> sqlDF = sparkSession.sql(
                "select Salaries.yearID, avg(Salaries.salary) as Fielding from Salaries inner join Fielding ON Salaries.yearID = Fielding.yearID AND Salaries.playerID = Fielding.playerID group by Salaries.yearID limit 5");

        // Average salary per year for players with pitching records
        Dataset<Row> sqlDF1 = sparkSession.sql(
                "select Salaries.yearID, avg(Salaries.salary) as pitching from Salaries inner join pitching ON Salaries.yearID = pitching.yearID AND Salaries.playerID = pitching.playerID group by Salaries.yearID limit 5");

        // sqlDF.show();
        // sqlDF1.show();

        sqlDF.createOrReplaceTempView("avg_fielding");
        sqlDF1.createOrReplaceTempView("avg_pitching");

        // Join the two yearly averages into the final result
        Dataset<Row> final_query_1_output = sparkSession.sql(
                "select avg_fielding.yearID, avg_fielding.Fielding, avg_pitching.pitching from avg_fielding inner join avg_pitching ON avg_pitching.yearID = avg_fielding.yearID");

        final_query_1_output.show();
    }
}

The query output is:

 final_query_1_output.show();
+------+------------------+------------------+
|yearID|          Fielding|          pitching|
+------+------------------+------------------+
|  1990|  507978.625320787| 485947.2487437186|
|  2003|2216200.9609838845|2133800.1867612293|
|  2007|2633213.0126475547|2617533.3393665156|
|  2015|3996199.5729421354| 3955581.121535181|
|  2006| 2565803.492487479| 2534756.866972477|
+------+------------------+------------------+      
    

I want to write this Dataset to S3: how do I do that?

    final_query_1_output.write().mode("overwrite").save("s3n://druids3migration/data.csv");

Solution
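
Two things stand out in the attempt above. First, save() without an explicit format writes Parquet by default, so the .csv extension on the path alone does not produce CSV output. Second, s3n:// is the legacy Hadoop S3 connector; on current Hadoop/Spark builds the maintained connector is s3a://, provided by the hadoop-aws module (with a matching AWS SDK jar on the classpath). Below is a minimal sketch of one common approach, not a verified fix: the bucket name is taken from the question and the key values are placeholders. Credentials can either be set directly on the Hadoop configuration, or delegated to the AWS SDK's default provider chain, which reads environment variables and the local ~/.aws/credentials file:

    // Assumes hadoop-aws and a matching aws-java-sdk are on the classpath.
    org.apache.hadoop.conf.Configuration hadoopConf = sparkSession.sparkContext().hadoopConfiguration();

    // Option A: set the credentials explicitly (values below are placeholders).
    hadoopConf.set("fs.s3a.access.key", "YOUR_ACCESS_KEY");
    hadoopConf.set("fs.s3a.secret.key", "YOUR_SECRET_KEY");

    // Option B: instead of Option A, delegate to the AWS SDK's default chain,
    // which picks up environment variables and ~/.aws/credentials:
    // hadoopConf.set("fs.s3a.aws.credentials.provider",
    //         "com.amazonaws.auth.DefaultAWSCredentialsProviderChain");

    // save() defaults to Parquet, so request CSV explicitly.
    final_query_1_output
            .coalesce(1)                  // optional: collapse to a single part file
            .write()
            .mode("overwrite")
            .format("csv")
            .option("header", "true")
            .save("s3a://druids3migration/data.csv");

Note that the path names a directory, not a single object: Spark writes part-* files inside it, one per partition. The optional coalesce(1) collapses the output to one part file at the cost of write parallelism.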
