在特定持续时间内使用 hdfs 的 logstash

问题描述

嗨,我是新的 logstash,我已经完成了从 tcp 读取数据并写入 hdfs...那部分是不,但我想将数据写入 4 个不同的 hdfs 文件

这是示例代码

input {
tcp {

host => "X.X.X.X"
port => 5051
codec => json_lines
}

}
filter 
{
  mutate 
{
 remove_field => [ "@version","path","host","logger_name","tags","stack_info","level","port","type"]
 }
 mutate {
         add_field => { "count" => "1"} 
 }

 }

 output {
 webhdfs 
    {
    
        host => "127.0.0.1"                
        port => 50070  
        path => "/folder/%{+YYYY-MM-dd_HH-mm}.csv"          
        user => "hduser"                
        codec => line { format => "%{message}"} 
    }

这里的问题是我已经写入文件夹,但后来我想写入 3 个不同的文件夹,如文件夹 1、文件夹 2、文件夹 3 一段时间......

解决方法

有可能,您需要使用一些 mutate 过滤器和一些条件。

首先需要从事件的@timestamp中获取分钟的值并将该值添加到新字段中,可以使用[@metadata]对象,该对象可用于过滤,但它不会出现在输出事件中。

mutate {
    add_field => { "[@metadata][minute]" => "%{+mm}" }
}

然后您需要指定哪一分钟将保存在哪个文件夹中。

例如,如果你想要这样的东西:

00:00 到 00:59 - 文件夹 1
01:00 至 01:59 - 文件夹 2
02:00 至 02:59 - 文件夹 3
03:00 到 03:59 - 文件夹 4

在下一分钟,从 04:00 到 04:59,从文件夹 1 开始,您将需要这样的东西,仅考虑前 8 分钟。

if [@metadata][minute] in ["00","04"] {
    mutate {
        add_field => {"[@metadata][folder]" => "folder1" }
    }
}
if [@metadata][minute] in ["01","05"] {
    mutate {
        add_field => {"[@metadata][folder]" => "folder2" }
    }
}
if [@metadata][minute] in ["02","06"] {
    mutate {
       add_field => {"[@metadata][folder]" => "folder3" }
    }
}
if [@metadata][minute] in ["03","07"] {
    mutate {
        add_field => {"[@metadata][folder]" => "folder4" }
    }
}

然后在您的输出中,您将在路径中使用 [@metadata][folder]

path => "/[@metadata][folder]/%{+YYYY-MM-dd_HH-mm}.csv"

您只需要将条件扩展到一小时内的其他分钟。

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...