从 Java 中的 Spark 数据帧中提取二进制数据

问题描述

我有一个具有以下架构的数据框

root
 |-- blob: binary (nullable = true)

数据如下图

----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|blob                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                    |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[1F 8B 08 00 00 00 00 00 00 00 7B C0 C8 F0 EA 1D E3 FC C6 E6 7B 8C B3 DA F7 31 4E EB 3B 94 9C 9F AB 97 98 9B 58 95 9F A7 97 59 92 9A 1B 9F 9B 58 5C 92 5A A4 17 94 9A 93 58 92 99 9F 57 9C 91 59 10 1F 90 58 94 9A 57 12 1F 92 1F EF 9C 91 99 93 12 1F 9E 59 92 11 EF 92 9A 53 92 E8 60 A8 67 D0 99 92 9F 9B 98 99 17 9F 99 D2 0E 36 21 33 A5 A7 2C B5 A8 18 A8 39 BE 24 33 37 B5 AF 2F 37 B1 28 3B B5 A4 20 27 31 39 15 28 D9 5B 5C 9A 94 9B 59 0C 96 CF 4C E9 9B 9C 0C 36 B2 08 C9 BE E2 77 9B 1A BB EE AD EB 56 52 DF D5 D3 E5 64 60 69 66 EC E5 11 10 16 D4 AA C8 D4 9B DD C0 FF B4 75 4E C7 94 BE C3 4C 8B FA 94 BA B3 FB D5 98 98 8B 9F DE D1 9A 70 01 00 F6 9B E3 17 DA 00 00 00]|

我想在数据框上使用 map 函数来读取此列值并执行一些操作。

d.map(relationshipMapFunction,编码器)

我试图在 relationshipMapFunction提取上述 blob。

public class RelationshipMapFunction implements MapFunction<Row,String> {
    private static final long serialVersionUID = 6766320395808127072L;
    private static Logger LOG = Logger.getLogger(JobRunner.class);

    @Override
    public String call(Row row) throws Exception {
        // Code to read binary data and perform some actions

    }
}

如何从调用方法中的 row 变量中提取字节数组?

解决方法

您有多种方法可以做到这一点,但哪一种最好取决于您的要求。让我们看看其中的一些。

使用数据框映射函数

要按照您显示的代码,然后直接从 Row 类型的对象中提取字节数组,您可以使用 Row 类的 getAs(int) 方法:

public class RelationshipMapFunction implements MapFunction<Row,String> {
    @Override
    public String call(Row row) throws Exception {
        final byte[] blob = row.<byte[]>getAs(0); // 0 is the index of the blob column in the dataframe
        return transformBlob(blob);
    }
}

然后你可以将它与你的数据框一起使用:

Dataset<String> mappedDs = df.map(new RelationshipMapFunction(),Encoders.STRING());

具有类型化数据集的映射函数

您可以使用编码器将 Dataframe 转换为类型化数据集,例如,如果您的 Dataframe 仅包含类型为 byte[] 的列,则可以直接使用 Encoders.BINARY()。在这种情况下,map 函数将接收类型为 byte[] 而不是 Row 的对象。

public class RelationshipMapFunction implements MapFunction<byte[],String> {
    @Override
    public String call(byte[] blob) {
        return transformBlob(blob);
    }
}

然后您可以在应用编码器后将其与 Dataframe 一起使用:

Dataset<String> mappedDs = df
    .as(Encoders.BINARY())
    .map(new RelationshipMapFunction(),Encoders.STRING());

在这两种情况下,如果 map 函数很简单,您可以将其替换为 lambda。

其他选项包括使用 UDF。

请注意,如果您需要应用的转换就像从字节数组中解码一个 UTF8 字符串一样简单,您可以直接使用 Spark SQL 函数(从二进制到字符串的转换就可以完成这项工作)。