每当源静音时,如何实现可配置间隔的“ keepalive”信号?

问题描述

我有一个“源”,可以通过网络将项目发送到目标。为了跟踪目标的连接状态,每当没有要发送的项目时,源就应该每TimeSpan发送一个“ keepalive”信号。

                       |time>
source    x----x---x--x--------------------------x---x---x-x--x
                             |time>|time>|time>
keepalive -------------------o-----o-----o-----o---------------

When source is silent for 'time',keepalive triggers once every 'time'.

不太困难,但是当TimeSpan的长度也来自IObservable来源时,我正在尝试弄清楚它的工作方式。

times     t1---------------t2----------------------------------
          |t1>                  |.t2.>
source    -----------x--x--x---x-------------------------x----x
             |t1>|t1>                 |.t2.>|.t2.>|.t2.>
keepalive ---o---o--------------------o-----o-----o-----o------


When source is silent for 't1',keepalive triggers once every 't1',when
new time t2 is provided,the timeout and keepalive period is updated.

解决方法

此答案是对您的问题和答案的答复。

您应该尽可能避免使用.Publish(),因为它会创建一个IConnectableObservable<T>,这可能会很麻烦。尽可能使用.Publish(inner => ...)重载,为您提供简单的IObservable<T>

在查询的主要部分,MergeThrottleNever都是不必要的。

您是正确的,您需要一个双Switch

在您的问答中,您没有声明可观察对象的类型-从您的代码中还不清楚。我假设sourceIObservable<long>,并且最终查询也是IObservable<long>,其中-1是在源未发送的每个间隔之后发送的。 >

这是代码:

IObservable<long> source = ...
IObservable<TimeSpan> heartbeatTimes = ...

IObservable<long> query =
    source
        .Do(x => SendItem(x))
        .StartWith(0)
        .Publish(pxs =>
            heartbeatTimes
                .Select(t =>
                    pxs
                        .StartWith(0)
                        .Select(px => Observable.Interval(t).Select(x => -1L).StartWith(px))
                        .Switch())
                .Switch());

这是我的测试代码:

void Main()
{
    var random = new Random();
    IObservable<long> source = Observable.Generate(0L,x => x < 100L,x => x + 1L,x => x,x => TimeSpan.FromSeconds(random.NextDouble() * 5.0));
    var heartbeatTimes = new Subject<TimeSpan>();
    
    IObservable<long> query = ...
    
    IDisposable subscription = query.Subscribe(x => SendHeartbeat());
    
    heartbeatTimes.OnNext(TimeSpan.FromSeconds(1.0));

    Thread.Sleep(TimeSpan.FromSeconds(10.0));
    heartbeatTimes.OnNext(TimeSpan.FromSeconds(0.1));
}

void SendHeartbeat()
{
    Console.Write("!");
}

void SendItem(long item)
{
    Console.Write($"{item}.");
}

我的测试似乎可以按照您的要求进行。

heartbeatTimes应该是可观察的,可以立即产生一个值,否则查询将不会返回任何内容。

,

好的,我为此感到自豪,因此,如果您打算将其击落,请轻柔地进行:)

var items = source.Publish();
items.Subscribe(item => SendItem(item)); //not interesting,but for completeness' sake

heartbeatTimes.Select(time =>  //for each incoming heartbeat time
    //merge two sources,starts and stops,both derived from time:
    Observable.Merge(               
        //starts
        // (need initial value otherwise if no items,interval is never started)
        items.StartWith((0,null))     
            .Throttle(time)                        // when itemsource is silent for [time]
            .Select(_ => Observable.Interval(time) // start interval every [time]
                  // interval fires at end of [time],so send an initial trigger immediately
                 .StartWith(0)),//stops
        // as soon as a item comes,send Never (to replace Observable.Interval)
        items.Select(_ => Observable.Never<long>()) 
    ).Switch() //chose latest source from merged stops/starts
)
//switches from one start/stop source,to the next 
// (each time heartbeatTime produces a new value)
.Switch()  
.Subscribe(_ => SendHeartbeat());

items.Connect();

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...