问题描述
阅读一些文章后,我了解到ksqdb表实际上是聚合流。意思是它显示当前业务状态。不是业务历史。业务历史包含流。但是当我在表中使用推式查询时,它显示的是历史列表而不是当前状态。我的过程定义如下。
流创建:
CREATE STREAM products2 (product_name VARCHAR KEY,cost DOUBLE)
WITH (kafka_topic='products2',partitions=1,value_format='json');
表创建:
CREATE TABLE products2_t (product_name VARCHAR PRIMARY KEY,value_format='json');
插入查询:
insert into PRODUCTS2 (product_name,cost) values ('a',1);
insert into PRODUCTS2 (product_name,2);
insert into PRODUCTS2 (product_name,cost) values ('b',1);
insert into PRODUCTS2 (product_name,2);
流输出:没问题,
select * from products2 emit changes;
期望的输出列表将是:
a 13
b 5
所以请帮助我。这里哪里错了。
感谢所有人。
解决方法
该表将显示当前状态,但是当状态{em>更改时,使用EMIT CHANGES
(即推送查询),您将收到输出。如果您在输入暂停时重新运行该查询,则只会看到当前状态。
如果要查询当前状态而不是后续更改,则可以运行请求查询,需要在表中具体化该状态。例如:
CREATE TABLE PRODUCTS_T AS
SELECT PRODUCT_NAME,LATEST_BY_OFFSET(COST) AS LATEST_COST
FROM PRODUCTS2
GROUP BY PRODUCT_NAME;
现在您可以直接查询它了-请注意查询返回并且不等待任何将来的状态更改:
ksql> SELECT LATEST_COST FROM PRODUCTS_T WHERE PRODUCT_NAME='a';
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|LATEST_COST |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|2.0 |
Query terminated
ksql>