Logstash: matching a "single grok pattern" and "multiple grok patterns" in the same filter

Question

The lines in my log file look like this.

[2020-07-10T10:00:04.979+00:00] [app_server1] [NOTIFICATION] [report-thread] [tid: 1346887] [userId: user10.id2] start-getchunk: Report=/Report Folder Path/Report name,TemplateName=Template1004,OutFormat=excel,Locale=en_US
[2020-07-10T10:00:25.085+00:00] [app_server2] [NOTIFICATION] [report-thread] [tid: 1346887] [userId: user1.id1] end-getchunk: Report=/Report Folder Path/Report name,TemplateName=Template2007,OutFormat=html,Locale=en_US
[2020-07-10T10:00:25.080+00:00] [app_server2] [NOTIFICATION] [report-thread] [tid: 1346887] [userId: user2.id1][
    Start report processing details
    -----------------------------------------------------
    Report path: /Report Folder Path/Report name
    Datamodel name: /Report Folder Path/Datamodel name
    User name: user2
    Output format: 1
    chunk size limit: 524288000
    -----------------------------------------------------
    End report processing details
]
[Log lines with some other patterns]

I want my Logstash configuration file to contain the following match patterns, each matching a different kind of line.

(a) match => {  "message" => "\[%{TIMESTAMP_ISO8601:process_timstamp}\] \[%{WORD:app_server}] \[%{WORD:log_level}] \[] \[%{DATA:thread_type}\] \[tid: %{NUMBER:tid}\] \[userId: %{DATA:userId}\] (?<event_type>\sstart-getchunk): Report=(?<report_name>[^,]*),TemplateName=(?<template_name>[^,]*),OutFormat=(?<output_format>[^,]*),Locale=(?<locale>[^,]*)" }
(b) match => {  "message" => "\[%{TIMESTAMP_ISO8601:process_timstamp}\] \[%{WORD:app_server}] \[%{WORD:log_level}] \[] \[%{DATA:thread_type}\] \[tid: %{NUMBER:tid}\] \[userId: %{DATA:userId}\] (?<event_type>\send-getchunk): Report=(?<report_name>[^,]*)" }
(c) to match a multiline event, using this match
    break_on_match => false
    match => {
        "message" => [
            "Report path: (?<report_path>[^,\r\n]*)","Datamodel name: (?<datamodel_name>[^,\r\n]*)","User name: (?<user_name>[^,\r\n]*)","(?<event_type>Start report processing details)"
        ]
    }
(d) other n numbers of matches like the one mentioned in points (a) & (b) above

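Now, let's assume my configuration contains only (a), (b) and (c).

My configuration looks like the one below (with the order of the "match =>" lines changed between tests).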

filter{
    grok{
        break_on_match => false
        ## single grok pattern match
        match => {  "message" => "\[%{TIMESTAMP_ISO8601:process_timstamp}\] \[%{WORD:app_server}] \[%{WORD:log_level}] \[] \[%{DATA:thread_type}\] \[tid: %{NUMBER:tid}\] \[userId: %{DATA:userId}\] (?<event_type>\sstart-getchunk): Report=(?<report_name>[^,]*)" }
        match => {  "message" => "\[%{TIMESTAMP_ISO8601:process_timstamp}\] \[%{WORD:app_server}] \[%{WORD:log_level}] \[] \[%{DATA:thread_type}\] \[tid: %{NUMBER:tid}\] \[userId: %{DATA:userId}\] (?<event_type>\send-getchunk): Report=(?<report_name>[^,]*)" }
        ## multiple grok pattern match
        match => {
            "message" => [
                "Report path: (?<report_path>[^,\r\n]*)","Datamodel name: (?<datamodel_name>[^,\r\n]*)","User name: (?<user_name>[^,\r\n]*)","(?<event_type>Start report processing details)"
            ]
        }
    }
}

Now the problem statement: the configuration matches only THE LAST of the three "match =>" patterns in the configuration file. So, if in the configuration file

[A] the sequence of "match =>" lines is (a) -> (b) -> (c) (as shown in the config above): it matches only the patterns for (c).
[B] the sequence of "match =>" lines is (c) -> (b) -> (a): it matches only the pattern for (a).
[C] the sequence of "match =>" lines is (c) -> (a) -> (b): it matches only the pattern for (b).

However, I want all of (a), (b) and (c) to be matched against the input.

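In addition, I would like to keep the match patterns of (a), (b) and (c) as they are. In fact, for (c) I originally had a single pattern like the ones in (a) and (b). At that time the configuration was attempting to match all of (a), (b) and (c). But because (c) produced "_groktimeout" errors, I changed the pattern to the one shown in (c) (see Ref [1]).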

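To solve this, I tried

  • multiple grok {} blocks under the same filter {} block, AND
  • multiple filter {} blocks in the same configuration file,

splitting "(a), (b)" and "(c)" into different blocks. But in both cases no output is produced.

I tried another option, shown below. It works, but it is not a good choice for performance. So I am looking for a better, more efficient option that uses grok without an "if" condition.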

filter{
    grok{
        ## single grok pattern match
        match => {  "message" => "\[%{TIMESTAMP_ISO8601:process_timstamp}\] \[%{WORD:app_server}] \[%{WORD:log_level}] \[] \[%{DATA:thread_type}\] \[tid: %{NUMBER:tid}\] \[userId: %{DATA:userId}\] (?<event_type>\sstart-getchunk): Report=(?<report_name>[^,]*)" }
        match => {  "message" => "\[%{TIMESTAMP_ISO8601:process_timstamp}\] \[%{WORD:app_server}] \[%{WORD:log_level}] %{GREEDYDATA}(?<event_type>Start report processing details)"}
    }
    if " Start report processing details" in [event_type]{
        grok {
            break_on_match => false
            ## multiple grok pattern match
            match => {
                "message" => [
                    "Report path: (?<report_path>[^,\r\n]*)","Datamodel name: (?<datamodel_name>[^,\r\n]*)","User name: (?<user_name>[^,\r\n]*)","(?<event_type>Start report processing details)"
                ]
            }
        }
    }
}

My expected output is shown below; to get it, I remove a few fields in the configuration file.

{"tags":["reportlog"],"userId":"user10.id2","process_timestamp":"2020-07-10T10:00:25.085+00:00","thread_type":"report-thread","template_name":"Template2007","log_level":"NOTIFICATION","event_type":"end-getchunk","output_format":"html","tid":"199163","app_server":"app_server2","report_name":"/Report Folder Path/Report name"}
{"tags":["reportlog"],"userId":"user1.id1","process_timestamp":"2020-07-10T10:00:04.979+00:00","template_name":"Template1004","event_type":"start-getchunk","output_format":"excel","tid":"800760","app_server":"app_server1","report_name":"/Report Folder Path/Report name"}
{"tags":["multiline","reportlog"],"userId":["user2.id1"],"report_path":"/Report Folder Path/Report name","process_timestamp":["2020-07-10T10:00:25.080+00:00"],"datamodel_name":"/Report Folder Path/Datamodel name","event_type":"Start report processing details","tid":["812409"],"app_server":"app_server2"}

[1]: https://discuss.elastic.co/t/getting-groktimeout-error-for-a-particular-filter-sometimes-need-help-optimizing-the-filter/246342

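Answer

The only difference between your first two messages is that one contains the string start-getchunk and the other contains the string end-getchunk, and for both message types you save that string into the same field, so your grok patterns (a) and (b) are essentially the same.

The following pattern will match both messages.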

\[%{TIMESTAMP_ISO8601:process_timestamp}\] \[%{WORD:app_server}\] \[%{WORD:log_level}\] \[%{DATA:thread_type}\] \[tid: %{NUMBER:tid}\] \[userId: %{DATA:userId}\] %{DATA:event_type}: Report=(?<report_name>[^,]*),TemplateName=(?<template_name>[^,]*),OutFormat=(?<output_format>[^,]*),Locale=(?<locale>[^,]*)

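Instead of using (?<event_type>\sstart-getchunk): and (?<event_type>\send-getchunk):, use %{DATA:event_type}:

For your multiline log, since you did not share your input configuration, I used the following multiline codec configuration to reproduce it.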

codec => multiline {
    pattern => "^\["
    negate => true
    what => "previous"
}
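
For context, a codec like this is declared inside an input plugin. A minimal sketch, assuming a file input and a hypothetical log path (neither appears in the question):

input {
    file {
        # hypothetical path; replace it with the real location of the report log
        path => "/var/log/app/report.log"
        codec => multiline {
            # any line that does not start with "[" is appended to the previous event
            pattern => "^\["
            negate => true
            what => "previous"
        }
    }
}

With this codec the whole "Start report processing details" block reaches grok as one event whose lines are joined by newline characters, and the event is tagged "multiline", which is consistent with the tags in the expected output.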

The grok configuration you are using for this log is wrong.

Having this:

grok {
    match => { 
        "message" => [
            "Report path: (?<report_path>[^,\r\n]*)","Datamodel name: (?<datamodel_name>[^,\r\n]*)"
        ]
    }
} 

is the same as having this configuration:

grok {
    match => { "message" => "Report path: (?<report_path>[^,\r\n]*)" }
    match => { "message" => "Datamodel name: (?<datamodel_name>[^,\r\n]*)" }
} 

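What you need is a single pattern that matches all the fields in the message. The pattern below will match your multiline log.

\[%{TIMESTAMP_ISO8601:process_timestamp}\] \[%{WORD:app_server}\] \[%{WORD:log_level}] \[%{DATA:thread_type}\] \[tid: %{NUMBER:tid}\] \[userId: %{DATA:userId}\]%{DATA}(?<event_type>Start report processing details)%{DATA}Report path: (?<report_path>[^,\r\n]*)%{DATA}Datamodel name: (?<datamodel_name>[^,\r\n]*)%{DATA}User name: (?<user_name>[^,\r\n]*)

Also, you need to set break_on_match to true (its default value). Once a line has matched one pattern there is no need to test it against the others; doing so only adds processing to the pipeline.

The following filter will match all of your example lines.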

filter{
    grok {
        break_on_match => true
        match => {  "message" => "^\[%{TIMESTAMP_ISO8601:process_timestamp}\] \[%{WORD:app_server}\] \[%{WORD:log_level}] \[%{DATA:thread_type}\] \[tid: %{NUMBER:tid}\] \[userId: %{DATA:userId}\] %{DATA:event_type}: Report=(?<report_name>[^,]*)" }
        match => {  "message" => "^\[%{TIMESTAMP_ISO8601:process_timestamp}\] \[%{WORD:app_server}\] \[%{WORD:log_level}] \[%{DATA:thread_type}\] \[tid: %{NUMBER:tid}\] \[userId: %{DATA:userId}\]%{DATA}(?<event_type>Start report processing details)%{DATA}Report path: (?<report_path>[^,\r\n]*)" }
    }
}

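The ^ at the start of each pattern anchors it to the beginning of the message; if for some reason your message does not start with [, the pattern fails immediately and grok does not spend time trying to parse it.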
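
The grok filter also accepts an array of patterns under a single "message" key, which is the documented way to try several patterns against one field without repeating the match option. A sketch of that layout, assuming the same two patterns as in the filter above (only the arrangement changes):

filter {
    grok {
        # stop at the first pattern that matches (this is also the default)
        break_on_match => true
        match => {
            "message" => [
                # start-getchunk / end-getchunk lines
                "^\[%{TIMESTAMP_ISO8601:process_timestamp}\] \[%{WORD:app_server}\] \[%{WORD:log_level}\] \[%{DATA:thread_type}\] \[tid: %{NUMBER:tid}\] \[userId: %{DATA:userId}\] %{DATA:event_type}: Report=(?<report_name>[^,]*)",
                # multiline "Start report processing details" events
                "^\[%{TIMESTAMP_ISO8601:process_timestamp}\] \[%{WORD:app_server}\] \[%{WORD:log_level}\] \[%{DATA:thread_type}\] \[tid: %{NUMBER:tid}\] \[userId: %{DATA:userId}\]%{DATA}(?<event_type>Start report processing details)%{DATA}Report path: (?<report_path>[^,\r\n]*)"
            ]
        }
    }
}

The patterns are tried in the order listed, so it usually pays to put the most frequent message type first.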

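Since grok relies on regular expressions, performance problems can occur. If you keep getting timeout errors, it is better to change the approach and start using conditionals to parse some of the messages.

You can try using one grok to parse all the common fields, saving the part of the message that differs in another field, and then using conditionals to direct it to the correct grok. This is better than having one grok that tries to match everything.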
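
A rough sketch of that two-stage approach, under some assumptions: the field name log_body is hypothetical, the conditions are just one way to tell the message types apart, and the (?m) prefix is added so that . can also match the newlines inside multiline events. It has not been run against your data.

filter {
    # Stage 1: parse the header shared by every line and keep the rest in a temporary field
    grok {
        match => { "message" => "(?m)^\[%{TIMESTAMP_ISO8601:process_timestamp}\] \[%{WORD:app_server}\] \[%{WORD:log_level}\] \[%{DATA:thread_type}\] \[tid: %{NUMBER:tid}\] \[userId: %{DATA:userId}\]%{GREEDYDATA:log_body}" }
    }

    # Stage 2: pick a small, specific pattern based on what the remainder contains
    if [log_body] =~ /getchunk: / {
        grok {
            match => { "log_body" => "^ %{DATA:event_type}: Report=(?<report_name>[^,]*),TemplateName=(?<template_name>[^,]*),OutFormat=(?<output_format>[^,]*),Locale=(?<locale>[^,]*)" }
        }
    } else if "Start report processing details" in [log_body] {
        grok {
            match => { "log_body" => "(?m)(?<event_type>Start report processing details)%{DATA}Report path: (?<report_path>[^,\r\n]*)%{DATA}Datamodel name: (?<datamodel_name>[^,\r\n]*)%{DATA}User name: (?<user_name>[^,\r\n]*)" }
        }
    }

    # the temporary field is no longer needed once the specific grok has run
    mutate {
        remove_field => [ "log_body" ]
    }
}

Each second-stage pattern only has to scan the remainder of the message, and a line that matches neither condition skips the second grok entirely, which limits how much work a non-matching line can cause.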