问题描述
我观察到非常奇怪的问题 我正在使用 logstash + jdbc 将数据从 Oracle db 加载到 Elasticsearch 下面是我的配置文件的样子
input{
jdbc{
clean_run => "false"
jdbc_driver_library => "<path_to_ojdbc8-12.1.0.jar>"
jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
jdbc_connection_string => "<connection_string>"
jdbc_user => "<usename>"
jdbc_password_filepath => ".\pwd.txt"
statement=> "SELECT * FROM customers WHERE CUSTOMER_NAME LIKE 'PE%' AND UPD_DATE > :sql_last_value "
schedule=>"*/1 * * * * "
use_column_value => true
tracking_column_type => "timestamp"
tracking_column => "upd_date"
last_run_Metadata_path =>"<path to logstash_Metadata>"
record_last_run => true
}
}
filter {
mutate {
copy => { "id" => "[@Metadata][_id]"}
remove_field => ["@version","@timestamp"]
}
}
output {
elasticsearch{
hosts => ["<host>"]
index => "<index_name>"
document_id=>"%{[@Metadata][_id]}"
user => "<user>"
password => "<pwd>"
}
stdout{
codec => dots
}
}
现在,我在 2021 年 3 月 8 日的今天每分钟触发一次此文件。 当我为第一次加载时,一切都很好 -:sql_last_value 是 '1970-01-01 00:00:00.000000 +00:00'
但是在第一次加载之后,理想情况下,logstash_Metadata 应该显示 '2021-03-08
如您所见,差异接近 180 天
我尝试了多次,但仍然以相同的方式更新,因此我的增量负载被搞砸了
我的logstash版本是7.10.2
非常感谢您的帮助!
注意:我没有使用分页,因为结果集中的结果数量对于我的查询来说总是非常少
解决方法
记录日期是最后处理行的日期。
看到您的查询,您没有从数据库读取的记录的特定顺序。 Logstash jdbc 输入插件将您的查询包含在一个按 [1] 对行进行排序的查询中,1 是它排序所依据的列的序数。
因此,要以正确的顺序处理记录并获取最新的 upd_date 值,您需要将 upd_date 设为 select 语句中的第一列。
input{
jdbc{
clean_run => "false"
jdbc_driver_library => "<path_to_ojdbc8-12.1.0.jar>"
jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
jdbc_connection_string => "<connection_string>"
jdbc_user => "<usename>"
jdbc_password_filepath => ".\pwd.txt"
statement=> "SELECT c.UPD_DATE,c.CUSTOMER_NAME,c.<Other field>
FROM customers c
WHERE c.CUSTOMER_NAME LIKE 'PE%' AND c.UPD_DATE > :sql_last_value
ORDER BY c.UPD_DATE ASC"
schedule=>"*/1 * * * * "
use_column_value => true
tracking_column_type => "timestamp"
tracking_column => "upd_date"
last_run_metadata_path =>"<path to logstash_metadata>"
record_last_run => true
}
}
另请注意,即使您设置了 jdbc_page_size,这种方法也会在第一次运行 logstash 时耗尽表。如果你想要这个,那很好。
但是如果你想让logstash每分钟运行一批X行并停止直到下一次执行,那么你必须结合使用jdbc_page_size和带有限制的查询来让logstash准确检索你想要的记录数量,在正确的顺序。在 SQL Server 中它是这样工作的:
input{
jdbc{
jdbc_driver_library => ...
jdbc_driver_class => ...
jdbc_connection_string => ...
jdbc_user => ...
jdbc_password_filepath => ...
statement=> "SELECT TOP 10000 c.UPD_DATE,c.CUSTOMER_NAME
FROM customers c
WHERE c.CUSTOMER_NAME LIKE 'PE%' AND c.UPD_DATE > :sql_last_value
ORDER BY c.UPD_DATE ASC"
schedule=>"*/1 * * * * "
use_column_value => true
tracking_column_type => "timestamp"
tracking_column => "upd_date"
jdbc_page_size => 10000
last_run_metadata_path =>"<path to logstash_metadata>"
record_last_run => true
}
}
对于 Oracle DB,您必须根据版本更改查询,或者使用 仅获取第一行 x 行;使用 Oracle 12,或使用旧版本的 ROWNUM。
无论如何,我建议您查看日志以检查 logstash 运行的查询。