问题描述
亲爱的rxjs专家,在stackoverflow ...
我一直在使用rxjs为nats.io client protocol实现一个非常基本的流解析器。我不确定我在性能或使用方面是否做对了。我正在使用webSocket主题连接到ws网关进行nat。
我正在将传入流映射为使用正则表达式准备操作名称和参数。 然后我订阅此流,将消息推送到另一个名为protocoll $$的主题。
protocoll $$主题实现了基本功能过滤和订阅,以使PING消息以PONG响应以保持活动状态。
然后创建另一个名为messages $$的主题,该主题会过滤用于“ MSG”操作的协议$$主题,然后对其进行转换。
在应用程序中,用户可以进一步订阅主题(这反过来又创建了一个新的Obserable,订阅了消息$$主题并过滤了subscrptionId)。
我的问题是。我做对了吗,我可以改进吗?
import { Observable,Subject } from "rxjs"
import { filter,map } from 'rxjs/operators'
import { webSocket,WebSocketSubject } from "rxjs/webSocket"
interface NatsOptions {
url: string
options: Connectionoptions
}
interface Connectionoptions {
verbose: boolean // false
pedantic: boolean // false
lang: string
version: string
}
export interface Protocol {
operation: string,args?: string
}
export interface Msg<T> {
sid: number
subject: string
replyTo: string
size: number
// Todo: Move encoded and modularize
payload: T
}
const msgPattern = /^(?<subject>[^\s]+)\s+(?<sid>[^\s]+)\s+(?<reply>([^\s\r\n]+)[^\S\r\n]+)?(?<length>\d+)\r\n(?<payload>\S+)\r\n/gi
const protocolPattern = /^([^\s]+)(\s+([\S\s]*))?/gi
@Injectable({
providedIn: "root",})
export class NatsService {
private nats$: WebSocketSubject<any>
private subscriptions: Array<string> = [];
public protocol$$: Subject<Protocol>;
public messages$$: Subject<Msg<any>>;
constructor(@Inject("natsOptions") private options: NatsOptions) {
this.nats$ = webSocket({ url: options.url,serializer: (m) => m,deserializer: (m) => m })
this.connect({
pedantic: false,verbose: false,...options.options,lang: "typescript",version: "0.0.1"
})
}
private async connect(options: Connectionoptions) {
this.protocol$$ = new Subject<Protocol>()
this.messages$$ = new Subject<Msg<any>>();
this.nats$.pipe(
map((event: MessageEvent) => {
const matches: Array<Array<any>> = Array.from(event.data.matchAll(protocolPattern))
if (matches.length > 0) {
const groups: Array<any> = matches[0]
const operation = groups[1]
let args
if (groups.length) {
args = groups[3]
}
return { operation,args }
}
return { operation: event.data }
})
).subscribe(protocol => {
this.protocol$$.next(protocol);
})
this.protocol$$.pipe(
filter((x: Protocol) => x.operation == 'PING')
).subscribe(x => this.send('PONG'))
this.protocol$$.pipe(
filter((p) => {
return p.operation == 'MSG'
})
).subscribe((p) => {
const matches: Array<Array<any>> = Array.from(p.args?.matchAll(msgPattern))
if (matches.length > 0) {
const groups = matches[0]
let msg: Msg<any> = {
subject: groups[1],sid: Number(groups[2]),replyTo: groups[3],size: Number(groups[5]),// Todo: Make encoder configurable and modularize
payload: JSON.parse(groups[6])
}
this.messages$$.next(msg);
}
})
this.nats$.next(`CONNECT ${JSON.stringify(options)}\r\n`)
}
private send(command: string) {
this.nats$.next(`${command}\r\n`)
}
public from<T>(subject: string,group?: string) {
// check wether subscription already exist. If so,just reuse
const sid = this.subscriptions.push(subject);
let subject$ = new Observable<Msg<T>>((observer) => {
this.messages$$.pipe(
filter(msg => msg.sid == sid),).subscribe(msg => observer.next(msg))
return () => {
this.send(`UNSUB ${sid}`)
}
})
//We don't support groups yet
this.send(`SUB ${subject} ${group} ${sid}`)
return subject$
}
}
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)