问题描述
嗨,我是新的 logstash,我已经完成了从 tcp 读取数据并写入 hdfs...那部分是不,但我想将数据写入 4 个不同的 hdfs 文件夹
这是示例代码
input {
tcp {
host => "X.X.X.X"
port => 5051
codec => json_lines
}
}
filter
{
mutate
{
remove_field => [ "@version","path","host","logger_name","tags","stack_info","level","port","type"]
}
mutate {
add_field => { "count" => "1"}
}
}
output {
webhdfs
{
host => "127.0.0.1"
port => 50070
path => "/folder/%{+YYYY-MM-dd_HH-mm}.csv"
user => "hduser"
codec => line { format => "%{message}"}
}
这里的问题是我已经写入文件夹,但后来我想写入 3 个不同的文件夹,如文件夹 1、文件夹 2、文件夹 3 一段时间......
解决方法
有可能,您需要使用一些 mutate
过滤器和一些条件。
首先需要从事件的@timestamp
中获取分钟的值并将该值添加到新字段中,可以使用[@metadata]
对象,该对象可用于过滤,但它不会出现在输出事件中。
mutate {
add_field => { "[@metadata][minute]" => "%{+mm}" }
}
然后您需要指定哪一分钟将保存在哪个文件夹中。
例如,如果你想要这样的东西:
00:00 到 00:59 - 文件夹 1
01:00 至 01:59 - 文件夹 2
02:00 至 02:59 - 文件夹 3
03:00 到 03:59 - 文件夹 4
在下一分钟,从 04:00 到 04:59,从文件夹 1 开始,您将需要这样的东西,仅考虑前 8 分钟。
if [@metadata][minute] in ["00","04"] {
mutate {
add_field => {"[@metadata][folder]" => "folder1" }
}
}
if [@metadata][minute] in ["01","05"] {
mutate {
add_field => {"[@metadata][folder]" => "folder2" }
}
}
if [@metadata][minute] in ["02","06"] {
mutate {
add_field => {"[@metadata][folder]" => "folder3" }
}
}
if [@metadata][minute] in ["03","07"] {
mutate {
add_field => {"[@metadata][folder]" => "folder4" }
}
}
然后在您的输出中,您将在路径中使用 [@metadata][folder]
。
path => "/[@metadata][folder]/%{+YYYY-MM-dd_HH-mm}.csv"
您只需要将条件扩展到一小时内的其他分钟。