Logstash :sql_last_value 显示错误的垃圾日期将 6 个月的旧日期显示为上次运行时间

问题描述

我观察到非常奇怪的问题 我正在使用 logstash + jdbc 将数据从 Oracle db 加载到 Elasticsearch 下面是我的配置文件的样子

input{
  jdbc{
    clean_run => "false"
    jdbc_driver_library => "<path_to_ojdbc8-12.1.0.jar>"
    jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
    jdbc_connection_string => "<connection_string>"
    jdbc_user => "<usename>"
    jdbc_password_filepath => ".\pwd.txt"
    statement=> "SELECT * FROM customers WHERE CUSTOMER_NAME  LIKE 'PE%' AND UPD_DATE  > :sql_last_value "
    schedule=>"*/1 * * * * "
    use_column_value => true
    tracking_column_type => "timestamp"
    
    tracking_column => "upd_date"
    
    last_run_Metadata_path =>"<path to logstash_Metadata>"
    record_last_run => true
    }
}

filter {
  mutate {
    copy => { "id" => "[@Metadata][_id]"}
    remove_field => ["@version","@timestamp"]
  }
}
output {
      elasticsearch{
    hosts => ["<host>"]
    index => "<index_name>"
    document_id=>"%{[@Metadata][_id]}"
    user => "<user>"
    password => "<pwd>"
}

   stdout{
      codec => dots
   }
}

现在,我在 2021 年 3 月 8 日的今天每分钟触发一次此文件。 当我为第一次加载时,一切都很好 -:sql_last_value 是 '1970-01-01 00:00:00.000000 +00:00'

但是在第一次加载之后,理想情况下,logstash_Metadata 应该显示 '2021-03-08 ' 但奇怪的是它正在更新为 --- 2020-09-11 01:05: 09.000000000 Z 在 logstash_Metadata (:sql_last_value)

如您所见,差异接近 180 天

我尝试了多次,但仍然以相同的方式更新,因此我的增量负载被搞砸了

我的logstash版本是7.10.2

非常感谢您的帮助!

注意:我没有使用分页,因为结果集中的结果数量对于我的查询来说总是非常少

解决方法

记录日期是最后处理行的日期。

看到您的查询,您没有从数据库读取的记录的特定顺序。 Logstash jdbc 输入插件将您的查询包含在一个按 [1] 对行进行排序的查询中,1 是它排序所依据的列的序数。

因此,要以正确的顺序处理记录并获取最新的 upd_date 值,您需要将 upd_date 设为 select 语句中的第一列。

input{
  jdbc{
    clean_run => "false"
    jdbc_driver_library => "<path_to_ojdbc8-12.1.0.jar>"
    jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
    jdbc_connection_string => "<connection_string>"
    jdbc_user => "<usename>"
    jdbc_password_filepath => ".\pwd.txt"
    statement=> "SELECT c.UPD_DATE,c.CUSTOMER_NAME,c.<Other field> 
                 FROM customers c 
                 WHERE c.CUSTOMER_NAME LIKE 'PE%' AND c.UPD_DATE > :sql_last_value 
                 ORDER BY c.UPD_DATE ASC"
    schedule=>"*/1 * * * * "
    use_column_value => true
    tracking_column_type => "timestamp"
    tracking_column => "upd_date"        
    last_run_metadata_path =>"<path to logstash_metadata>"
    record_last_run => true
    }
}

另请注意,即使您设置了 jdbc_page_size,这种方法也会在第一次运行 logstash 时耗尽表。如果你想要这个,那很好。

但是如果你想让logstash每分钟运行一批X行并停止直到下一次执行,那么你必须结合使用jdbc_page_size和带有限制的查询来让logstash准确检索你想要的记录数量,在正确的顺序。在 SQL Server 中它是这样工作的:

input{
  jdbc{
    jdbc_driver_library => ...
    jdbc_driver_class => ...
    jdbc_connection_string => ...
    jdbc_user => ...
    jdbc_password_filepath => ...
    statement=> "SELECT TOP 10000 c.UPD_DATE,c.CUSTOMER_NAME
                 FROM customers c 
                 WHERE c.CUSTOMER_NAME  LIKE 'PE%' AND c.UPD_DATE  > :sql_last_value 
                 ORDER BY c.UPD_DATE ASC"
    schedule=>"*/1 * * * * "
    use_column_value => true
    tracking_column_type => "timestamp"
    tracking_column => "upd_date"
    jdbc_page_size => 10000
    last_run_metadata_path =>"<path to logstash_metadata>"
    record_last_run => true
    }
}

对于 Oracle DB,您必须根据版本更改查询,或者使用 仅获取第一行 x 行;使用 Oracle 12,或使用旧版本的 ROWNUM。

无论如何,我建议您查看日志以检查 logstash 运行的查询。