问题描述
根据反引号的建议进行更新:
this.ble.connect(macAddr)
.pipe(
tap(outer => console.log(`outer observable`)),switchMap(() =>
this.ble.startNotification(macAddr,ENV.CUSTOM_SERVICE,ENV.VALUE_CHaraCTERISTIC)
.pipe(
tap(inner => console.log(`inner observable`)),timeout(3000) // <- no further messages
)
),// timeout(3000),// <- errors after 18 seconds
retry(5)
)
.subscribe(
(data) => console.log(`incoming buffer: ${new Uint8Array(data).join(':')}`),(error) => console.log(`outer observable ${error}`)
);
在内部 observable 超时后,消息就会停止,没有进一步的日志。
10:03:45.269 outer observable
10:03:46.058 inner observable
10:03:46.059 incoming buffer: 8:0:138:255:0:0:0:0
在外管道超时的情况下,它会在 18 秒后击中主订阅的错误块。这将对应于 3 秒间隔加 1 的 5 次重试。这表明它正在重试内部 observable 但没有记录该管道中的水龙头。
09:58:08.426 inner observable
09:58:08.426 incoming buffer: 48:0:138:255:0:0:0:0
09:58:26.516 outer observable handling final TimeoutError: Timeout has occurred
期望的行为是它在出现错误时重试连接 observable 并重新订阅通知特性。
注意:异步是存在的,因为我必须使用承诺在初始连接之后和通知订阅之前在设备上设置模式。为简单起见省略。
await this.ble.write(macAddress,ENV.MODE_CHaraCTERISTIC,mode);
我一直在换入和换出 retrywhen/switchMap/mergeMap/concatMaps 的变体,这是我能找到的可行解决方案。
this.ble.connect(macAddress)
.pipe(
retry(5),switchMap(async (value,index) => {
console.log(`in higher order mapping ${index}`);
return this.ble.startNotification(macAddress,ENV.VALUE_CHaraCTERISTIC);
.pipe(
timeout(BLE_NOTIFICATION_TIMEOUT),).subscribe(
result =>
console.log(`incoming buffer: ${new Uint8Array(result).join(':')}`),error => {
console.log(`listening for notifications`,error);
return throwError(error);
}
);
})
)
.subscribe(
data => console.log(`'next' block of outer observable`,data),error => console.log(`outer observable handling final ${error}`)
)
当应用程序连接到 BLE 设备时,它会订阅具有 Notify 属性的特性。错误可能发生在连接或通知可观察对象中。在第一种情况下,它足够干净,重新建立连接并订阅通知。在后一种情况下,除非连接中断,否则错误不会出现在外部 observable 中,并且不会发生重试。我不确定我应该以何种方式组合这两个 observable,但如果其中任何一个出现错误,我想重试连接,并重新启动通知。
解决方法
有几个问题对您不利:
首先,对 subscribe
的内部调用(在 switchMap
内)是非惯用的,并且会表现得很奇怪。
作为一个概念,运算符的存在是为了将 subscribe
抽象为一种转换/控制 observable 的机制。因此,在操作符的回调中直接订阅 observable 与操作符的概念背道而驰,操作符是接受一个 observable 并返回一个新的 observable 的函数。您只需在此处删除对 subscribe 的调用 - 当您在整个构造中调用 subscribe 时,将自动处理内部 observables 的订阅。
第二,retry
运算符的放置意味着重试将仅根据连接可观察对象抛出的错误启动。如果您还想捕获通知错误,则需要将其放在 switchMap
之后。
第三,您的 switchMap
回调被声明为 async
,即使没有 await
关键字,它也会将回调的返回值包装在一个承诺。如果回调的返回值已经是 observable(或 observable-like),这可能不是您想要做的。
最后(虽然这可能是在您粘贴示例时无意中引入的),在 .pipe()
回调中的 switchMap
调用之前有一个分号,这会导致语法错误。
这很可能是您想要的结构。
this.ble
.connect(macAddress)
.pipe(
switchMap(() =>
this.ble
.startNotification(
macAddress,ENV.CUSTOM_SERVICE,ENV.VALUE_CHARACTERISTIC
)
.pipe(timeout(BLE_NOTIFICATION_TIMEOUT))
),retry(5) // Capture failures of connection *or* notification timeout
)
.subscribe(
(data) => console.log(`'next' block of outer observable`,data),(error) => console.log(`outer observable handling final ${error}`)
);