Logstash:是否可以以某种方式添加日期差为两个或更多日志行的字段?

问题描述

问题如下,我使用filebeat和logstash将日志上传到elasticsearch。

"03.08.2020 10:56:38","Event LClick","Type Menu","t=0","beg"

"03.08.2020 10:56:38","Detail Impale","t=109","end"

"03.08.2020 10:56:40","t=1981","beg"

"03.08.2020 10:56:40","t=2090","end"

"03.08.2020 10:56:41","Type ToolBar","t=3026","beg"

"03.08.2020 10:56:44","Event FormActivate","Name SomeName","t=5444"

"03.08.2020 10:56:43","Detail Test","t=4477","end"

这些是用户在Web表单中执行的操作的日志。每个动作都有一个开始(在行的末尾是“ beg”)和一个结束(在行的末尾是“ end”)。

我需要计算用户执行操作的时间差,并尽可能将其输出为字段(即使该值为零)。

例如:“ 03.08.2020 10:56:44”-“ 03.08.2020 10:56:41” = 3秒(这应该是一个新字段)

也许我需要以某种方式合并这些字段?

如果在logstash中有一个减去日期的解决方案,那么我该如何对在开始和结束之间有其他动作的动作(例如“ Event FormActivate”)实施该动作。

也许这可以通过Elasticsearch内部已经存在的某些查询解决

我是一名新手,非常感谢您的帮助。 现在,我的logstash配置:

input {
    beats {
        port => '5044'
    }
}
 filter {
    mutate {
        remove_field => [ '@version','input','host','ecs','agent' ]
        remove_tag => [ 'beats_input_codec_plain_applied' ]
    }
    grok {
        patterns_dir => ['./patterns']
        match => { 'message' => '%{TIME:timestamp}(","Event\s)(?<event>([^"]+))(","Form\s)?(?<form>([^"]+))?(","ParentType\s)?(?<parent_type>([^"]+))?(","ParentName\s)?(?<parent_name>([^"]+))?(","Type\s)?(?<type>([^"]+))?(","Name\s)?(?<name>([^"]+))?(","Detail\s)?(?<detail>([^"]+))?(","t=)?(?<t>([\d]+))?' }
    }
    date {
        match => [ 'timestamp','dd.MM.yyyy HH:mm:ss' ]
        timezone => 'Europe/Moscow'
        target => '@timestamp'
        remove_field => 'timestamp'
    }
    mutate {
        rename => ['log','user_path']
        rename => ['@timestamp','logdate']
    }
}
output {
    elasticsearch {
        hosts => ['localhost:9200']
        index => 'test'
    }
}

更新:

我试图理解Val建议的线程中的动作。但是我还是没有成功。这就是我对logstash配置所做的:

 filter {
    grok {
        patterns_dir => ['./patterns']
        match => { 'message' => '%{TIME:timestamp}(","t=)?(?<t>([\d]+))?(",")?(?<status>(end|beg))?' }
        add_tag => [ '%{status}' ]
    }
    date {
        match => [ 'timestamp','dd.MM.yyyy HH:mm:ss' ]
    }
    elapsed {
        unique_id_field => 'event'
        start_tag => 'beg'
        end_tag => 'end'
        new_event_on_match => true
        add_tag => ['1->2']
    }
    if '1->2' in [tags] and 'elapsed' in [tags] {
        aggregate {
            task_id => '%{event}'
            code => 'map["report"] = [(event["elapsed_time"]*1000).to_i]'
            map_action => 'create'
            end_of_task => true
        }
    }
}

但是它根本不起作用。在我看来,我很困惑:(

也许如果我在弹性搜索中展示我想看到的东西,那会更好。对于七行日志(帖子开头的日志),它应如下所示:

{
                 "username" => "I will get the username from the log path and I want it to get here too","elapsed_time" => date difference,"event" => "event from line","elapsed_timestamp_start" => "start time"
}

从Elasticsearch的七行日志中,应该有三条这样的记录。 请帮助我为此任务编写一个过滤器。谢谢!

关于聚合过滤器插件的文档的另一个问题:

 You should be very careful to set Logstash filter workers to 1 (-w 1 flag) for this filter to work correctly otherwise events may be processed out of sequence and unexpected results will occur.

我找不到需要添加此标志的答案。也许就是这个问题。

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...