当物化视图中出现新的 MAX 值时,具有唯一的 Kafka ksqlDB 事件

问题描述

对 ksqlDB 事件和物化视图感到困惑。当表聚合/物化视图中确定的新高股价出现时,我想在专用主题/流中收到通知,但我正在为每个事件报告高价,而不仅仅是在出现新高价事件时.

这是我的工作示例/设置。

为股票创建基础流和主题

ksql> create stream stocks (symbol VARCHAR KEY,company VARCHAR,price DECIMAL(9,2))
>  WITH (KAFKA_TOPIC='stocks',PARTITIONS=1,VALUE_FORMAT='json');

Message
----------------
Stream created
----------------

添加一些 Acme Corp 股票价格的初始数据

ksql> insert into stocks (symbol,company,price) values ('Acme','Acme Corp',111.11);
ksql> insert into stocks (symbol,111.12);
ksql> insert into stocks (symbol,111.13);

打印基础主题以证明数据存在。

ksql> print 'stocks' from beginning limit 3;
Key format: KAFKA_INT or KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 2021/03/21 14:26:57.169 Z,key: 1094929733,value: {"COMPANY":"Acme Corp","PRICE":111.11}
rowtime: 2021/03/21 14:27:01.717 Z,"PRICE":111.12}
rowtime: 2021/03/21 14:27:04.546 Z,"PRICE":111.13}
Topic printing ceased

创建聚合/物化视图以显示最高股价。

ksql> create table stock_highs as
>  select symbol,max(price) as high
>  from stocks
>  group by symbol
>  emit changes;

 Message
--------------------------------------------
 Created query with ID CTAS_STOCK_HIGHS_115
--------------------------------------------

查询以进行目视检查。

ksql> select * from stock_highs where symbol = 'Acme';
+--------------+------------------------+
|SYMBOL        |HIGH                    |
+--------------+------------------------+
|Acme          |111.13                  |
Query terminated

在单独的终端(终端 2)中使用消费者(又名打印 STOCK_HIGHS)来观察股票高价的变化。

ksql> print STOCK_HIGHS from beginning;
Key format: KAFKA_INT or KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 2021/03/21 14:27:04.546 Z,value: {"HIGH":111.13}

回到原始终端(终端 1),使用 ksql 客户端插入更多数据以强制更新库存高位。

ksql> insert into stocks (symbol,111.20);
ksql> insert into stocks (symbol,111.15);

上述插入应该给出 111.20 的新高价,而忽略 111.15 的价格,因此,在我的想法和用例中,我想从 STOCK_HIGHS 主题中获得一个事件/消息,显示 111.20 的新高股价,但是不是 111.15 的价格。但是,我在消费者(终端 2)中收到了两个新事件。

ksql> print STOCK_HIGHS from beginning;
Key format: KAFKA_INT or KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 2021/03/21 14:27:04.546 Z,value: {"HIGH":111.13}
rowtime: 2021/03/21 14:37:51.234 Z,value: {"HIGH":111.20}
rowtime: 2021/03/21 14:39:03.301 Z,value: {"HIGH":111.20}

问题是我真的只想在新高价格出现时收到通知或“事件”,这样我就可以让消费者在新高价格出现时离开并做一些有意义的事情。

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)