Problem description
I receive logs from a firewall in CEF format as strings that look like this:
ABC|XYZ|F123|1.0|DSE|DSE|4|externalId=e705265d0d9e4d4fcb218b cn2=329160 cn1=3053998 dhost=SRV2019 duser=admin msg=Process accessed NTDS fname=ntdsutil.exe filePath=\\Device\\HarddiskVolume2\\Windows\\System32 cs5="C:\\Windows\\system32\\ntdsutil.exe" "ac i ntds" ifm "create full ntdstest3" q q filehash=80c8b68240a95 dntdom=adminDomain cn3=13311 rt=1610948650000 tactic=Credential Access technique=Credential Dumping objective=Gain Access patterndisposition=Detection. outcome=0
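How can I create a DataFrame from a string like this, where I get the key-value pairs separated by =?
My goal is to infer the schema dynamically from the string using its keys, i.e. extract the keys from the left-hand side of each = and build the schema from them.
What I am doing currently is quite clumsy (IMHO) and not very dynamic, since the number of key-value pairs varies with the type of log: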
// Triple quotes keep the embedded double quotes and backslashes exactly as they appear in the log
val a: String = """ABC|XYZ|F123|1.0|DSE|DCE|4|externalId=e705265d0d9e4d4fcb218b cn2=329160 cn1=3053998 dhost=SRV2019 duser=admin msg=Process accessed NTDS fname=ntdsutil.exe filePath=\\Device\\HarddiskVolume2\\Windows\\System32 cs5="C:\\Windows\\system32\\ntdsutil.exe" "ac i ntds" ifm "create full ntdstest3" q q filehash=80c8b68240a95 dntdom=adminDomain cn3=13311 rt=1610948650000 tactic=Credential Access technique=Credential Dumping objective=Gain Access patterndisposition=Detection. outcome=0"""
val ttype: String = "DCE"
import scala.collection.mutable.ListBuffer

type parseReturn = (String,String,List[String],Int)
def cefParser(a: String, ttype: String): parseReturn = {
  // A complete record splits into 7 header fields plus the extension part
  val firstPart = a.split("\\|")
  val pD = new ListBuffer[String]()
  if (firstPart.size == 8 && firstPart(4) == ttype) {
    // Copy the 7 header fields, then append the values parsed from the extension
    pD ++= firstPart.take(7)
    val secondPart = parseSecondPart(firstPart(7), ttype)
    pD ++= secondPart
    val listSize = pD.length
    (firstPart(2), ttype, pD.toList, listSize)
  } else {
    // Anything else is passed through unparsed and tagged as irrelevant
    val temp: List[String] = List(a)
    (firstPart(2), "IRRELEVANT", temp, temp.length)
  }
}
The parseSecondPart method is:
def parseSecondPart(m: String, ttype: String): ListBuffer[String] = ttype match {
  case auditactivity.ttype => parseAuditEvent(m)
}
def parseAuditEvent(msg: String): ListBuffer[String] = {
  // Rename the CEF keys to the target column names before splitting
  val updated_msg = msg.replace("cat=","Metadata_event_type=")
    .replace("destinationtranslatedaddress=","event_user_ip=")
    .replace("duser=","event_user_id=")
    .replace("deviceprocessname=","event_service_name=")
    .replace("cn3=","Metadata_offset=")
    .replace("outcome=","event_success=")
    .replace("devicecustomdate1=","event_utc_timestamp=")
    .replace("rt=","Metadata_event_creation_time=")
  parseEvent(updated_msg)
}
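def parseEvent(msg: String): ListBuffer[String] = {
  // Protect escaped equals signs (\=) so they are not treated as separators
  val newMsg = msg.replace("\\=","$_equal_$")
  val pD = new ListBuffer[String]()
  val splitData = newMsg.split("=")
  val mSize = splitData.size
  // splitData(0) is the first key; every later chunk is "<value> <next key>"
  for (i <- 1 until mSize) {
    if (i < mSize-1) {
      // Drop the trailing token (the next key) and keep the rest as the value
      val a = splitData(i).split(" ")
      val b = a.size-1
      val c = a.slice(0,b).mkString(" ")
      pD += c.replace("$_equal_$","=")
    } else if (i == mSize-1) {
      // The last chunk holds only a value
      val a = splitData(i).replace("$_equal_$","=")
      pD += a
    } else {
      logExceptions(newMsg)
    }
  }
  pD
}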
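Note that parseEvent keeps only the values, so the keys that the schema is supposed to come from are thrown away. As a rough sketch of an alternative (not a drop-in replacement), the extension part could be scanned with a regular expression that returns the keys together with their values. The names CefExtension, KeyPattern and parse are made up for this illustration, and the sketch assumes that every key consists of word characters and is preceded by whitespace or the start of the string; an unescaped word= sequence inside a value would be misread as a new key.

```scala
object CefExtension {
  // Hypothetical helper: a key is a run of word characters that follows
  // the start of the string or whitespace and is directly followed by '='.
  // An escaped "\=" never matches, because '=' must come right after the word.
  private val KeyPattern = """(?:^|\s)(\w+)=""".r

  // Returns the key-value pairs in the order they appear in the extension.
  def parse(extension: String): Seq[(String, String)] = {
    val matches = KeyPattern.findAllMatchIn(extension).toVector
    matches.zipWithIndex.map { case (m, i) =>
      // A value runs from the end of its key to the start of the next key
      val valueEnd =
        if (i + 1 < matches.size) matches(i + 1).start else extension.length
      val value = extension.substring(m.end, valueEnd).trim.replace("\\=", "=")
      m.group(1) -> value
    }
  }
}
```

Because the pairs come back as a sequence, renaming keys (as parseAuditEvent does) could be done on the key alone rather than with string replacement on the whole message, where a pattern like "rt=" would also match the tail of any longer key ending in rt.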
cefParser returns a tuple whose third element holds the parsed values (collected in a ListBuffer[String]), which I use to create a DataFrame as follows:
val df = ss.sqlContext
  .createDataFrame(tempRDD.filter(x => x._1 != "IRRELEVANT")
    .map(x => Row.fromSeq(x._3)), schema)
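The call above depends on a schema that is fixed in advance, which is the part that breaks down when the set of keys changes from one log type to another. One possible way around that, sketched here in plain Scala so it runs without Spark, is to collect every key seen in a batch and then pad each record to that common column list. SchemaAlign, columns and rows are hypothetical names, and the input is assumed to be records that have already been split into key-value pairs.

```scala
object SchemaAlign {
  type Record = Seq[(String, String)]

  // Column names: every key seen in any record, in first-seen order.
  def columns(records: Seq[Record]): Seq[String] =
    records.flatMap(_.map(_._1)).distinct

  // One row per record, aligned to cols; keys a record lacks become None.
  def rows(records: Seq[Record], cols: Seq[String]): Seq[Seq[Option[String]]] =
    records.map { rec =>
      val byKey = rec.toMap
      cols.map(byKey.get)
    }
}
```

In a Spark job the column list could presumably be mapped to a StructType of nullable StringType fields, and each aligned row passed to Row.fromSeq after converting the Option values with orNull. That wiring is left out here and would need to be checked against the Spark version in use.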
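People of Stack Overflow, I really need your help to improve my code, both in performance and in approach. Any help and/or suggestions would be greatly appreciated. Thanks in advance.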
Solution
No working solution has been found for this problem yet.