带有cassandra时间戳或timeuuid的Logstash sql_last_value

问题描述

我们已经配置了JDBC logstash,以通过Logstash将数据从Cassandra迁移到elasticsearch。我们已经能够通过logstash将数据从Cassandra移至elasticsearch,我们也将基于document_id更新弹性搜索中的数据

我们要基于Cassandra的更改进行增量更新,因此我们在logstash输入JDBC中使用 sql_last_value

statement => "SELECT emp_id,emp_city,emp_name,emp_phone,emp_sal,updated_time FROM cloud.employee_list where updated_time > :sql_last_value and updated_time < toTimestamp(now()) ALLOW FILTERING;"
[ERROR] 2020-11-05 19:00:00.517 [Ruby-0-Thread-26: :1] jdbc - Java::JavaSql::SQLException: [Simba][CassandraJDBCDriver](500211) ERROR Invalid query: SELECT emp_id,updated_time FROM iqcloud.employee_list where updated_time > '2020-09-09 07:57:12.836000' and updated_time < toTimestamp(now()) ALLOW FILTERING,Cause: com.simba.cassandra.shaded.datastax.driver.core.exceptions.InvalidQueryException: Unable to coerce '2020-09-09 07:57:12.836000' to a formatted date (long).: SELECT emp_id,updated_time FROM cloud.employee_list where updated_time > '2020-09-09 07:57:12.836000' and updated_time < toTimestamp(now()) ALLOW FILTERING;

在如下所示存储的Cassandra时间戳列中 2020-09-09 07:57:12.836000 + 0000, 当我们需要获取时,我们应该删除多余的零并按如下所示获取 Updated_time ='2020-09-09 07:57:12.836'

在错误日志中,由于sql_last_value中的其他零,我们得到的查询无效。有什么方法可以在sql_last_value中使用我们的timestamp列值,还是任何其他方法来实现sql_last_value?

任何人都可以澄清一下吗?

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)