问题描述
我正在使用Kafka Stream来创建仅包含特定于client_id的数据(不是主题密钥)的ktable。我是Kafka Streams的新手,它看起来很简单,但是我对社区中提供的多个示例确实感到困惑。
我正在尝试获取具有client_id = 0123456的inputTopic数据。在Ksql中,下面的命令类似于:
CREATE STREAM TOPIC1_CLIENT1 AS
SELECT * FROM TOPIC1
WHERE client_id= '0123456'
EMIT CHANGES;
下面,我尝试重现相同的行为。有人可以在下面告诉我我在做什么错吗?它没有按照我的预期进行过滤。
final KStream<String,String> stream = builder.stream(inputTopic,Consumed.with(stringSerde,stringSerde));
final KTable<String,String> convertedTable = stream.filter((client_id,v) -> v.equals("0123456")).toTable(Materialized.as("stream-converted-to-table"));
stream.to(streamsOutputTopic,Produced.with(stringSerde,stringSerde));
convertedTable.toStream().to(tableOutputTopic,stringSerde));
解决方法
v
是消息的整个值。要在KSQL中命名字段,流上有一个关联的模式,例如数据是JSON还是Avro,这意味着clientid只是值的一部分