问题描述
我构建了一个脚本,该脚本非常适合小型数据集(
我将在脚本中共享几个命令作为示例。在每个示例中,数据集都是10到1500万行和10到15列。
- 获取由9个变量分组的数据框的最低日期
dataframe %>%
group_by(key_a,key_b,key_c,key_d,key_e,key_f,key_g,key_h,key_i) %>%
summarize(min_date = min(date)) %>%
ungroup()
- 对两个数据框进行左连接以添加额外的列
merge(dataframe,dataframe_two,by = c("key_a","key_b","key_c","key_d","key_e","key_f","key_g","key_h","key_i"),all.x = T) %>%
as_tibble()
- 在closest date上加入两个数据框
dataframe %>%
left_join(dataframe_two,by = "key_a") %>%
group_by(key_a,date.x) %>%
summarise(key_z = key_z[which.min(abs(date.x - date.y))]) %>%
arrange(date.x) %>%
rename(day = date.x)
我可以应用哪些最佳实践,特别是我该怎么做才能针对大型数据集优化这些类型的函数?
-
这是示例数据集
set.seed(1010)
library("conflicted")
conflict_prefer("days","lubridate")
bigint <- rep(
sample(1238794320934:19082323109,1*10^7)
)
key_a <-
rep(c("green","blue","orange"),1*10^7/2)
key_b <-
rep(c("yellow","purple","red"),1*10^7/2)
key_c <-
rep(c("hazel","pink","lilac"),1*10^7/2)
key_d <-
rep(c("A","B","C"),1*10^7/2)
key_e <-
rep(c("D","E","F","G","H","I"),1*10^7/5)
key_f <-
rep(c("Z","M","Q","T","X","B"),1*10^7/5)
key_g <-
rep(c("Z",1*10^7/5)
key_h <-
rep(c("tree","plant","animal","forest"),1*10^7/3)
key_i <-
rep(c("up","up","left","right","right"),1*10^7/5)
sequence <-
seq(ymd("2010-01-01"),ymd("2020-01-01"),by = "1 day")
date_sequence <-
rep(sequence,1*10^7/(length(sequence) - 1))
dataframe <-
data.frame(
bigint,date = date_sequence[1:(1*10^7)],key_a = key_a[1:(1*10^7)],key_b = key_b[1:(1*10^7)],key_c = key_c[1:(1*10^7)],key_d = key_d[1:(1*10^7)],key_e = key_e[1:(1*10^7)],key_f = key_f[1:(1*10^7)],key_g = key_g[1:(1*10^7)],key_h = key_h[1:(1*10^7)],key_i = key_i[1:(1*10^7)]
)
dataframe_two <-
dataframe %>%
mutate(date_sequence = ymd(date_sequence) + days(1))
sequence_sixdays <-
seq(ymd("2010-01-01"),by = "6 days")
date_sequence <-
rep(sequence_sixdays,3*10^6/(length(sequence_sixdays) - 1))
key_z <-
sample(1:10000000,3*10^6)
dataframe_three <-
data.frame(
key_a = sample(key_a,3*10^6),date = date_sequence[1:(3*10^6)],key_z = key_z[1:(3*10^6)]
)
解决方法
我可以应用哪些最佳实践,特别是我该怎么做才能针对大型数据集优化这些类型的函数?
使用data.table
包
library(data.table)
d1 = as.data.table(dataframe)
d2 = as.data.table(dataframe_two)
1
按许多列分组是data.table在
方面的出色表现
请参阅第二个图最底部的条形图,以与dplyr spark和其他类似的分组进行比较
https://h2oai.github.io/db-benchmark
by_cols = paste("key",c("a","b","c","d","e","f","g","h","i"),sep="_")
a1 = d1[,.(min_date = min(date_sequence)),by=by_cols]
请注意,我将date
更改为date_sequence
,我想您的意思是作为列名
2
尚不清楚要合并表的字段,dataframe_two没有指定的字段,因此查询无效
请澄清
3
data.table具有一种非常有用的联接类型,称为滚动联接,它可以完全满足您的需求
a3 = d2[d1,on=c("key_a","date_sequence"),roll="nearest"]
# Error in vecseq(f__,len__,if (allow.cartesian || notjoin || #!anyDuplicated(f__,:
# Join results in more than 2^31 rows (internal vecseq reached #physical limit). Very likely misspecified join. Check for #duplicate key values in i each of which join to the same group in #x over and over again. If that's ok,try by=.EACHI to run j for #each group to avoid the large allocation. Otherwise,please search #for this error message in the FAQ,Wiki,Stack Overflow and #data.table issue tracker for advice.
它导致错误。错误实际上非常有用。在您的真实数据上,它可能工作得很好,因为错误(匹配行的基数)背后的原因可能与生成样本数据的过程有关。具有良好的虚拟数据进行连接非常棘手。
如果您在真实数据上遇到相同的错误,则可能需要查看该查询的设计,因为它试图通过进行多对多联接来使行爆炸。即使仅考虑了单个date_sequence
身份(考虑了roll
)也是如此。我认为这种问题不适用于该数据(严格来说是联接字段的基准)。您可能需要在工作流程中引入数据质量检查层,以确保在key_a
和date_sequence
上没有重复项。
扩展@jangorecki的答案。
数据:
library(lubridate)
library(dplyr)
library(conflicted)
library(data.table)
dataframe = data.frame(bigint,date_sequence = date_sequence[1:(1*10^7)],key_a = key_a[1:(1*10^7)],key_b = key_b[1:(1*10^7)],key_c = key_c[1:(1*10^7)],key_d = key_d[1:(1*10^7)],key_e = key_e[1:(1*10^7)],key_f = key_f[1:(1*10^7)],key_g = key_g[1:(1*10^7)],key_h = key_h[1:(1*10^7)],key_i = key_i[1:(1*10^7)])
dataframe_two = dataframe %>% mutate(date_sequence1 = ymd(date_sequence) + days(1))
dataframe_two$date_sequence = NULL
基准:
1。
dplyr 2次运行的结果:2.2639秒; 2.2205秒st = Sys.time()
a1 = dataframe %>%
group_by(key_a,key_b,key_c,key_d,key_e,key_f,key_g,key_h,key_i) %>%
summarize(min_date = min(date_sequence)) %>% ungroup()
Sys.time() - st
data.table结果:1.0987秒; 0.9825秒
setDT(dataframe)
by_cols = paste("key",sep="_")
st = Sys.time()
a2 = dataframe[,by=by_cols]
Sys.time() - st
2。
dplyr
setDF(dataframe)
st = Sys.time()
df3 = merge(dataframe,dataframe_two,by = c("key_a","key_b","key_c","key_d","key_e","key_f","key_g","key_h","key_i"),all.x = T) %>% as_tibble()
Sys.time() - st
# Error in merge.data.frame(dataframe,:
# negative length vectors are not allowed
data.table
setDT(dataframe)
setDT(dataframe_two)
st = Sys.time()
df3 = merge(dataframe,all.x = T)
Sys.time() - st
# Error in vecseq(f__,if (allow.cartesian || notjoin || !anyDuplicated(f__,# :
# Join results in more than 2^31 rows (internal vecseq reached physical limit).
# Very likely misspecified join. Check for duplicate key values in i each of which
# join to the same group in x over and over again. If that's ok,try by=.EACHI to
# run j for each group to avoid the large allocation. Otherwise,please search for
# this error message in the FAQ,Stack Overflow and data.table issue tracker
# for advice.
此错误很有帮助,并运行以下命令:
uniqueN(dataframe_two,"key_i"))
给予
12
当我处理包含大约1000万行和15列的数据集时,我在合并之前将字符串转换为因子,并发现性能大约从5%提高了。内部连接的时间为30秒到10秒。令我惊讶的是,在这种特殊情况下,setkey()的效果不如将字符串转换为因数。
编辑:可重现的data.table示例,它以3种方式(在字符列,设置键,字符串转换为因子)上合并
创建表格:
x = 1e6
ids = x:(2*x-1)
chrs = rep(LETTERS[1:10],x)
quant_1 = sample(ids,x,replace = T)
quant_2 = sample(ids,replace = T)
ids_c = paste0(chrs,as.character(ids))
dt1 = data.table(unique(ids_c),quant_1)
dt2 = data.table(unique(ids_c),quant_2)
(i)在字符列上
system.time({result_chr = merge(dt1,dt2,by = 'V1')})
# user system elapsed
# 10.66 5.18 18.64
(ii)使用setkey
system.time(setkey(dt1,V1))
# user system elapsed
# 3.37 1.55 5.66
system.time(setkey(dt2,V1))
# user system elapsed
# 3.42 1.67 5.85
system.time({result_setkey = merge(dt1,by = 'V1')})
# user system elapsed
# 0.17 0.00 0.16
(iii)因素字符串
dt3 = data.table(unique(ids_c),quant_1)
dt4 = data.table(unique(ids_c),quant_2)
system.time({dt3[,V1 := as.factor(V1)]})
# user system elapsed
# 8.16 0.00 8.20
system.time({dt4[,V1 := as.factor(V1)]})
# user system elapsed
# 8.04 0.00 8.06
system.time({result_fac = merge(dt3,dt4,by = 'V1')})
# user system elapsed
# 0.32 0.01 0.28
在这种情况下,setkey总体上最快,总计11.67秒。但是,如果使用字符串将数据提取为真,则无需使用setkey。
示例2::如果您的数据进入一个文件中,并且行之间用属性隔开,例如日期,那么您需要先将其分开,然后再进行联接。
数据:
dt5 = data.table(date = '202009',id = unique(ids_c),quant = quant_1)
dt6 = data.table(date = '202010',quant = quant_2)
# Original data comes combined
dt = rbindlist(list(dt5,dt6))
(i)设置键
system.time(setkey(dt,id))
# user system elapsed
# 5.78 3.39 10.78
dt5 = dt[date == '202009']
dt6 = dt[date == '202010']
system.time({result_setkey = merge(dt5,dt6,by = 'id')})
# user system elapsed
# 0.17 0.00 0.17
(ii)字符串作为因素
dt5 = data.table(date = '202009',quant = quant_2)
dt = rbindlist(list(dt5,dt6))
system.time({dt[,id := as.factor(id)]})
# user system elapsed
# 8.17 0.00 8.20
dt5 = dt[date == '202009']
dt6 = dt[date == '202010']
system.time({result_fac = merge(dt5,by = 'id')})
# user system elapsed
# 0.34 0.00 0.33
在这种情况下,字符串到因子的速度比10.95的更快,为8.53秒。但是,在创建表ids_c = sample(ids_c,replace = F)
之前改组键时,setkey的执行速度快2倍。
此外,请注意,并非data.table中的每个函数都比基本函数的组合要快。例如:
# data.table
system.time(uniqueN(ids_c))
# user system elapsed
# 10.63 4.21 16.88
# base R
system.time(length(unique(ids_c)))
# user system elapsed
# 0.78 0.08 0.94
重要的是要注意,uniqueN()消耗的内存少4倍,因此如果RAM大小受限制,则最好使用。我已经为这个火焰图使用了profvis软件包(来自与上面不同的运行):
最后,如果使用大于RAM的数据集,请查看disk.frame。
,默认情况下,R处理内存中的数据。当数据变得非常大时,R可能会抛出内存不足错误,或者根据您的设置,使用页面文件(see here),但是页面文件会很慢,因为它涉及到磁盘的读写操作。
1。批处理
仅从计算角度来看,您可以通过批量处理来发现改进。您的示例包括对数据集进行汇总,因此大概您的汇总数据集要比输入数据小得多(如果没有,则值得考虑使用其他方法来生成相同的最终数据集)。这意味着您可以按分组变量进行批处理。
我通常通过对数字索引取模来实现:
num_batches = 50
output = list()
for(i in 0:(num_batches-1)){
subset = df %>% filter(numeric_key %% num_batches == i)
this_summary = subset %>%
group_by(numeric_key,other_keys) %>%
summarise(result = min(col)
output[[i]] = this_summary
}
final_output = bind_rows(output)
您可以为基于文本的键开发类似的方法。
2。减少数据大小
存储文本比存储数字数据需要更多的内存。一个简单的选择是用数字代码替换字符串,或将字符串存储为因子。这样会占用较少的内存,因此,计算机在分组/加入时要读取的信息也较少。
请注意,根据您的R版本,stringsAsFactors
可能默认为TRUE
或FALSE
。因此,最好将其明确设置。 (discussed here)
3。移至磁盘
除了某些大小之外,值得在磁盘上存储数据,并让R管理从磁盘读取数据和从磁盘读取数据。这是几个现有R软件包(包括bigmemory,ff and ffbase和大量parallelisation packages)背后的思想的一部分。
除了依赖R之外,您还可以将任务推送到数据库。尽管数据库的性能永远不会像内存数据那样快,但它们是为处理大量数据而设计的。 PostgreSQL是免费的开放源代码(getting started guide here),您可以将它与R在同一台计算机上运行-它不必是专用服务器。 R还有一个专门用于PostgreSQL(RPostgreSQL)的软件包。如果需要其他与数据库交互的选项,还可以使用其他几个软件包来设计数据库,包括dbplyr,DBI,RODBC。
虽然建立数据库有一些开销,但是dplyr和dbplyr会为您将R代码转换为SQL,因此您不必学习新的语言。缺点是您只能使用核心dplyr命令,因为从R到SQL的转换仅是为标准过程定义的。