Hive 使用自定义 udf 将 array<string, string> 转换为 array<struct<key:string, value:string>>

问题描述

我需要在 hive 中创建自定义 UDF 以将 array<map<string,string>> 转换为 array<struct<key:string,value:string>>

我正在尝试以下课程:

import java.util.List;
import java.util.Map;

import com.google.common.collect.Lists;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.Metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.io.Text;

public class ArrayOfMapToArrayOfStructUdf extends GenericUDF {

    private static final String UDF_NAME = "convertArrayMapToArrayStruct";

    @Override
    public String getUdfName() {
        return UDF_NAME;
    }

    @Override
    public ObjectInspector initialize(ObjectInspector[] objectInspectors) throws UDFArgumentException {
        if (objectInspectors.length != 1) {
            throw new UDFArgumentLengthException(UDF_NAME + " takes 1 argument of type array<map<key,value>>");
        }
        if (!(validateArgumentType(objectInspectors))) {
            throw new IllegalArgumentException("Code should never reach this section!");
        }
        return createReturnObjectInspector();
    }

    private boolean validateArgumentType(ObjectInspector[] objectInspectors) throws UDFArgumentException {
        if (!(objectInspectors[0] instanceof ListObjectInspector)) {
            throw new UDFArgumentException("the argument must be of type: array<map<key,value>>");
        }
        ListObjectInspector listObjectInspector = (ListObjectInspector) objectInspectors[0];
        if (!(listObjectInspector.getListElementObjectInspector() instanceof MapObjectInspector)) {
            throw new UDFArgumentException("the array contents must be of type: map<key,value>");
        }
        return true;
    }

    private ObjectInspector createReturnObjectInspector() {
        List<String> structFieldNames = Lists.newArrayList("key","value");
        List<ObjectInspector> structFieldobjectInspectors =
                Lists.newArrayList(PrimitiveObjectInspectorFactory.javaStringObjectInspector,PrimitiveObjectInspectorFactory.javaStringObjectInspector);

        StructObjectInspector structObjectInspector =
                ObjectInspectorFactory.getStandardStructObjectInspector(structFieldNames,structFieldobjectInspectors);

        return ObjectInspectorFactory.getStandardListObjectInspector(structObjectInspector);
    }

    @Override
    public Object evaluate(Deferredobject[] deferredobjects) throws HiveException {

        if (deferredobjects == null || deferredobjects.length < 1) {
            return null;
        }

        List<Map<String,String>> arrayOfMap = (List<Map<String,String>>) deferredobjects[0].get();

        if (arrayOfMap == null) {
            return null;
        }

        List<Object> arrayOfStruct = Lists.newArrayList();

        for (Map<String,String> map : arrayOfMap) {
            Object[] object = new Object[2];
            object[0] = new Text(map.get("key"));
            object[1] = new Text(map.get("value"));
            arrayOfStruct.add(object);
        }
        return arrayOfStruct;
    }

    @Override
    public String getdisplayString(String[] strings) {
        return UDF_NAME;
    }
}

我收到以下错误

Failed with exception java.io.IOException:org.apache.hadoop.hive.ql.Metadata.HiveException: Error evaluating convertArrayMapToArrayStruct

我不知道如何在评估方法中构建要返回的对象。

我试图转换的列有如下数据:

[{"key": "key1","value": "value1"},{"key": "key2","value": "value2"},...,{"key": "keyN","value": "valueN"}]

谢谢!

解决方法

这有效:

    @Override
    public Object evaluate(DeferredObject[] deferredObjects) throws HiveException {

        if (deferredObjects == null || deferredObjects.length < 1) {
            return null;
        }

        LazyArray lazyArray = (LazyArray) deferredObjects[0].get();

        if (lazyArray == null) {
            return null;
        }

        List<Object> lazyList = lazyArray.getList();

        List<Object> finalList = Lists.newArrayList();
        for (Object o : lazyList) {
            LazyMap lazyMap = (LazyMap) o;

            String key = "";
            String value = "";
            for (Map.Entry<?,?> entry : lazyMap.getMap().entrySet()) {
                if (entry.getKey().toString().equals("key")) {
                    key = entry.getValue().toString();
                } else if (entry.getKey().toString().equals("value")) {
                    value = entry.getValue().toString();
                }
            }
            finalList.add(Lists.newArrayList(key,value));
        }

        return finalList;
    }