问题描述
我正在从一个网站收集实时数据,该网站在 R 中填充数据框。 这些行可以具有相同的唯一 ID,或者可以引入新行。 我想将动态数据帧发送到 MariaDB 数据库表,其中具有现有唯一 ID 的行更新我指定的列,没有现有唯一 ID 的行作为新行插入表中。 我可以让它与 MariaDB INSERT ON DUPLICATE KEY UPDATE 语句以及一个从动态数据框生成所需值的函数一起使用。
MWE:
install.packages("odbc")
insall.packages("RMariaDB")
library(odbc)
library(RMariaDB)
con <- dbConnect(RMariaDB::MariaDB(),host = Sys.getenv('MARIADB_DBHOST'),port = Sys.getenv('MARIADB_DBPORT'),user = Sys.getenv('MARIADB_DBUSER'),password = Sys.getenv('MARIADB_DBPW'),db = Sys.getenv('MARIADB_dbnAME'),timeout = 5)
# Database table for mwe to work.
db_live <- data.frame(id = c(12,22,32),car_name = c("rolls royce","nissan","mercedes benz"),km = c(123,100,150),temp = c(78,60,80))
# Get table from database,id column is unique index.
db_live <- dbReadTable(con,"db_live")
print(db_live)
id car_name km temp
1 12 rols royce 123 78
2 22 nissan 100 60
3 32 mercedes benz 150 80
# Build dynamic dataframe
df_live <- data.frame(id = c(12,32,42),"mercedes benz","aston martin"),km = c(140,120,200,40),temp = c(81,65,85,50))
print(df_live)
id car_name km temp
1 12 rols royce 140 81
2 22 nissan 120 65
3 32 mercedes benz 200 85
4 42 aston martin 40 50
# create function that generates a string with values for dbSendQuery.
gen_insert_values <- function(df) {
for(i in 1:nrow(df)) {
row_string <- paste(df[i,1],paste0("'",df[i,2],"'"),3],4],collapse = ",")
if(exists("df_string")) {
df_string <- paste0(df_string,",paste0("(",row_string,")"))
} else {
df_string <- paste0("(",")")
}
}
df_string
}
values <- gen_insert_values(df_live)
print(values)
"(12 'rolls royce' 140 81),(22 'nissan' 120 65),(32 'mercedes benz' 200 85),(42 'aston martin' 40 50)"
# Send query.
res <- dbSendQuery(con,paste0("INSERT INTO db_live (id,car_name,km,temp) VALUES ",values," ON DUPLICATE KEY UPDATE km = VALUES(km),temp = VALUES(temp);"))
dbClearResult(res)
#Check db table after sent query.
new_db_live <- dbReadTable(con,"db_live")
print(new_db_live)
id car_name km temp
1 12 rolls royce 140 81
2 22 nissan 120 65
3 32 mercedes benz 200 85
4 42 aston martin 40 50
这似乎不是很有效,因为我必须更改查询和函数,以防我想更新更多列,而且我在我的函数中包含了一个 for 循环,这可能会导致脚本变慢。
解决方法
这里有一种可能更有效的方法:使用临时表而不是手动将数据编码为一串 (a,b,c),(a,c)
数据集。
为了完整演示,我稍微修改了 df_live
数据,这样我们有一行没有变化,一行有更新的数据,还有一行是新的。此过程也适用于您原来的 df_live
,我只是想突出显示三种模式。
不过,从技术上讲,“无更改”行确实更新了数据库,但并不明显。如果该表有一个“lastmodified”字段,该字段在行中的某些内容更新时使用当前时间戳进行更新,那么您可以看到更多正在发生的事情。
事实上,我将添加(仅用于演示)两个字段:created
和 modified
,它们显示第一次创建行的时间和最后一次更新的时间。普通 UPSERT 不需要这些。
设置
这部分不是必需的,除非表上没有主键(在这种情况下,添加一个)。
我将主表命名为 "mydata"
,并将 db_live
数据集上传到其中。我相信(没有进行大量测试)MariaDB 需要 UPSERT 根据预先存在的键来查找重复或冲突的行。这意味着我们需要设置一个(主)键;我会假设您的表已经有这个(并展示我如何使用手动上传的数据来做到这一点)。
db <- DBI::dbConnect(RMariaDB::MariaDB(),...)
db_live <- data.frame(id = c(12,22,32),car_name = c("rolls royce","nissan","mercedes benz"),km = c(123,100,150),temp = c(78,60,80))
df_live <- data.frame(id = c(12,42),"aston martin"),km = c(140,120,40),temp = c(81,65,50))
df_live
# id car_name km temp
# 1 12 rolls royce 140 81 # updated
# 2 22 nissan 100 60 # no change
# 3 42 aston martin 40 50 # new data
DBI::dbWriteTable(db,"mydata",db_live)
DBI::dbExecute(db,"alter table mydata add primary key (id)")
# [1] 0
DBI::dbExecute(db,"
alter table mydata
add column created timestamp not null default CURRENT_TIMESTAMP,add column modified timestamp null default null")
# [1] 0
DBI::dbExecute(db,"
create trigger updatemodified_mydata
before update on mydata
for each row set NEW.modified = CURRENT_TIMESTAMP")
# [1] 0
DBI::dbGetQuery(db,"select * from mydata")
# id car_name km temp created modified
# 1 12 rolls royce 123 78 2021-06-29 19:11:00 <NA>
# 2 22 nissan 100 60 2021-06-29 19:11:00 <NA>
# 3 32 mercedes benz 150 80 2021-06-29 19:11:00 <NA>
如果主表 primary key
上没有 mydata
,则“UPSERT”操作将简单地插入(添加)所有行而不更新。我不知道是否有办法诱使 mariadb 伪造密钥,以便正确触发您预期的“如果存在则更新”逻辑。
UPSERT
我们将使用一个临时表,以便要更新的数据不会持久存在;这有几个好处,如果你做对了,那么你的 DBA 会感谢你 :-)
(如果您不熟悉临时表......它们对数据库上的其他用户不可见,通常对同一用户的不同连接不可见,并且在连接关闭时将被删除。)
DBI::dbCreateTable(db,"mytemp",df_live,temporary = TRUE)
DBI::dbWriteTable(db,append = TRUE,create = FALSE)
DBI::dbGetQuery(db,"select * from mytemp")
# id car_name km temp
# 1 12 rolls royce 140 81
# 2 22 nissan 100 60
# 3 42 aston martin 40 50
DBI::dbExecute(db,"
insert into mydata (id,car_name,km,temp)
select id,temp from mytemp
on duplicate key update km=VALUES(km),temp=VALUES(temp);")
# [1] 5
DBI::dbGetQuery(db,"select * from mydata")
# id car_name km temp created modified
# 1 12 rolls royce 140 81 2021-06-29 19:11:00 2021-06-29 19:11:27
# 2 22 nissan 100 60 2021-06-29 19:11:00 2021-06-29 19:11:27
# 3 32 mercedes benz 150 80 2021-06-29 19:11:00 <NA>
# 4 42 aston martin 40 50 2021-06-29 19:11:27 <NA>
如果您注意到,即使 "nissan"
的值没有不同,据 modified
时间戳证明,该行仍然据称已更新。我们拥有的“更改”行 "rolls royce"
显示了相应的 modified
时间。 mercedes benz
是第一次上传没有更新,aston martin
是第二次更新,所以它的created
时间和其他的不一样。
复制
我使用 mariadb:latest
泊坞窗图像完成了此操作。下面的这些步骤纯粹是为了演示,并不是作为管理数据库(为了安全或性能)的规范方式提供的。是的,我正在连接到 "mysql"
数据库,这不是用户数据应该去的地方,我相信......太仓促了,请原谅。
$ docker pull mariadb:latest
Using default tag: latest
latest: Pulling from library/mariadb
c549ccf8d472: Pull complete
26ea6552a462: Pull complete
329b1f41043f: Pull complete
9f8d09317d80: Pull complete
2bc055a5511d: Pull complete
e989e430508e: Pull complete
cdba2af19f87: Pull complete
04fe4f90eab8: Pull complete
389c6b423e31: Pull complete
bef640655d86: Pull complete
Digest: sha256:0c72b63198ac53df4e84db821876c73794b00509b2d8a77100d186a13e49ac31
Status: Downloaded newer image for mariadb:latest
docker.io/library/mariadb:latest
$ docker run -p 127.0.0.1:3306:3306 --name some-mariadb \
-e MARIADB_ROOT_PASSWORD=mysecretpw -e MARIADB_DATABASE=mydb -d mariadb:latest
在 R 中,连接很简单:
db <- DBI::dbConnect(RMariaDB::MariaDB(),host="127.0.0.1",port=3306,username="root",password="mysecretpw",dbname="mydb")
DBI::dbGetQuery(db,"select version() as dbver")
# dbver
# 1 10.5.11-MariaDB-1:10.5.11+maria~focal