条件延迟+油门运算符

问题描述

我正在编写一个自定义 RX 运算符,它结合了 Throttle 和 Delay 的功能,具有以下签名

public static IObservable<T> DelayWhen(this IObservable<T> self,TimeSpan delay,Func<T,bool> condition);

规则如下:

  1. 如果 condition(t) 返回 false,则立即发出。
  2. 如果 condition(t) 返回 true,则延迟 delay 时间。
  3. 如果 self 在延迟期间发出值,则执行以下操作:
    1. 如果 condition(t) 返回 false,取消/跳过计划延迟发射的值并发射新值
    2. 如果 condition(t) 返回 true,则跳过/忽略此新值(即,如果 self 在延迟期间不再发出任何值,则延迟值将发出)。

从规则中可以看出,有些行为让人联想到这里正在发生的节流。

解决此问题的各种尝试包括一些变得复杂的 async 方法。我真的觉得这应该可以使用现有的运算符来解决。例如。参见 https://stackoverflow.com/a/16290788/2149075,它非常巧妙地使用了 Amb,我觉得它非常接近我想要实现的目标。

解决方法

问题不完全清楚,因此使用以下测试用例作为场景:

Observable.Interval(TimeSpan.FromSeconds(1))
    .Take(10)
    .DelayWhen(TimeSpan.FromSeconds(1.5),i => i % 3 == 0 || i % 2 == 0)

这应该导致以下结果:

//        T: ---1---2---3---4---5---6---7---8---9---0---1----
// original: ---0---1---2---3---4---5---6---7---8---9
//   delay?: ---Y---N---Y---Y---Y---N---Y---N---Y---Y
// expected: -------1---------2-----5-------7-------------8
//
// 0: Delayed,but interrupted by 1,// 1: Non-delayed,emit immediately
// 2: Delayed,emit after 1.5 seconds
// 3: Delayed,since emitted during a delay,ignored
// 4: Delayed,but interrupted by 5.
// 5: Non-delayed,emit immediately
// 6: Delayed,but interrupted by 7.
// 7: Non-delayed,emit immediately
// 8: Delayed,but interrupted by 9
// 9: Delayed,ignored

如果这不符合要求,请澄清问题。 @Theodore 的解决方案获得了正确的时机,但会发出 3 和 9,忽略“取消/跳过计划延迟发出的值并发出新值”子句。

这在功能上等同于 Theodore 的代码,但 (IMO) 更易于使用和理解:

public static IObservable<T> DelayWhen2<T>(this IObservable<T> source,TimeSpan delay,Func<T,bool> condition,IScheduler scheduler)
{
    return source
        .Select(x => (Item: x,WithDelay: condition(x)))
        .Publish(published => published
            .SelectMany(t => t.WithDelay 
                ? Observable.Return(t)
                    .Delay(delay,scheduler)
                    .TakeUntil(published.Where(t2 => !t2.WithDelay))
                : Observable.Return(t)
            )
        )
        .Select(e => e.Item);
}

从那里开始,我必须嵌入您是否延迟 .Scan 的状态:

public static IObservable<T> DelayWhen3<T>(this IObservable<T> source,bool> condition)
{
    return DelayWhen3(source,delay,condition,Scheduler.Default);
}

public static IObservable<T> DelayWhen3<T>(this IObservable<T> source,WithDelay: condition(x)))
        .Publish(published => published
            .Timestamp(scheduler)
            .Scan((delayOverTime: DateTimeOffset.MinValue,output: Observable.Empty<T>()),(state,t) => {
                if(!t.Value.WithDelay)  
                    //value isn't delayed,current delay status irrelevant,emit immediately,and cancel previous delay.
                    return (DateTimeOffset.MinValue,Observable.Return(t.Value.Item));
                else
                    if (state.delayOverTime > t.Timestamp)
                        //value should be delayed,but current delay already in progress. Ignore value.
                        return (state.delayOverTime,Observable.Empty<T>());
                    else
                        //value should be delayed,no delay in progress. Set delay state,and return delayed observable.
                        return (t.Timestamp + delay,Observable.Return(t.Value.Item).Delay(delay,scheduler).TakeUntil(published.Where(t2 => !t2.WithDelay)));
            })
        )
        .SelectMany(t => t.output);
}

