Problem description
I can't find the correct way to write data from a Spark Dataset to S3. Do I need to add more configuration? Do I have to specify the AWS credentials in my code, or will they be picked up from the local .aws/ configuration files?
Please advise.
import java.util.Properties;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SparkSqlMysql {
    private static final org.apache.log4j.Logger LOGGER = org.apache.log4j.Logger.getLogger(SparkSqlMysql.class);
    private static final SparkSession sparkSession = SparkSession.builder().master("local[*]").appName("Spark2JdbcDs")
            .getOrCreate();

    public static void main(String[] args) {
        // JDBC connection properties
        final Properties connectionProperties = new Properties();
        connectionProperties.put("user", "root");
        connectionProperties.put("password", "password");
        connectionProperties.put("driver", "com.mysql.jdbc.Driver");

        // Subqueries used as JDBC "tables"
        final String dbTable = "(select * from Fielding) t";
        final String dbTable1 = "(select * from Salaries) m";
        final String dbTable2 = "(select * from pitching) n";

        // Load each MySQL query result as a Dataset
        Dataset<Row> jdbcDF2 = sparkSession.read().jdbc("jdbc:mysql://localhost:3306/lahman2016", dbTable, connectionProperties);
        Dataset<Row> jdbcDF3 = sparkSession.read().jdbc("jdbc:mysql://localhost:3306/lahman2016", dbTable1, connectionProperties);
        Dataset<Row> jdbcDF4 = sparkSession.read().jdbc("jdbc:mysql://localhost:3306/lahman2016", dbTable2, connectionProperties);

        // Register temp views so they can be joined with Spark SQL
        jdbcDF2.createOrReplaceTempView("Fielding");
        jdbcDF3.createOrReplaceTempView("Salaries");
        jdbcDF4.createOrReplaceTempView("pitching");

        Dataset<Row> sqlDF = sparkSession.sql(
                "select Salaries.yearID, avg(Salaries.salary) as Fielding from Salaries inner join Fielding ON Salaries.yearID = Fielding.yearID AND Salaries.playerID = Fielding.playerID group by Salaries.yearID limit 5");
        Dataset<Row> sqlDF1 = sparkSession.sql(
                "select Salaries.yearID, avg(Salaries.salary) as pitching from Salaries inner join pitching ON Salaries.yearID = pitching.yearID AND Salaries.playerID = pitching.playerID group by Salaries.yearID limit 5");
        // sqlDF.show();
        // sqlDF1.show();

        sqlDF.createOrReplaceTempView("avg_fielding");
        sqlDF1.createOrReplaceTempView("avg_pitching");

        Dataset<Row> final_query_1_output = sparkSession.sql(
                "select avg_fielding.yearID, avg_fielding.Fielding, avg_pitching.pitching from avg_fielding inner join avg_pitching ON avg_pitching.yearID = avg_fielding.yearID");
        final_query_1_output.show();
    }
}

The show() call prints:
+------+------------------+------------------+
|yearID| Fielding| pitching|
+------+------------------+------------------+
| 1990| 507978.625320787| 485947.2487437186|
| 2003|2216200.9609838845|2133800.1867612293|
| 2007|2633213.0126475547|2617533.3393665156|
| 2015|3996199.5729421354| 3955581.121535181|
| 2006| 2565803.492487479| 2534756.866972477|
+------+------------------+------------------+
I want to write this Dataset to S3. How do I do that? This is what I tried:
final_query_1_output.write().mode("overwrite").save("s3n://druids3migration/data.csv");
Solution
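A minimal sketch of one common approach, assuming the hadoop-aws module (and the matching AWS SDK jars) is on the classpath; the bucket name druids3migration comes from the question, and the output prefix and placeholder keys are illustrative. With the s3a connector, credentials set in the Hadoop configuration take precedence; if none are set, s3a falls back to the AWS SDK's default provider chain, which in recent hadoop-aws releases does read environment variables and ~/.aws/credentials, so hard-coding keys is not required.

```java
// Sketch only: assumes hadoop-aws and its matching aws-java-sdk are on the
// classpath (e.g. --packages org.apache.hadoop:hadoop-aws:<hadoop version>).
// Prefer the s3a:// scheme; s3n is deprecated in current Hadoop releases.
org.apache.hadoop.conf.Configuration hadoopConf =
        sparkSession.sparkContext().hadoopConfiguration();

// Option 1: set keys explicitly (placeholders; avoid committing real keys).
hadoopConf.set("fs.s3a.access.key", "YOUR_ACCESS_KEY");
hadoopConf.set("fs.s3a.secret.key", "YOUR_SECRET_KEY");

// Option 2: set nothing here and let s3a fall back to the default AWS
// credential chain (environment variables, ~/.aws/credentials, etc.).

// save() without a format defaults to Parquet, so passing "data.csv" would
// not produce CSV. Use the csv writer explicitly; note Spark writes a
// directory of part files under the given path, not a single data.csv.
final_query_1_output
        .write()
        .mode("overwrite")
        .option("header", "true")
        .csv("s3a://druids3migration/output");
```

Which credential sources are consulted by default depends on the Hadoop version (it is controlled by fs.s3a.aws.credentials.provider), so if the local .aws/ profile is not picked up, setting the two fs.s3a keys above is the explicit fallback.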