如何在空闲一段时间后强制观察对象始终发出

问题描述

我有一个应扫描特定ble设备的应用程序。我使用的扫描功能将继续扫描所有设备,甚至应报告重复(同一设备可被扫描多次)。为了组织扫描的数据,我创建了一个可观察的对象,该对象发出列表中找到的所有设备,并删除在一定时间内未扫描的设备:

public findMyDevice() {
    let my_devices: any[] = [];

    return this.ble.startScan().pipe(
        filter(
            (device) => this.deviceIsMyDevice(device)
        ),map(
            (device) => {
                device = this.treatData(device);
                my_devices =  my_devices.filter((dev) => dev.id != device.id); // in case it is already in the list
                let Now = new Date();
                device.time_scan = Now;
                my_devices.push(device);
                my_devices.sort((a,b) => a.sn - b.sn);
                my_devices = this.removeOldDevices(my_devices); // remove scans older than 5 seconds
                return my_devices;
            }
        )
    )
}

如果附近至少有一个设备处于开启状态,则可以正常工作。

问题是:如果所有设备都关闭,则我的扫描功能将不会发出,并且永远不会调用removeOldDevices ...这样,某些旧设备将保留在列表中...

我试图将其添加到管道中以解决该问题:

timeoutWith(3000,of().pipe(
        map(
            () => {my_devices = this.removeOldDevices(my_devices); return my_devices}
        )
    )
)

,但似乎超时后完成订阅解决此问题的最佳方法是什么?在没有完成订阅的情况下,如何强制观察对象在空闲时间后发出?还有其他rxjs运算符可帮助解决这种情况吗?

解决方法

听起来像您想要的是间隔流。

interval(3000),创建一个每3秒发出一个数字的流。

const intervalUpdate$ = interval(3000).pipe(
    map(_ => my_devices = this.removeOldDevices(my_devices))
)

这将每3秒删除一次旧设备。旁注:请记住,如果您从不退订,这将永远有效。然后,您可以(例如)将其与merge一起使用:

const scanUpdate$ = this.ble.startScan().pipe(
    filter(
    [... more code here]
);
return merge(intervalUpdate$,scanUpdate$);
    

如果想变得更复杂,可以使用switchMap在流中混合一个间隔,以便每次发射设备时启动一个新计时器:

// Make a fake object so we can trigger switchMap immediately and get our
// interval/timer stream engaged. There's probably a better way to do this.
const falseStart = {isFakeDevice: true};
// Compose the return stream
return this.ble.startScan().pipe(
    filter(device => this.deviceIsMyDevice(device)),// startWith() as a hack to make sure out interval stream gets 
    // switched into right away
    startWith(falseStart),// switchMap will create a new stream that emits the given value
    // (normally a device,but will be 'falseStart' to start) and then  
    // emits null every 3 seconds. That logic is restarted every time a  
    // new device arrives,effectively resetting the interval stream
    switchMap(device => 
        interval(3000).pipe(
            mapTo(null),startWith(device)
        )
    ),// Filtering out our hacked falseStart object; we don't want it
    filter(device => !device?.isFakeDevice),map(device => {
        // Your code here,remembering that device might be null if it
        // was triggered by the interval stream
        [...]
        return this.removeOldDevices(my_devices);
    })
);

更“复杂”的方法的缺点是传递的第一个值并不是真正可取的。我可能会用'null'代替falseStart,而不用将其过滤掉。订阅此流时,您会立即得到响应,但这通常不会对性能造成很大的影响,甚至可能是理想的。

“复杂”方法的好处是,只要您仍在查找设备,那么interval(3000)可能就永远不需要发出值。根据this.removeOldDevices()的价格,这可能为您节省很多性能。