问题描述
|
我有大量的(XXM-XXXM)字符串看起来像(一个小样本):
我不知道所有可能的错误字符串,也不知道其排列。我想将所有类似的错误归为一组,并生成一些统计信息以显示每个错误字符串组的错误计数。
因此,从本质上讲,我想将最相似的字符串分组在一起,并且字符串可以属于多个组。
谢谢!
解决方法
免责声明:我从来没有解决过这样的问题。
我可以想到几种解决您的问题的方法:
您正在尝试将每一行聚类为一组聚类
检查数据挖掘算法
群集中每条线之间的差异较小,两个不同群集中的线之间的差异较大
您可以通过比较两条线的集合交点来快速收集相似的线:
set(line1.split) & set(line2.split)
-结果集中的元素计数指示了这两条线的接近程度。
一点python代码可能看起来像这样:
import fileinput
CLUSTER_COUNT = 5
MAX_DISTANCE = 5
def main():
clusters = [Cluster() for i in range(CLUSTER_COUNT)]
MAXDISTANCE = 3
for line in fileinput.input():
words = set(line.split())
cluster = sorted(clusters,key=lambda c: c.distanceTo(words))[0]
cluster.addLine(words,line)
# print out results (FIXME: write clusters to separate files)
for cluster in clusters:
print \"CLUSTER:\",cluster.intersection
for line in cluster.lines:
print line
print \"-\" * 80
print
class Cluster(object):
def __init__(self):
self.intersection = set()
self.lines = []
def distanceTo(self,words):
if len(self.intersection) == 0:
return MAX_DISTANCE
return len(words) - len(self.intersection & words)
def addLine(self,words,line):
self.lines.append(line)
if len(self.intersection) == 0:
self.intersection = words
else:
self.intersection = self.intersection & words
if __name__ == \'__main__\':
main()
如果在主数据上运行它,那么最终应该有几个集群。注意:更改代码以将群集写入单独的文件。我认为您将需要递归地再次通过代码运行集群,直到找到您感兴趣的子集为止。
, 我在这里要做的是编写一个参数识别器。这会将以下子字符串识别为参数(当被空白包围时):
十进制数
a.b.c.d,其中a,b,c,d是十进制数
filename.php
毫无疑问,还会有更多,但清单不应太大。然后,用占位符替换每个参数:%d,%url,%phpfile。现在,您可以对字符串进行排序。
通过浏览很少出现的输出字符串,可以找到无法识别的参数类型。例如,如果一天中某个时间的参数类型为h:m:s,则包含此未取代参数的字符串将是唯一的,或者几乎是唯一的,并且您可以通过简单地将100或100的列表作为目视来查找此新参数类型。因此,“几乎是唯一的”字符串。然后将h:m:s添加到列表中,将所有此类事件替换为%time,然后再次运行。
, 在阅读了CRM114可控正则表达式Mutilator的页面上的更多内容之后:
垃圾邮件是CRM114的主要目标,但它不是专用的仅电子邮件工具。 CRM114已用于对网页,简历,博客条目,日志文件和许多其他事情进行排序。准确度可高达99.9%。换句话说,CRM114学习得很快。
简而言之,这可能正是您所需要的。您可以依靠它进行优化。
, 有趣的问题……只是一个可以很快编码的自发想法:
通过计算数据集中样本中每个单词的出现频率,确定每个单词是数据(11.22.33.55、3829,somepage.php等)还是描述(负载,连接,页面等) 。描述字可能比特定数据字更频繁地出现。您必须调整阈值才能找到一个将单词分为两类的阈值。如果这不适用于所有情况,例如,因为某个IP地址经常出现,则应使用手动黑名单解决此问题。
通过仅从其描述字集中计算签名来处理每一行(串联的描述字的字符串哈希应起作用)。计算每个签名的出现。
性能(非常粗略的估算):在1.阶段就足以对数据进行采样,在2.阶段中,您必须顺序处理每行,因此您位于O(n)内,其中n是数据中的行数设置(在1.阶段将O(1)插入哈希映射,在2.阶段将O(1)测试使用哈希映射)。内存消耗取决于不同单词(1.阶段)和不同句子(2.阶段)的数量。如果单词变化很大,则用于计算第一阶段出现次数的字典可能会出现问题。
在Python中,作为数据结构,我将尝试使用特殊(性能)dict数据类型Counter。
,好吧,我认为比较的想法超出了这个问题的范围。即使比较散列/映射值也要花费很多时间。因此基于错误消息(字符串)的映射进行存储,然后进行验证可能是可行的方法。
为此,您需要找到一些映射单个字符串的想法(它只是散列技术的反面,它更加强调避免碰撞),但我想这里几乎相同的字符串之间的冲突是可以的。因此,映射函数应该是字母的位置属性(字符串的长度)的函数。
完成映射后,我们将创建一个存储桶,其值范围组为1:(0-10)(或根据值类似)组2:(10-20).....等等,因为我们表示相似的字符串应该具有相似的值,相似的字符串放在相同的存储桶中
因此,每当遇到新消息时,我们都会用等价的数字对其进行映射,然后放入适当的存储桶中。
, 如果没有任何输入假设,很难回答。
我的方法是:
查看您的输入并制定有关分组的规则,并实施这些规则(可能使用正则表达式)。
然后为不匹配的用户创建一个“杂项”组。
收集数据,并查看该组中是否有可以为其编写正则表达式的模式。
重复直到您有一个令人满意的小型“杂项”组。
只有您可以(无假设)告诉您是否要将一组特定的“相似”字符串组合在一起。错误消息类型必须有限,因此经过几次迭代后,这应该会产生足够好的结果。
假设相似性是基于字符串中的数字和IP地址,则另一种方法可能是搜索它们,然后使用这些数字对项目进行分类。同样,这涉及更具体地制定您的规则,但可能使您避免重复制定规则(如果假设实际上是有效的)。
可以考虑测量字符串之间的levenshtein距离(或使用类似的算法),但是要进行分类,则需要太多步骤。
, 嗯,为什么要重新发明轮子-看一下splunk的免费版本,它是为这类任务而设计的。
, 如果由我决定,我将使用python和正则表达式。免责声明:我从未在您所谈论的规模上使用过python。
这是示例输出中的前两项,以正则表达式表示法重写:
r \“由于错误\\ d + \,客户端[。\\ d] +无法加载页面\\ w + .php页面”
r \“与[。\\ d] +端口\\ d +的连接超时,客户端[。\\ d] +源端口\\ d + \超时”
还不错吧?您可以将它们用作顶级存储桶的“键”。
然后,用于将给定字符串匹配到存储桶的代码变得非常简单:
for bucket in buckets:
if re.match(bucket.regex,s):
bucket.matchingStrings.append(s)
break
# else if no buckets match,generate a new bucket/regex for s
生成这些正则表达式将是棘手的部分。您需要有一些规则来选择需要泛化的字符串部分。在您的情况下,一般部分似乎是数字,IP地址和文件名。您必须针对每种情况提出正确的表达式,但是这里有一个简单的替代方法,即使用代表数字的regex模式替换字符串中的数字。
pattern = r\"\\d+\"
re.sub(pattern,pattern,\"Failed to count from 0 to 600\")
# returns r\"Failed to count from \\d+ to \\d+\"
我敢打赌,用\\ d +替代您会走的很远,仅此而已。
, 它本身不是算法解决方案,但可能会派上用场,并为您提供更多的计算自由度:日志文件分析是MapReduce实现Hadoop及其朋友的主要用例之一。因此,如果遇到可伸缩性问题,您可能会开始考虑在可管理子集上解决问题(映射步骤),然后将子集的输出合并为一个(归约步骤)。例如,您可能会在子集中找到存储桶,然后比较所有存储桶集合并合并相似的存储桶。
, 从最近的编辑来看,似乎在进行交点时正在比较字符串。不要比较字符串;只是比较哈希。 64位哈希冲突的机会基本上为零。通过节省许多缓存未命中,这将加快字符串比较时间。
,我做了一些非常相似的事情,需要从数据库中找到相似的字符串。带有一些附加功能的Trie将对此有很大帮助。 try的常见实现仅支持插入和搜索/搜索前缀。但是,也可以计算到Trie中所有数据的Levensthein距离,然后使用它来获取最接近的字符串。
在trie中实现的Levensthein算法还可用于找出两个字符串之间的变化并生成模板。像这样:
similars = get_similar_strings( input_string,max_distance );
for each similar in similars do
if is string then
//construct template from string
else
// increase count
done
, 我会做这样的事情。
map<string,int> mStringToInt;
struct structOneRecord
{
vector<int> vWords;
vector<vector<int>> vLines; // All the Lines under this record
vector<int> vIndexOfMutableWords;
bool operator < (const structOneRecord &n) const
{
if(vWords.size() != n.vWords.size())
return vWords.size() < n.vWords.size();
else
{
// Count diferences
vector<int> vCurrentIndexs;
for(int i=0; i<vWords.size(); i++)
{
if(vWords[i] != n.vWords[i])
vCurrentIndexs.push_back(i);
}
if(vCurrentIndexs.size() == 0)
return false;
int iHalf = vWords.size() / 2; // The diferences can\'t be bigger than hald the phrase
if(vCurrentIndexs.size() < iHalf)
{
if(vIndexOfMutableWords.size() == 0)
return false;
else
{
if(vIndexOfMutableWords.size() == vCurrentIndexs.size())
{
for(int i=0; i<vIndexOfMutableWords.size(); i++)
{
if(vIndexOfMutableWords[i] != vCurrentIndexs[i])
vWords[vCurrentIndexs[0]] < n.vWords[vCurrentIndexs[0]]; // Not equal
}
}
}
}
return vWords[vCurrentIndexs[0]] < n.vWords[vCurrentIndexs[0]];
}
}
};
vector<string> SplitString(const string &strInput,char cDelimiter,bool bSkipEmpty)
{
vector<string> vRetValue;
stringstream ss(strInput);
string strItem;
while(std::getline(ss,strItem,cDelimiter))
{
// Skip Empty
if(bSkipEmpty && strItem.size()==0)
continue;
vRetValue.push_back(strItem);
}
return vRetValue;
}
void main()
{
// To Test
vector<string> vInput;
vInput.push_back(\"Connection to 11.22.33.44 port 3940 timed out client 1.2.3.4 source port 3940\");
vInput.push_back(\"Error loading page somepage.php by client 2.3.4.5\");
vInput.push_back(\"Load of page someotherpage.php by client 2.3.4.8 failed due to error 4930\");
vInput.push_back(\"Connection to 11.22.33.55 port 3829 timed out client 1.2.3.6 source port 3944\");
vInput.push_back(\"Load of page alt.php by client 2.3.4.92 failed due to error 3829\");
vInput.push_back(\"Load of page alt2.php by client 2.3.4.95 failed due to error 3829\");
set<structOneRecord> sRecords;
for(int i=0; i<vInput.size(); i++)
{
vector<string> vWords = CMkDevStringUtilities::SplitString(vInput[i],\' \',true);
structOneRecord stRecord;
stRecord.vWords.resize(vWords.size());
for(int j=0; j<vWords.size(); j++)
{
map<string,int>::iterator mIte = mStringToInt.find(vWords[j]);
if(mIte == mStringToInt.end())
mIte = mStringToInt.insert(mStringToInt.begin(),make_pair(vWords[j],mStringToInt.size()));
stRecord.vWords[j] = mIte->second;
}
set<structOneRecord>::iterator sIte = sRecords.find(stRecord);
if(sIte != sRecords.end())
{
sIte->vLines.push_back(stRecord.vWords);
if(sIte->vIndexOfMutableWords.size() == 0)
{
// Count diferences
vector<int> vCurrentIndexs;
for(int i=0; i<stRecord.vWords.size(); i++)
{
if(sIte->vWords[i] != stRecord.vWords[i])
vCurrentIndexs.push_back(i);
}
sIte->vIndexOfMutableWords = vCurrentIndexs;
}
}
else
{
stRecord.vLines.push_back(stRecord.vWords);
sRecords.insert(stRecord);
}
}
}
这将为您提供一个输出,通过为每个记录重构一个字符串并用vWord指向的子字符串(并用\'%% \'替换Mutable单词中的索引处的字符串),可以轻松地将其打印为输出可以给您按记录“类型”排序的行。
*编辑
忘记提及在每个structOneRecord下,vLines.size()会为您提供错误计数。
, 我认为将需要一些人工干预,而频率计数将是最好的方法。