日志存储未在 Elasticsearch 中加载确切数量的记录,并且每次命中结果都在不断变化

问题描述

问题陈述:Logstash 没有将所有记录从数据库正确加载到 elasticsearch,每次我点击相同的 api 都会得到不同的结果(但有时正确但每次点击都会发生变化,并且只显示称呼嵌套字段下的记录子集)。 logstash 机制看起来零星,加载结果不一致,尤其是在一对多场景中。 http://localhost:9200/staffsalutation/_search 我观察到 logstash logstash-7.8.0 的奇怪行为,同时从 2 个表中加载记录,查询和配置如下

查询
选择s.update_time,s.staff_id 作为staff_id、birth_date、first_name、last_name、gender、hire_date、 st.title AS title_nm,st.from_date AS title_frm_dt,st.to_date AS title_to_dt 来自员工 左加入称呼 st ON s.staff_id = st.staff_id 按 s.update_time 排序

    input {
        
        jdbc {

            jdbc_connection_string => "jdbc:postgresql://localhost:5432/postgres"
            jdbc_driver_library => "C:\\Users\\NS\\.m2\\repository\\org\\postgresql\\postgresql\\42.2.11\\postgresql-42.2.11.jar"
            jdbc_user => "postgres"
            jdbc_password => "postgres"
            jdbc_driver_class => "org.postgresql.Driver"
            schedule => "* * * * *"     
            statement => "select  e.update_time,e.emp_no as staff_id,birth_date,first_name,last_name,gender,hire_date,t.title AS title_nm,t.from_date AS title_frm_dt,t.to_date AS title_to_dt 
            from employees e 
            LEFT JOIN titles t 
            ON e.emp_no  = t.emp_no  
            order by e.update_time"

            add_field => { "doctype" => "employee" }
            tracking_column_type => "timestamp"
            use_column_value =>true
            tracking_column => update_time
            
            jdbc_fetch_size => "50000"
        }

    }
    filter {
    aggregate {
            task_id => "%{staff_id}"
                code => "
                    map['staff_id'] = event.get('staff_id')
                    map['birth_date'] = event.get('birth_date')
                    map['first_name'] = event.get('first_name')
                    map['last_name'] = event.get('last_name')
                    map['gender'] = event.get('gender')
                    map['hire_date'] = event.get('hire_date')
                    map['salutations'] ||= []
                    map['salutations'] << {
                    'title_nm' => event.get('title_nm'),'title_frm_dt' => event.get('title_frm_dt'),'title_to_dt' => event.get('title_to_dt')
                    }
                    event.cancel()
                "
            push_prevIoUs_map_as_event => true
            timeout => 30
            }
    }
    output {
        elasticsearch {
        document_id => "%{staff_id}"
        index => "staffsalutation"
        }
        file {
        path => "test.log"  
        codec => line
       }
    }

解决方法

找到解决方案!

  1. 需要在查询中使用 order by 子句,以便记录按 emp_no 和 logstash 可以搜索和聚合依赖实体,如标题(如一对多)。

选择e.update_time、e.emp_no作为staff_id、birth_date、first_name、 姓氏、性别、聘用日期、t.title AS title_nm、t.from_date AS title_frm_dt,t.to_date AS title_to_dt 来自员工 e LEFT JOIN titles t ON e.emp_no = t.emp_no order by e.emp_no

  1. 因为这里使用了聚合,所以需要有单线程来处理记录 else 它会导致聚合问题(这就是您根据上面的 url 多次调用搜索索引时会得到的随机结果)。虽然它看起来是一个性能损失,因为只有 1 个工作线程将处理记录,但可以通过调用具有异构记录集的多个 Logstash 配置文件来缓解它,例如一个文件中的前 100 个 emp_no 和另一个文件中的第 200 个,以便 logstash 可以并行执行它们。 所以执行如下

logstash -f logstash_config.conf -w 1