问题描述
我有一个“源”,可以通过网络将项目发送到目标。为了跟踪目标的连接状态,每当没有要发送的项目时,源就应该每TimeSpan
发送一个“ keepalive”信号。
|time>
source x----x---x--x--------------------------x---x---x-x--x
|time>|time>|time>
keepalive -------------------o-----o-----o-----o---------------
When source is silent for 'time',keepalive triggers once every 'time'.
不太困难,但是当TimeSpan
的长度也来自IObservable
来源时,我正在尝试弄清楚它的工作方式。
times t1---------------t2----------------------------------
|t1> |.t2.>
source -----------x--x--x---x-------------------------x----x
|t1>|t1> |.t2.>|.t2.>|.t2.>
keepalive ---o---o--------------------o-----o-----o-----o------
When source is silent for 't1',keepalive triggers once every 't1',when
new time t2 is provided,the timeout and keepalive period is updated.
解决方法
此答案是对您的问题和答案的答复。
您应该尽可能避免使用.Publish()
,因为它会创建一个IConnectableObservable<T>
,这可能会很麻烦。尽可能使用.Publish(inner => ...)
重载,为您提供简单的IObservable<T>
。
在查询的主要部分,Merge
,Throttle
和Never
都是不必要的。
您是正确的,您需要一个双Switch
。
在您的问答中,您没有声明可观察对象的类型-从您的代码中还不清楚。我假设source
是IObservable<long>
,并且最终查询也是IObservable<long>
,其中-1
是在源未发送的每个间隔之后发送的。 >
这是代码:
IObservable<long> source = ...
IObservable<TimeSpan> heartbeatTimes = ...
IObservable<long> query =
source
.Do(x => SendItem(x))
.StartWith(0)
.Publish(pxs =>
heartbeatTimes
.Select(t =>
pxs
.StartWith(0)
.Select(px => Observable.Interval(t).Select(x => -1L).StartWith(px))
.Switch())
.Switch());
这是我的测试代码:
void Main()
{
var random = new Random();
IObservable<long> source = Observable.Generate(0L,x => x < 100L,x => x + 1L,x => x,x => TimeSpan.FromSeconds(random.NextDouble() * 5.0));
var heartbeatTimes = new Subject<TimeSpan>();
IObservable<long> query = ...
IDisposable subscription = query.Subscribe(x => SendHeartbeat());
heartbeatTimes.OnNext(TimeSpan.FromSeconds(1.0));
Thread.Sleep(TimeSpan.FromSeconds(10.0));
heartbeatTimes.OnNext(TimeSpan.FromSeconds(0.1));
}
void SendHeartbeat()
{
Console.Write("!");
}
void SendItem(long item)
{
Console.Write($"{item}.");
}
我的测试似乎可以按照您的要求进行。
heartbeatTimes
应该是可观察的,可以立即产生一个值,否则查询将不会返回任何内容。
好的,我为此感到自豪,因此,如果您打算将其击落,请轻柔地进行:)
var items = source.Publish();
items.Subscribe(item => SendItem(item)); //not interesting,but for completeness' sake
heartbeatTimes.Select(time => //for each incoming heartbeat time
//merge two sources,starts and stops,both derived from time:
Observable.Merge(
//starts
// (need initial value otherwise if no items,interval is never started)
items.StartWith((0,null))
.Throttle(time) // when itemsource is silent for [time]
.Select(_ => Observable.Interval(time) // start interval every [time]
// interval fires at end of [time],so send an initial trigger immediately
.StartWith(0)),//stops
// as soon as a item comes,send Never (to replace Observable.Interval)
items.Select(_ => Observable.Never<long>())
).Switch() //chose latest source from merged stops/starts
)
//switches from one start/stop source,to the next
// (each time heartbeatTime produces a new value)
.Switch()
.Subscribe(_ => SendHeartbeat());
items.Connect();