问题描述
我不需要所有重复的项目,但我想保留更改前的最后一个项目。项目是数字,将被绘制。我需要更改前的最后一项以保持波形的形状,否则最终会变成三角波而不是方波。
解决方法
我的方法类似于上面的@Olivers 评论:“因此,只要当前和最后一个不同,就同时发出”。
celery -A app.celery_tasks worker
我包含了 NullableDistinctWithChange 版本,因为 OP 建议您希望此解决方案适用于数字。
可以使用 private static IObservable<T> NullableDistinctWithChange<T>(IObservable<T> source)
where T : struct
{
return source
.Scan(
(Value: default(T?),Result: Array.Empty<T>()),(last,current) => last.Value switch
{
null => (Value: current,Result: new T[] { current }),T x when x.Equals(current) => (Value: current,T x => (Value: current,Result: new[] { x,current })
})
.SelectMany(tuple => tuple.Result);
}
private static IObservable<T> DistinctWithChange<T>(IObservable<T> source)
where T : class
{
return source
.Scan(
(Value: default(T),current })
})
.SelectMany(tuple => tuple.Result);
}
private static readonly Recorded<Notification<string>>[] Xs = new Recorded<Notification<string>>[]
{
new Recorded<Notification<string>>(TimeSpan.FromSeconds(5).Ticks,Notification.CreateOnNext("A")),new Recorded<Notification<string>>(TimeSpan.FromSeconds(15).Ticks,new Recorded<Notification<string>>(TimeSpan.FromSeconds(65).Ticks,new Recorded<Notification<string>>(TimeSpan.FromSeconds(95).Ticks,new Recorded<Notification<string>>(TimeSpan.FromSeconds(125).Ticks,Notification.CreateOnNext("B")),new Recorded<Notification<string>>(TimeSpan.FromSeconds(155).Ticks,new Recorded<Notification<string>>(TimeSpan.FromSeconds(185).Ticks,new Recorded<Notification<string>>(TimeSpan.FromSeconds(195).Ticks,Notification.CreateOnNext("C")),new Recorded<Notification<string>>(TimeSpan.FromSeconds(205).Ticks,new Recorded<Notification<string>>(TimeSpan.FromSeconds(215).Ticks,new Recorded<Notification<string>>(TimeSpan.FromSeconds(225).Ticks,new Recorded<Notification<string>>(TimeSpan.FromSeconds(235).Ticks,new Recorded<Notification<string>>(TimeSpan.FromSeconds(245).Ticks,new Recorded<Notification<string>>(TimeSpan.FromSeconds(250).Ticks,new Recorded<Notification<string>>(TimeSpan.FromSeconds(255).Ticks,new Recorded<Notification<string>>(TimeSpan.FromSeconds(260).Ticks,new Recorded<Notification<string>>(TimeSpan.FromSeconds(265).Ticks,new Recorded<Notification<string>>(TimeSpan.FromSeconds(270).Ticks,new Recorded<Notification<string>>(TimeSpan.FromSeconds(280).Ticks,Notification.CreateOnCompleted<string>())
};
private static readonly Recorded<Notification<string>>[] Expected = new Recorded<Notification<string>>[]
{
new Recorded<Notification<string>>(TimeSpan.FromSeconds(5).Ticks + ReactiveTest.Subscribed,new Recorded<Notification<string>>(TimeSpan.FromSeconds(125).Ticks + ReactiveTest.Subscribed,new Recorded<Notification<string>>(TimeSpan.FromSeconds(195).Ticks + ReactiveTest.Subscribed,new Recorded<Notification<string>>(TimeSpan.FromSeconds(205).Ticks + ReactiveTest.Subscribed,new Recorded<Notification<string>>(TimeSpan.FromSeconds(245).Ticks + ReactiveTest.Subscribed,new Recorded<Notification<string>>(TimeSpan.FromSeconds(280).Ticks + ReactiveTest.Subscribed,Notification.CreateOnCompleted<string>())
};
[Test]
public void ShouldPerformDistinctWithChange()
{
var scheduler = new TestScheduler();
var xs = scheduler.CreateColdObservable(Xs);
var observed = scheduler.Start(() => DistinctWithChange(xs),TimeSpan.FromSeconds(300).Ticks);
Assert.That(observed.Messages,Is.EqualTo(Expected));
}
类型统一和改进这两个函数,但我不想让答案过于复杂。
此外,防止单项重复(通过在 Scan 元组中引入计数值)和使用 IEqualityComparer 来委派变化的预测是微不足道的。
,这是一个可以满足您要求的查询:
IObservable<string> query =
subject
.Publish(ss =>
Observable
.Concat(
ss.Take(1),ss.
DistinctUntilChanged()
.Publish(dss => dss.Zip(dss.Skip(1),(m,n) => (m,n)))
.SelectMany(z => new [] { z.m,z.n })));
Publish
运算符确保对原始来源只有一个订阅。
通过这个测试:
subject.OnNext("A");
subject.OnNext("A");
subject.OnNext("A");
subject.OnNext("B");
subject.OnNext("B");
subject.OnNext("B");
subject.OnNext("A");
subject.OnNext("A");
subject.OnNext("A");
subject.OnCompleted();
我得到这些值:
A
A
B
B
A
,
我找到了满足(相当有限的)要求的解决方案:
var subject = new Subject<string>();
var distinct = subject.DistinctUntilChanged();
var combinedLatesDistinct = Observable.CombineLatest(distinct,subject,Selector).DistinctUntilChanged();
Observable.Merge(distinct,combinedLatesDistinct).Subscribe(i => Console.WriteLine($"Result: {i}"));
subject.OnNext("A");
subject.OnNext("A");
subject.OnNext("A");
subject.OnNext("A");
subject.OnNext("B");
subject.OnNext("B");
subject.OnNext("B");
subject.OnNext("C");
subject.OnNext("B");
subject.OnNext("B");
subject.OnNext("B");
subject.OnNext("B");
subject.OnNext("A");
subject.OnNext("A");
subject.OnNext("A");
subject.OnNext("A");
subject.OnNext("A");
subject.OnNext("A");
subject.OnCompleted();
Console.ReadLine();
结果是:
Result: A
Result: A
Result: B
Result: B
Result: C
Result: C
Result: B
Result: B
Result: A
Result: A
只出现一次的项目将被复制,但这对我来说没问题。