SparkSQL与ScalaPB:从DataFrame转换为原型数据集时跳过原型字段时出错

问题描述

我有以下原始消息,需要使用ScalaPB通过Spark编写:

@Entity
@Table(name = "EMPLOYEES")
public class Employee {

    @Id
    @Column(name = "EMPLOYEE_ID")
    private Integer employeeId;

    private String firstName;

    private String lastName;

    @ManyToOne(fetch = FetchType.LAZY)
    @JoinColumn(name = "MANAGER_ID")
    private Employee manager;

    //getters & setters

}

message EnforcementData { required int32 id = 1; required int32 source = 2; required int32 flagsEnforceOption = 4; required int32 categoryEnforceOption = 5; optional TypeA a= 100; optional TypeB b= 101; } TypeA在接收方是TypeB的子类,它们使用protobuf-net反序列化它们。

现在,我的Spark数据框可以具有列a或列b。假设df是我的数据帧,我称以下代码

  • EnforcementData用于TypeA消息
  • df.withColumn(b,null).as[EnforcementData].map(_.toByteArray)用于TypeB消息

但是使用protobuf-net反序列化消息的接收者会抛出StackOverflow异常。我还尝试传递一个虚拟的case类而不是null,但它似乎仍然不起作用。

请让我知道如何处理?

解决方法

我能够通过重建案例类并显式跳过可选的子类字段来解决此问题。即

 //for TypeA messages,df.withColumn(b,null)
   .as[EnforcementData]
   .map{case EnforcementData(id,source,flag,cat,a,_) => EnforcementData(id,a = a) 
   } 

 //for TypeB messages,df.withColumn(s,_,b) => EnforcementData(id,b = b) 
    }