问题描述
我测试了主题为 'topic1'
的 Kafka 集群(架构注册表中的架构)。
我从这个主题创建了流:
CREATE STREAM topic1_basic_stream
WITH (KAFKA_TOPIC='topic1',VALUE_FORMAT='AVRO',TIMESTAMP='triggertime',TIMESTAMP_FORMAT='yyyyMMddHHmmssX');
然后我创建了聚合表作为选择 PUSH 查询(没有窗口),只有今天的数据:
CREATE TABLE topic1_agg_table_JSON WITH(VALUE_FORMAT='JSON') AS
select
concat(product_type,' - ',region) id,sum(sale_charge) sum_sale_charge,count(1) cnt
from topic1_basic_stream
where TIMESTAMPTOSTRING(rowtime,'yyyyMMdd') = TIMESTAMPTOSTRING(UNIX_TIMESTAMP(),'yyyyMMdd')
group by concat(product_type,region)
EMIT CHANGES;
从 ksqlDB CLI 我运行并看到 OK 结果:
SET 'auto.offset.reset'='earliest';
select * from topic1_agg_table_JSON EMIT CHANGES;
+----------------------+---------------------------+-----+
|id |sum_sale_charge |CNT |
+----------------------+---------------------------+-----+
|Pen - London |90.0 |45 |
|Book - Paris |45.0 |9 |
|Pen - Amsterdam |26.0 |13 |
|Keyboard - oslo |60.0 |6 |
|Pen - London |92.0 |46 |
Press CTRL-C to interrupt
我还可以在主题列表中看到 topic1_agg_table_JSON
,其中包含 json 消息。
目标:我想在 node.js 中编写消费者以将这些消息(使用 websockets)发送到浏览器(客户端)并在客户端对其(实时)进行可视化。
已经尝试过: kafka-node
模块。示例代码取自 https://github.com/SOHU-Co/kafka-node/blob/master/example/consumer.js 使用简单的原始(简单)Kafka 主题在这段代码中一切正常,但如果我将主题更改为我的 topic1_agg_table_JSON
,它不会抛出任何错误,也不会打印任何消息。
问题:使用 Node.js 使用来自 topic1_agg_table_JSON
的数据的正确方法是什么?
解决方法
已解决。问题出在过时的 kafka-node
模块版本中。更新 (npm update kafka-node
) 后,应用程序成功使用了主题中的所有消息,但没有像我预期的那样(目标不是接收所有消息,而是接收每条消息的最新版本,这是另一个问题)。