Logstash:elasticsearch输出和非结构化数据

问题描述

Filebeat.yml文件:

filebeat.inputs:
- type: log
  paths:
    - C:\Program Files\Filebeat\test_logs\*\*\*\*.txt
  exclude_lines: ['^Infobase.+']
output.logstash:
  hosts: ["localhost:5044"]
  worker: 1

Filebeat从这样的文件夹结构收集日志:

C:\Program Files\Filebeat\test_logs\*\*\*\*.txt

这里有很多文件夹,每个文件夹的末尾至少有几个日志。

日志文件示例(在多个日志文件中,时间可能相同,因为日志来自不同的用户):

"03.08.2020 10:56:38","Event LClick","Type Menu","t=0","beg"
"03.08.2020 10:56:38","Detail SomeDetail","t=109","end"
"03.08.2020 10:56:40","t=1981","beg"
"03.08.2020 10:56:40","t=2090","end"
"03.08.2020 10:56:41","Type ToolBar","t=3026","beg"
"03.08.2020 10:56:43","Detail User_Desktop","t=4477","end"
"03.08.2020 10:56:44","Event FormActivate","Name Form_Name:IsaA","t=5444"
"03.08.2020 10:56:51","t=12543","beg"
"03.08.2020 10:56:51","t=12605","end"
"03.08.2020 10:56:52","Form ","Type Label","Name Application.for.training","t=13853","beg"
"03.08.2020 10:57:54","Form Application.for.training","t=75442","end"
"03.08.2020 10:57:54","Name List.form","t=75785"
"03.08.2020 10:58:04","Event Wheel","Form List.form","Type FormTable","Name Список","t=85769","beg"
"03.08.2020 10:58:04","end"
"03.08.2020 10:58:04","t=85847","t=85879","t=85925","end"
"03.08.2020 10:58:08","t=89373","beg"
"03.08.2020 10:58:08","Detail Data","t=89451","end"
"03.08.2020 10:58:15","t=96580","beg"
"03.08.2020 10:58:15","t=96643","end"

Logstash confing文件:

input {
    beats {
        port => '5044'
    }
}
 filter {
    grok {
        patterns_dir => ['./patterns']
        match => { 'message' => '%{TIME:timestamp}(","Event\s)(?<Event>([^"]+))(","Form\s)?(?<Form>([^"]+))?(","ParentType\s)?(?<parent_type>([^"]+))?(","ParentName\s)?(?<parent_name>([^"]+))?(","Type\s)?(?<type>([^"]+))?(","Name\s)?(?<Name_of_form>([^"]+))?(","Detail\s)?(?<Detail>([^"]+))?(","t=)?(?<t>([\d]+))?(",")?(?<Status>(end|beg))?' }
        add_tag => [ '%{Status}' ]
    }
    dissect {
        mapping => {
            '[log][file][path]' => 'C:\Program Files\Filebeat\test_logs\%{somethingtoo}\%{something}\%{User_Name}\%{filename}.txt'
        }
    }
    date {
        match => [ 'timestamp','dd.MM.yyyy HH:mm:ss' ]
    }
    elapsed {
        unique_id_field => 'Event'
        start_tag => 'beg'
        end_tag => 'end'
        new_event_on_match => false
    }

    if 'elapsed' in [tags] {
        aggregate {
            task_id => '%{Event}'
            code => 'map["duration"] = [(event.get("elapsed_time")*1000).to_i]'
            map_action => 'create'
        }
    }
    mutate {
        remove_field => ['timestamp','ecs','log','tags','message','@version','something','somethingtoo','filename','input','host','agent','t','parent_type','parent_name','type']
        rename => {'elapsed_time' => 'Event_duration'}
    }
}
output {
    elasticsearch {
        hosts => ['localhost:9200']
        index => 'test'
    }
}

在我的logstash.conf中,我正在使用聚合过滤器,并将worker 1(-w 1)设置为可以正常工作。

当我仅使用一个日志文件进行测试和配置时,我将-w 1设置为一切正常。但是,当我开始从每个目录收集所有日志时,问题就开始了。 数据未正确放入Elasticsearch(根据汇总结果,从奇数清楚可见)

我尝试在logstash输出(工人:1)的filebeat.yml中进行设置,但这仍然无济于事。

问题:

  1. 也许您知道如何解决此问题?因为对于一个日志文件或一个目录末尾的多个日志文件来说,一切正常都很好,而当添加更多目录时,一切突然崩溃。
  2. 如果我正确地理解了该理论,那么elasticsearch具有索引和类型。每个日志都有一个时间和一个用户名,该用户名的日志是,也许我应该按日志时间将数据放在索引中,并按用户名键入,以便不同用户在同一时间的日志不会重叠。我应该如何实施呢?我试图查找信息,仅发现有关document_type的信息,该信息已被弃用。

解决方法

您使用elapsedaggregate的字段不是唯一的,您可以在不同文件中为Event字段使用相同的值,这可以使{{1 }}过滤器使用一个文件的开始事件和另一个文件的结束事件。

之所以发生这种情况,是因为filebeat收割机是并行文件,然后将其批量发送到logstash。在您的情况下,配置中的elapsed选项没有用,它与运送数据而不是收集数据的工作人员数量有关。

您可以尝试使用选项worker来限制并行收割机的数量,但这会减慢数据处理的速度,并且不能保证不会混淆您的过滤器。而且,Filebeat不能保证事件的顺序,而只能保证一次发送。

最好的解决方案是创建一个将harvester_limit: 1字段与Event字段连接在一起的唯一字段,这样就不会混淆来自不同文件的事件。

您可以通过在filename过滤器之前添加mutate过滤器来做到这一点。

elapsed

这将创建一个名为mutate { add_field => { "uniqueEvent" => "%{Event}_%{filename}" } } 的字段,其值类似于uniqueEvent,然后您将在Lclick_filenameelapsed过滤器中使用此新字段。

如果在不同文件夹中具有相同的文件名,则需要使用路径中的另一个字段,直到将aggregate的值设为唯一值为止。

相关问答

依赖报错 idea导入项目后依赖报错,解决方案:https://blog....
错误1:代码生成器依赖和mybatis依赖冲突 启动项目时报错如下...
错误1:gradle项目控制台输出为乱码 # 解决方案:https://bl...
错误还原:在查询的过程中,传入的workType为0时,该条件不起...
报错如下,gcc版本太低 ^ server.c:5346:31: 错误:‘struct...