如何使用带有 SparkR 的箭头包修复 readBin() 中的错误?

问题描述

我正在研究 SparkR,并希望使用 Databricks 中的 arrow 包加快处理速度。但是,在 SparkR::dapply 或 gapply 之后执行 collect(df) 时出现以下错误

Error in readBin(con,raw(),as.integer(dataLen),endian = "big") : Error in readBin(con,endian = "big") : 
  invalid 'n' argument

我正在使用 500 万的 SalesRecords 数据,例如。代码如下:

library(SparkR)

SparkR::sparkR.session(sparkConfig = list(spark.sql.execution.arrow.sparkr.enabled = "true"))

library(arrow)

arrow::arrow_available()

dfSchema <- structType(structField("Region","string"),structField("Country",structField("ItemType",structField("SalesChannel",structField("OrderPriority",structField("OrderDate",structField("OrderID","int"),structField("ShipDate",structField("UnitsSold",structField("UnitPrice",structField("UnitCost",structField("TotalRevenue",structField("TotalCost",structField("TotalProfit","int")
                      )
spark_df <- SparkR::read.df(path="/FileStore/tables/SalesRecords_5m.csv",source="csv",schema=dfSchema)

# Apply an R native function to each partition.
returnSchema <-  structType(structField("UnitsSold","int"))

df <- SparkR::dapply(spark_df,function(rdf) { data.frame(rdf$UnitsSold + 1) },returnSchema
                   )

collect(df)

当我通过将 spark.sql.execution.arrow.sparkr.enabled 设置为 false 来关闭箭头时,整个代码运行时没有任何错误。因此,箭头不起作用,我该如何解决错误

注意:我使用以下版本:Spark 3.1.1、箭头 4.0.1、R 版本 4.0.4

sessionInfo() 的输出是:

R version 4.0.4 (2021-02-15)
Platform: x86_64-pc-linux-gnu (64-bit)
Running under: Ubuntu 18.04.5 LTS

Matrix products: default
BLAS:   /usr/lib/x86_64-linux-gnu/blas/libblas.so.3.7.1
LAPACK: /usr/lib/x86_64-linux-gnu/lapack/liblapack.so.3.7.1

locale:
 [1] LC_CTYPE=C.UTF-8       LC_NUMERIC=C           LC_TIME=C.UTF-8       
 [4] LC_COLLATE=C.UTF-8     LC_MONETARY=C.UTF-8    LC_MESSAGES=C.UTF-8   
 [7] LC_PAPER=C.UTF-8       LC_NAME=C              LC_ADDRESS=C          
[10] LC_TELEPHONE=C         LC_MEASUREMENT=C.UTF-8 LC_IDENTIFICATION=C   

attached base packages:
[1] stats     graphics  Grdevices utils     datasets  methods   base     

other attached packages:
[1] SparkR_3.1.1

loaded via a namespace (and not attached):
 [1] Rcpp_1.0.5         magrittr_2.0.1     tidyselect_1.1.0   bit_4.0.4         
 [5] xtable_1.8-4       R6_2.5.0           rlang_0.4.9        fastmap_1.0.1     
 [9] hwriter_1.3.2      tools_4.0.4        arrow_4.0.1        htmltools_0.5.0   
[13] bit64_4.0.5        digest_0.6.27      assertthat_0.2.1   Rserve_1.8-7      
[17] shiny_1.5.0        purrr_0.3.4        later_1.1.0.1      hwriterPlus_1.0-3 
[21] vctrs_0.3.5        promises_1.1.1     glue_1.4.2         mime_0.9          
[25] compiler_4.0.4     TeachingDemos_2.10 httpuv_1.5.4 

解决方法

我可以为您提供部分解决方案,但我不确定确切原因。

这里发生了一些变量类型不匹配的情况 - 您正在尝试创建一个“int”类型的字段,但那里的代码实际上创建了一个“double”类型的字段。

如果您在要添加的值中添加一个“L”,这有帮助吗?

df <- SparkR::dapply(spark_df,function(rdf) { data.frame(rdf$UnitsSold + 1L) },returnSchema
                   )

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...