如何将 ByteBuffer 转换为 BLOB 作为 spark sql 流的一部分? 斯卡拉

问题描述

所以我将数据作为 ByteBuffer 传入,我需要将其转换为 blob。这是我正在做的一个基本示例:

worker 函数接受一个字符串并执行将其转换为 ByteBuffer 所需的操作:

def compute(key: String): ByteBuffer = {
  var key_bytes = getBytes(key)
  var byte_hash = LongTupleHashFunction.xx128().hashBytes(key_bytes) //returns an array of longs
  var buffer = ByteBuffer.allocate(16).putLong(byte_hash(0)).putLong(byte_hash(1))
  buffer.order(ByteOrder.BIG_ENDIAN)
  buffer.flip()
  return buffer
    }

接下来,这两个都在我的批处理函数中,我有一个 UDF 来针对列的数据运行该函数:

  def xx_u = udf((s: String) => compute(s))

最后我在这里有了 spark sql 数据框的东西,简化了这个问题:

  val batchdff_test = batchDF.withColumn("key",(xx_u(col("key_1"))))

当然,如果我按原样运行它,我会得到:Schema for type java.nio.ByteBuffer is not supported

我一直在到处阅读,试图弄清楚如何将 ByteBuffer 变成 BLOB。我知道如何在 cassandra 的准备好的语句中执行此操作,但现在我需要在 spark sql 中执行此操作,而且似乎在这里的工作方式完全不同。我需要第二个 udf 吗?

我在 compute 中尝试了 GetBytes() 方法,但根本不起作用。我尝试遵循二进制类型的一些内容 (https://sparkbyexamples.com/spark/spark-sql-dataframe-data-types/),希望它能在我的 UDF 中工作,但没有骰子。

我尝试使用此示例,但对于我的 ByteBuffer:How do I convert array<FloatType> to BinaryType in spark dataframes using Scala

但是它将“to”和“map”标记为红色,并且通常感觉就像我在咆哮错误的树。我对 spark sql 和所有内容都很陌生,所以我试图弄清楚我应该在哪里进行转换。我认为它在 UDF 中,但我不确定为什么 GetBytes 方法在其中不起作用,如果那是 cassandra UDF?我不知道,但似乎 ByteBuffers 不是人们使用的常见数据类型,因此在 google 上没有太多帮助。

由于我的哈希函数输出一个 long 数组,以某种方式将它们转换为 blob 格式会更容易吗?我不喜欢 ByteBuffer,所以如果那样更容易,我完全赞成。我只是不能使用字符串作为中介,因为它们太慢了。 (我也没有在 google 上看到太多关于 long 数组的信息,所以这就是我使用 ByteBuffer 的原因。)

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...