有没有办法在 Node.js 中使用 ksqldb 表推送查询?

问题描述

我测试了主题'topic1' 的 Kafka 集群(架构注册表中的架构)。

我从这个主题创建了

CREATE STREAM topic1_basic_stream
WITH (KAFKA_TOPIC='topic1',VALUE_FORMAT='AVRO',TIMESTAMP='triggertime',TIMESTAMP_FORMAT='yyyyMMddHHmmssX');

然后我创建了聚合表作为选择 PUSH 查询(没有窗口),只有今天的数据:

CREATE TABLE topic1_agg_table_JSON WITH(VALUE_FORMAT='JSON') AS
select
   concat(product_type,' - ',region) id,sum(sale_charge) sum_sale_charge,count(1) cnt
from topic1_basic_stream
where TIMESTAMPTOSTRING(rowtime,'yyyyMMdd') = TIMESTAMPTOSTRING(UNIX_TIMESTAMP(),'yyyyMMdd')
group by concat(product_type,region)
EMIT CHANGES;

从 ksqlDB CLI 我运行并看到 OK 结果:

SET 'auto.offset.reset'='earliest';
select * from topic1_agg_table_JSON EMIT CHANGES;
+----------------------+---------------------------+-----+
|id                    |sum_sale_charge            |CNT  |
+----------------------+---------------------------+-----+
|Pen - London          |90.0                       |45   |
|Book - Paris          |45.0                       |9    |
|Pen - Amsterdam       |26.0                       |13   |
|Keyboard - oslo       |60.0                       |6    |
|Pen - London          |92.0                       |46   |

Press CTRL-C to interrupt

我还可以在主题列表中看到 topic1_agg_table_JSON,其中包含 json 消息。

目标:我想在 node.js 中编写消费者以将这些消息(使用 websockets)发送到浏览器(客户端)并在客户端对其(实时)进行可视化。

已经尝试过: kafka-node 模块。示例代码取自 https://github.com/SOHU-Co/kafka-node/blob/master/example/consumer.js 使用简单的原始(简单)Kafka 主题在这代码中一切正常,但如果我将主题更改为我的 topic1_agg_table_JSON,它不会抛出任何错误,也不会打印任何消息。

问题:使用 Node.js 使用来自 topic1_agg_table_JSON 的数据的正确方法是什么?

解决方法

已解决。问题出在过时的 kafka-node 模块版本中。更新 (npm update kafka-node) 后,应用程序成功使用了主题中的所有消息,但没有像我预期的那样(目标不是接收所有消息,而是接收每条消息的最新版本,这是另一个问题)。