问题描述
我想用本机spark函数替换我们的UDF AS查找实现。
AS查找表大约有500 000条记录,如下所示:
+-----------+------------------------+------------------------------+
| network|autonomous_system_number|autonomous_system_organization|
+-----------+------------------------+------------------------------+
| 1.0.0.0/24| 13335| CLOUDFLARENET|
| 1.0.4.0/22| 38803| Wirefreebroadband...|
|1.0.16.0/24| 2519| ARTERIA Networks ...|
|1.0.64.0/18| 18144| Energia Communica...|
+-----------+------------------------+------------------------------+
传入数据包含有关源IP地址,目标IP地址的信息:
+-----------------+----------------------+
|sourceIPv4Address|destinationIPv4Address|
+-----------------+----------------------+
| 5.34.180.207| 213.226.224.51|
| 13.230.21.159| 81.95.96.2|
| 93.188.0.21| 34.223.46.15|
| 165.22.170.210| 89.187.156.222|
+-----------------+----------------------+
由于AS查找表仅包含有关IP子网的信息,我可能需要使用Longest Prefix Match algorithm来加入这两个DF,但是我不知道如何在不使用另一个UDF的情况下实现它。有人可以指出我正确的方向吗?
这时,我正在使用Maxmind AS查找库:
val asnLookup = udf(Udf.getASN)
val dfAS = df
.withColumn("sourceAS",asnLookup(col(IpfixFields.sourceIPv4Address)))
.withColumn("destinationAS",asnLookup(col(IpfixFields.destinationIPv4Address)))
def getASN: String => String = (ipAddress: String) => {
val ia = InetAddress.getByName(ipAddress)
val result = GeoIPWrapper.reader.tryAsn(ia)
if (!result.isPresent) {
"na"
} else {
result.get().getAutonomousSystemNumber.toString
}
}
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)