Spark JDBC - 读取 -> 更新 -> 写入没有主键的大表

问题描述

我正在尝试更新大型 MysqL 表(接近 500 百万行)的每一行的几个字段。该表没有任何主键(或具有像 UUID 这样的字符串主键)。我没有足够的执行程序内存来​​一次性读取和保存整个数据。任何人都可以让我知道我有哪些选择来处理这些表格。

下面是架构

CREATE TABLE Persons ( Personid varchar(255) NOT NULL,LastName varchar(255) NOT NULL,FirstName varchar(255) DEFAULT NULL,Email varchar(255) DEFAULT NULL,Age int(11) DEFAULT NULL) ) ENGINE=InnoDB DEFAULT CHARSET=latin1;

Spark 代码就像

 SparkSession spark = SparkSession.builder().master("spark://localhost:7077").appName("KMASK").getorCreate();
DataFrame rawDataFrame = spark.read().format("jdbc").load();
rawDataFrame.createOrReplaceTempView("data");
//encrypt is UDF
String sql = "select Personid,LastName,FirstName,encrypt(Email),Age from data";
Dataset newData = spark.sql(sql);
newData.write().mode(SaveMode.Overwrite).format("jdbc").options(options).save();

该表有大约 150 万条记录,数据大小大约为 6GB。我的执行者内存只有 2 gb。我可以使用 Spark - jdbc 处理这个表吗。

解决方法

理想情况下,您可以更改 spark jdbc fetchsize 选项以减少/增加每次获取和处理的记录数。

对数据进行分区还有助于减少洗牌和额外开销。由于您将 Age 作为数字字段。您还可以处理由 Age 确定的分区中的数据。首先确定最小和最大年龄并使用 Spark JDBC Options

值得注意的是:

  • partitionColumn : Age
  • lowerBound :您确定的最低年龄
  • upperBound :您确定的最大年龄
  • numPartitions:真正依赖于内核和工作节点的数量,但更多hints and links are here

您还可以使用自定义查询来仅选择和更新可以使用 query 选项保存在内存中的几条记录。注意。使用 query 选项时不应使用 dbtable 选项。

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...