.Scan 运算符中,您嵌入了前一个 Delay 到期的时间。这样你就知道可以处理一个应该在现有延迟内延迟的值。我向时间敏感函数添加了 scheduler 参数以启用测试:

var ts = new TestScheduler();

var target = Observable.Interval(TimeSpan.FromSeconds(1),ts)
    .Take(10)
    .DelayWhen3(TimeSpan.FromSeconds(1.5),i => i % 3 == 0 || i % 2 == 0,ts);

var observer = ts.CreateObserver<long>();
target.Subscribe(observer);
ts.Start();

var expected = new List<Recorded<Notification<long>>> {
    new Recorded<Notification<long>>(2000.MsTicks(),Notification.CreateOnNext<long>(1)),new Recorded<Notification<long>>(4500.MsTicks(),Notification.CreateOnNext<long>(2)),new Recorded<Notification<long>>(6000.MsTicks(),Notification.CreateOnNext<long>(5)),new Recorded<Notification<long>>(8000.MsTicks(),Notification.CreateOnNext<long>(7)),new Recorded<Notification<long>>(10500.MsTicks(),Notification.CreateOnNext<long>(8)),new Recorded<Notification<long>>(10500.MsTicks() + 1,Notification.CreateOnCompleted<long>()),};

ReactiveAssert.AreElementsEqual(expected,observer.Messages);

和 MsTicks 的代码:

public static long MsTicks(this int i)
{
    return TimeSpan.FromMilliseconds(i).Ticks;
}
,

以下是 class MyPage extends GetView<MyPageController> { @override Widget build(BuildContext context) { return Scaffold( appBar: AppBar( title: Text('My Title'),),body: /// this is the part where I'm lost on what to put SafeArea( child: IndexedStack( index: controller.curTabIdx.value,children: [ controller.mainContent.value,],) ),bottomNavigationBar: BottomNavigationBar( onTap: (value) => controller.onNavTap(value),currentIndex: controller.curTabIdx.value,items: [ BottomNavigationBarItem(label: 'Page 1'),BottomNavigationBarItem(label: 'Page 2'),BottomNavigationBarItem(label: 'Page 3'),); } } 运算符的实现,它基于内置的 Window 运算符:


更新:我的原始实现 (Revision 1) 不满足问题的要求,因此我通过用定制的延迟替换 DelayWhen 运算符来更改它/ 节流运算符。

Delay

源序列被划分为连续的窗口(子序列),每个窗口以/// <summary> /// Either delays the emission of the elements that satisfy the condition,by the /// specified time duration,or ignores them,in case they are produced before /// the emission of previously delayed element. Elements that don't satisfy the /// condition are emitted immediately,and they also cancel any pending emission of /// all previously delayed elements. /// </summary> public static IObservable<T> DelayWhen<T>(this IObservable<T> source,IScheduler scheduler = null) { // Arguments validation omitted scheduler ??= DefaultScheduler.Instance; return source .Select(x => (Item: x,WithDelay: condition(x))) .Publish(published => published.Window(published.Where(e => !e.WithDelay))) .Select(w => Observable.Merge( DelayThrottleSpecial(w.Where(e => e.WithDelay),scheduler),w.Where(e => !e.WithDelay) )) .Switch() .Select(e => e.Item); /// <summary> /// Time shifts the observable sequence by the specified time duration,ignoring /// elements that are produced while a previous element is scheduled for emission. /// </summary> static IObservable<T2> DelayThrottleSpecial<T2>(IObservable<T2> source,TimeSpan dueTime,IScheduler scheduler) { return Observable.Using(() => new SemaphoreSlim(1,1),semaphore => source .SelectMany(x => Observable.If(() => semaphore.Wait(0),Observable.Return(x).DelaySubscription(dueTime,scheduler) .Finally(() => semaphore.Release())))); } } (非延迟)元素结尾。然后将每个窗口投影到一个新窗口,该窗口的 false(延迟)元素根据要求延迟/节流。最后,使用 Switch 运算符将投影的窗口合并回单个序列,以便每次发出新窗口时都会丢弃窗口的所有待处理元素。

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...