问题描述
我正在尝试使用 Spring 云流将数据从 kafka 发送到 Rsocket,然后在 React 上表示数据
这是我的配置。
@Configuration
public class RsocketConsumerConfiguration {
@Bean
public Sinks.Many<Data> sender(){
return Sinks.many().multicast().directBestEffort();
}
}
@控制器 公共类 ServerController {
@Autowired
private Sinks.Many<Data> integer;
@MessageMapping("integer")
public Flux<Data> integer() {
return integer.asFlux();
}
@EnableBinding(IClientProcessor.class)
public class Listener {
@Autowired
private Sinks.Many<Data> integer;
@StreamListener(IClientProcessor.INTEGER)
public void integer(Data val) {
System.out.println(val);
integer.tryEmitNext(val);
}
}
let client = new RSocketClient({
transport: new RSocketWebSocketClient(
{
url: 'ws://localhost:7000/ws',wsCreator: (url) => new WebSocket(url),debug: true,},BufferEncoders,),setup: {
dataMimeType: "application/json",MetadataMimeType: MESSAGE_RSOCKET_COMPOSITE_MetaDATA.string,keepAlive: 5000,lifetime: 60000,});
client
.then(rsocket => {
console.log("Connected to rsocket");
rsocket.requestStream({
Metadata: Buffer.from(encodeCompositeMetadata([
[MESSAGE_RSOCKET_ROUTING,encodeRoute("integer")],])),})
.subscribe({
onSubscribe: s => {
s.request(2147483647)
},onNext: (p) => {
let newData = {
time: new Date(JSON.parse(p.data).time).getUTCSeconds(),integer: JSON.parse(p.data).integer
}
newData.integer >100?setInteger(currentData => [newData,...currentData]):setInt(currentData => [newData,...currentData])
console.log(newData)
},onError: (e) => console.error(e),onComplete: () => console.log("Done")
});
spring.cloud.stream.bindings.integer.destination=整数 无法在反应应用程序中看到它。请指教。我做错了什么?
解决方法
鉴于数据似乎直接从 Kafka(通过 Spring)传输到客户端,也许 stream Kafka messages via an internet-messaging broker to Internet-facing clients over WebSockets 更有意义。
披露:我不是那篇文章的作者,但在作者工作的那家公司工作。我们经常看到这个用例,所以希望这种方法可能有用。