问题描述
我正在我的平台中实现 WebSockets 连接。我在前端使用 Angular 10,在 SockJS Client 1.5.0 上使用 StompJS 2.3.3 来建立与后端的连接。
我创建了一个 WebSockets 服务来管理连接,它正在连接并且工作正常。
这是我创建的服务:
export class WebSocketsService implements OnDestroy {
/**
* The authentication service to get the token from.
*/
private authenticationService: AuthenticationService;
private serverUrl = environment['ceNotificationsServer'];
private socket;
private state: BehaviorSubject<SocketClientState>;
public constructor(authenticationService: AuthenticationService) {
this.authenticationService = authenticationService;
const client_id = environment['ceNotificationsClientId'];
const token = this.authenticationService.getToken();
const url = this.serverUrl + '/websocket' + '?clientId=' + client_id + '&Authorization=' + token;
const ws = new SockJS(url);
this.socket = StompJS.over(ws);
this.socket.reconnect_delay = 5000;
this.socket.debug = () => {
};
this.state = new BehaviorSubject<SocketClientState>(SocketClientState.ATTEMPTING);
this.socket.connect({},() => {
this.state.next(SocketClientState.CONNECTED);
console.log('Connection to the socket is open.');
});
}
/**
* Establish the connection to the websocket.
*/
connect(cfg: { reconnect: boolean } = {reconnect: false}): Observable<any> {
return new Observable(observer => {
this.state.pipe(
cfg.reconnect ? this.reconnect : o => o,filter(state => state === SocketClientState.CONNECTED))
.subscribe(() => {
observer.next(this.socket);
});
});
}
message(queue: String): Observable<any> {
return this.connect({reconnect: true})
.pipe(switchMap(client => {
return new Observable<any>(observer => {
client.subscribe(queue,message => {
observer.next(JSON.parse(message.body));
});
});
}),retrywhen((errors) => errors.pipe(delay(5))));
}
reconnect(observable: Observable<any>): Observable<any> {
return observable.pipe(
retrywhen(errors => errors.pipe(
tap(val => console.log('Trying to reconnect to the socket',val)),delayWhen(_ => timer(5000))
))
);
}
/**
* Close the connection to the websocket.
*/
close() {
if (this.socket) {
this.socket.complete();
this.state.next(SocketClientState.CLOSED);
console.log('Connection to the socket is closed');
this.socket = null;
}
}
ngOnDestroy() {
this.close();
}
}
export enum SocketClientState {
ATTEMPTING,CONNECTED,CLOSED
}
在我的 Angular 组件中,我添加了此代码以订阅 WebSockets 队列并获取一些通知以填充我的通知托盘:
const subscription1 = this.websocketsService.message('/messages')
.subscribe(outdatedProfiles => {
const notification: Notification = outdatedProfiles;
notification.message = 'notificationNotSyncedProviders';
this.notifications.addNotification(notification);
});
this.subscriptionsManager.add(subscription1);
我的问题是,当我失去连接(如果 wifi 断开连接)时,它不会再次重新连接。它会捕获错误,但不会捕获连接关闭事件。
public constructor(authenticationService: AuthenticationService) {
this.socket.connect({},() => {...});
this.socket.onclose = function(event) {
console.log("WebSocket is closed Now.");
this.connect({reconnect: true});
};
}
但它不起作用。我已阅读文档,但似乎无法在连接关闭时找到重新连接问题的答案。有什么想法吗?
解决方法
正如@BizzyBob 建议的那样,我认为更好的方法是使用 @stomp/ng2-stompjs,它取决于 rx-stomp 和 stompjs 较新版本,并且可以更轻松地完成任务。
-
安装@stomp/ng2-stompjs
npm i @stomp/ng2-stompjs
-
为 rx-stom 服务创建配置文件 (my-rxstomp.config.ts)
export const myRxStompConfig: InjectableRxStompConfig = { // Which server? brokerURL: 'ws://127.0.0.1:8080/',// Wait in milliseconds before attempting auto reconnect // Set to 0 to disable // Typical value 500 (500 milli seconds) reconnectDelay: 200 };
-
在app.module.ts的provider部分添加RxStomp服务的配置
{ provide: InjectableRxStompConfig,useValue: myRxStompConfig,},{ provide: RxStompService,useFactory: rxStompServiceFactory,deps: [InjectableRxStompConfig],
];
-
在你的组件中注入 RxStomp 服务并订阅连接状态的改变 用于在连接关闭时做某事。 如果您只想在连接关闭时重新连接,则无需订阅 连接状态,配置字段 重新连接延迟:200 将为您重新连接。
constructor(private rxStompService: RxStompService) { } ngOnInit() { this.rxStompService.connectionState$.subscribe(next => { console.log('Connection State',RxStompState[next]); if(next === RxStompState.CLOSED) { // Do something } }); }