使用websocket获取数据流时的主要问题是什么?

问题描述

我是第一次做。使用websocket在哪里读取数据流。

这是我的代码

RsvpApplication

@SpringBootApplication
public class RsvpApplication {

    private static final String MEETUP_RSVPS_ENDPOINT = "ws://stream.myapi.com/2/rsvps";

    public static void main(String[] args) {
        SpringApplication.run(RsvpApplication.class,args);
    }

    @Bean
    public ApplicationRunner initializeConnection(
        RsvpsWebSocketHandler rsvpsWebSocketHandler) {
            return args -> {
                
                System.out.println("initializeConnection");
                
                WebSocketClient rsvpsSocketClient = new StandardWebSocketClient();

                rsvpsSocketClient.doHandshake(
                    rsvpsWebSocketHandler,MEETUP_RSVPS_ENDPOINT);           
            };
        }
}

RsvpsWebSocketHandler

@Component
class RsvpsWebSocketHandler extends AbstractWebSocketHandler {

    private static final Logger logger = 
            Logger.getLogger(RsvpsWebSocketHandler.class.getName());
        
    private final RsvpsKafkaProducer rsvpsKafkaProducer;

    public RsvpsWebSocketHandler(RsvpsKafkaProducer rsvpsKafkaProducer) {    
        this.rsvpsKafkaProducer = rsvpsKafkaProducer;
    }

    @Override
    public void handleMessage(WebSocketSession session,WebSocketMessage<?> message) {
        logger.log(Level.INFO,"New RSVP:\n {0}",message.getPayload());
        
        System.out.println("handleMessage");
        
        rsvpsKafkaProducer.sendRsvpMessage(message);        
    }
}

RsvpsKafkaProducer

@Component
@EnableBinding(Source.class)
public class RsvpsKafkaProducer {
  
    private static final int SENDING_MESSAGE_TIMEOUT_MS = 10000;

    private final Source source;

    public RsvpsKafkaProducer(Source source) {
        this.source = source;
    }

    public void sendRsvpMessage(WebSocketMessage<?> message) {
        
         System.out.println("sendRsvpMessage");
        source.output()
                .send(MessageBuilder.withPayload(message.getPayload())
                        .build(),SENDING_MESSAGE_TIMEOUT_MS);   
    }
}

据我所了解和了解的有关websocket的知识,它需要一次连接,数据流将一直不断流动,直到任何一方(客户端或服务器)停止。

我是第一次构建它,因此尝试涵盖可能在每分钟处理10000条以上消息时出现的主要情况。卡夫卡经纪人总数是两个,有足够的空间。

  1. 该怎么办,如果连接丢失,一旦重新连接到最后一次失败的位置,再次开始使用webscoket的消息并将消息推送到进一步的Kafka代理中?

  2. 如果已达到未处理消息的阈值限制(在代理程序中),可以搁置websocket以继续在代理程序中推送消息吗?

  3. 该怎么办,当代理达到其阈值时,运行一个单独的过程以检查代理中的可用空间以推送更多消息,并给出指示以继续在kafka代理中推送消息?

  4. 分享其他问题,设置此内容时需要考虑这些问题?

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)