问题描述
我是第一次做。使用websocket在哪里读取数据流。
这是我的代码段
RsvpApplication
@SpringBootApplication
public class RsvpApplication {
private static final String MEETUP_RSVPS_ENDPOINT = "ws://stream.myapi.com/2/rsvps";
public static void main(String[] args) {
SpringApplication.run(RsvpApplication.class,args);
}
@Bean
public ApplicationRunner initializeConnection(
RsvpsWebSocketHandler rsvpsWebSocketHandler) {
return args -> {
System.out.println("initializeConnection");
WebSocketClient rsvpsSocketClient = new StandardWebSocketClient();
rsvpsSocketClient.doHandshake(
rsvpsWebSocketHandler,MEETUP_RSVPS_ENDPOINT);
};
}
}
RsvpsWebSocketHandler
@Component
class RsvpsWebSocketHandler extends AbstractWebSocketHandler {
private static final Logger logger =
Logger.getLogger(RsvpsWebSocketHandler.class.getName());
private final RsvpsKafkaProducer rsvpsKafkaProducer;
public RsvpsWebSocketHandler(RsvpsKafkaProducer rsvpsKafkaProducer) {
this.rsvpsKafkaProducer = rsvpsKafkaProducer;
}
@Override
public void handleMessage(WebSocketSession session,WebSocketMessage<?> message) {
logger.log(Level.INFO,"New RSVP:\n {0}",message.getPayload());
System.out.println("handleMessage");
rsvpsKafkaProducer.sendRsvpMessage(message);
}
}
RsvpsKafkaProducer
@Component
@EnableBinding(Source.class)
public class RsvpsKafkaProducer {
private static final int SENDING_MESSAGE_TIMEOUT_MS = 10000;
private final Source source;
public RsvpsKafkaProducer(Source source) {
this.source = source;
}
public void sendRsvpMessage(WebSocketMessage<?> message) {
System.out.println("sendRsvpMessage");
source.output()
.send(MessageBuilder.withPayload(message.getPayload())
.build(),SENDING_MESSAGE_TIMEOUT_MS);
}
}
据我所了解和了解的有关websocket的知识,它需要一次连接,数据流将一直不断流动,直到任何一方(客户端或服务器)停止。
我是第一次构建它,因此尝试涵盖可能在每分钟处理10000条以上消息时出现的主要情况。卡夫卡经纪人总数是两个,有足够的空间。
-
该怎么办,如果连接丢失,一旦重新连接到最后一次失败的位置,再次开始使用webscoket的消息并将消息推送到进一步的Kafka代理中?
-
如果已达到未处理消息的阈值限制(在代理程序中),可以搁置websocket以继续在代理程序中推送消息吗?
-
该怎么办,当代理达到其阈值时,运行一个单独的过程以检查代理中的可用空间以推送更多消息,并给出指示以继续在kafka代理中推送消息?
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)