如何获取ksqldb表的当前状态?

问题描述

阅读一些文章后,我了解到ksqdb表实际上是聚合流。意思是它显示当前业务状态。不是业务历史。业务历史包含流。但是当我在表中使用推式查询时,它显示的是历史列表而不是当前状态。我的过程定义如下。

流创建:

CREATE STREAM products2 (product_name VARCHAR KEY,cost DOUBLE)
WITH (kafka_topic='products2',partitions=1,value_format='json');

表创建:

CREATE TABLE products2_t (product_name VARCHAR PRIMARY KEY,value_format='json');

插入查询

insert into PRODUCTS2 (product_name,cost) values ('a',1); 
insert into PRODUCTS2 (product_name,2);
insert into PRODUCTS2 (product_name,cost) values ('b',1);
insert into PRODUCTS2 (product_name,2); 

输出没问题,

select * from products2 emit changes;

enter image description here


输出根据ksqldb表定义不正确:

enter image description here

期望的输出列表将是:

a 13 
b 5

所以请帮助我。这里哪里错了。
感谢所有人。

解决方法

该表将显示当前状态,但是当状态{em>更改时,使用EMIT CHANGES(即推送查询),您将收到输出。如果您在输入暂停时重新运行该查询,则只会看到当前状态。

如果要查询当前状态而不是后续更改,则可以运行请求查询,需要在表中具体化该状态。例如:

CREATE TABLE PRODUCTS_T AS
SELECT PRODUCT_NAME,LATEST_BY_OFFSET(COST) AS LATEST_COST
FROM PRODUCTS2
GROUP BY PRODUCT_NAME;

现在您可以直接查询它了-请注意查询返回并且不等待任何将来的状态更改:

ksql> SELECT LATEST_COST FROM PRODUCTS_T WHERE PRODUCT_NAME='a';
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|LATEST_COST                                                                                                                                                                            |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|2.0                                                                                                                                                                                    |
Query terminated
ksql>

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...