DistinctUntilChanged, but keep the last item before the change

Problem description

I want to reduce the number of items that go into a log file.

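I don't need all of the repeated items, but I do want to keep the last item before a change. The items are numbers and will be plotted. I need the last item before each change to preserve the shape of the waveform; otherwise I end up with a triangle wave instead of a square wave.

Solution

My approach is similar to @Oliver's comment above: "so, whenever the current and last are different, emit both".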

I have included the NullableDistinctWithChange version because the OP suggests that this solution needs to work with numbers.

    // Value-type version: T? is used so that "no value seen yet" can be represented as null.
    private static IObservable<T> NullableDistinctWithChange<T>(IObservable<T> source) where T : struct
    {
        return source
            .Scan(
                (Value: default(T?), Result: Array.Empty<T>()),
                (last, current) => last.Value switch
                {
                    // First item: emit it.
                    null => (Value: current, Result: new T[] { current }),
                    // Same as the previous item: emit nothing.
                    T x when x.Equals(current) => (Value: current, Result: Array.Empty<T>()),
                    // Changed: emit the previous item and the current one.
                    T x => (Value: current, Result: new[] { x, current })
                })
            .SelectMany(tuple => tuple.Result);
    }

    // Reference-type version: same logic, with null as the initial value.
    private static IObservable<T> DistinctWithChange<T>(IObservable<T> source) where T : class
    {
        return source
            .Scan(
                (Value: default(T), Result: Array.Empty<T>()),
                (last, current) => last.Value switch
                {
                    null => (Value: current, Result: new T[] { current }),
                    T x when x.Equals(current) => (Value: current, Result: Array.Empty<T>()),
                    T x => (Value: current, Result: new[] { x, current })
                })
            .SelectMany(tuple => tuple.Result);
    }

    private static readonly Recorded<Notification<string>>[] Xs = new Recorded<Notification<string>>[]
    {
        new Recorded<Notification<string>>(TimeSpan.FromSeconds(5).Ticks, Notification.CreateOnNext("A")),
        new Recorded<Notification<string>>(TimeSpan.FromSeconds(15).Ticks, …),
        new Recorded<Notification<string>>(TimeSpan.FromSeconds(65).Ticks, …),
        new Recorded<Notification<string>>(TimeSpan.FromSeconds(95).Ticks, …),
        new Recorded<Notification<string>>(TimeSpan.FromSeconds(125).Ticks, Notification.CreateOnNext("B")),
        new Recorded<Notification<string>>(TimeSpan.FromSeconds(155).Ticks, …),
        new Recorded<Notification<string>>(TimeSpan.FromSeconds(185).Ticks, …),
        new Recorded<Notification<string>>(TimeSpan.FromSeconds(195).Ticks, Notification.CreateOnNext("C")),
        new Recorded<Notification<string>>(TimeSpan.FromSeconds(205).Ticks, …),
        new Recorded<Notification<string>>(TimeSpan.FromSeconds(215).Ticks, …),
        new Recorded<Notification<string>>(TimeSpan.FromSeconds(225).Ticks, …),
        new Recorded<Notification<string>>(TimeSpan.FromSeconds(235).Ticks, …),
        new Recorded<Notification<string>>(TimeSpan.FromSeconds(245).Ticks, …),
        new Recorded<Notification<string>>(TimeSpan.FromSeconds(250).Ticks, …),
        new Recorded<Notification<string>>(TimeSpan.FromSeconds(255).Ticks, …),
        new Recorded<Notification<string>>(TimeSpan.FromSeconds(260).Ticks, …),
        new Recorded<Notification<string>>(TimeSpan.FromSeconds(265).Ticks, …),
        new Recorded<Notification<string>>(TimeSpan.FromSeconds(270).Ticks, …),
        new Recorded<Notification<string>>(TimeSpan.FromSeconds(280).Ticks, Notification.CreateOnCompleted<string>())
    };

    private static readonly Recorded<Notification<string>>[] Expected = new Recorded<Notification<string>>[]
    {
        new Recorded<Notification<string>>(TimeSpan.FromSeconds(5).Ticks + ReactiveTest.Subscribed, …),
        new Recorded<Notification<string>>(TimeSpan.FromSeconds(125).Ticks + ReactiveTest.Subscribed, …),
        new Recorded<Notification<string>>(TimeSpan.FromSeconds(195).Ticks + ReactiveTest.Subscribed, …),
        new Recorded<Notification<string>>(TimeSpan.FromSeconds(205).Ticks + ReactiveTest.Subscribed, …),
        new Recorded<Notification<string>>(TimeSpan.FromSeconds(245).Ticks + ReactiveTest.Subscribed, …),
        new Recorded<Notification<string>>(TimeSpan.FromSeconds(280).Ticks + ReactiveTest.Subscribed, Notification.CreateOnCompleted<string>())
    };

    [Test]
    public void ShouldPerformDistinctWithChange()
    {
        var scheduler = new TestScheduler();
        var xs = scheduler.CreateColdObservable(Xs);
        var observed = scheduler.Start(() => DistinctWithChange(xs), TimeSpan.FromSeconds(300).Ticks);
        Assert.That(observed.Messages, Is.EqualTo(Expected));
    }

It would be possible to unify and improve these two functions with a suitable type, but I did not want to overcomplicate the answer.

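In addition, it would be trivial to prevent a single item from being duplicated (by introducing a count value into the Scan tuple) and to use an IEqualityComparer to delegate the predicate that decides whether the value has changed.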
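The two refinements mentioned above can be sketched roughly as follows. This is only an illustration under a few assumptions: the extension method name `DistinctKeepingLastBeforeChange` and the tuple field names are hypothetical and do not come from the answer, and a `HasValue` flag is used instead of `null` so that one method can cover both value and reference types.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class DistinctWithChangeSketch
{
    // Hypothetical name; not part of Rx.NET or of the original answer.
    public static IObservable<T> DistinctKeepingLastBeforeChange<T>(
        this IObservable<T> source,
        IEqualityComparer<T> comparer)
    {
        return source
            .Scan(
                // State: whether anything was seen, the previous item,
                // the length of the current run, and the items to emit now.
                (HasValue: false, Value: default(T), Count: 0, Result: Array.Empty<T>()),
                (last, current) =>
                {
                    // First item: emit it and start a run of length 1.
                    if (!last.HasValue)
                        return (true, current, 1, new[] { current });

                    // Same as the previous item: emit nothing, extend the run.
                    if (comparer.Equals(last.Value, current))
                        return (true, current, last.Count + 1, Array.Empty<T>());

                    // Changed: re-emit the previous item only if its run was
                    // longer than 1, so a single item is not duplicated.
                    return last.Count > 1
                        ? (true, current, 1, new[] { last.Value, current })
                        : (true, current, 1, new[] { current });
                })
            .SelectMany(state => state.Result);
    }
}
```

With `EqualityComparer<int>.Default`, the input 1, 1, 1, 2, 3, 3, 1 produces 1, 1, 2, 3, 3, 1. Like the functions above, this sketch does not emit the final item again when the source completes.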

Here is a query that does what you want:

IObservable<string> query =
    subject
        .Publish(ss =>
            Observable
                .Concat(
                    ss.Take(1),
                    ss
                        .DistinctUntilChanged()
                        .Publish(dss => dss.Zip(dss.Skip(1), (m, n) => (m, n)))
                        .SelectMany(z => new[] { z.m, z.n })));

The Publish operator ensures that there is only one subscription to the original source.

With this test:
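A small sketch of that effect, assuming a cold source built with `Observable.Defer` purely so that subscriptions can be counted; the class and method names here are hypothetical and only serve the illustration.

```csharp
using System;
using System.Reactive.Linq;

public static class PublishDemo
{
    // Returns how many times the cold source was subscribed to
    // when it is used twice inside a single query.
    public static int CountSubscriptions(bool usePublish)
    {
        var subscriptions = 0;

        // Defer runs this factory once per subscription.
        var source = Observable.Defer(() =>
        {
            subscriptions++;
            return new[] { "A", "A", "B" }.ToObservable();
        });

        IObservable<string> query = usePublish
            // Both uses of ss share one subscription to source.
            ? source.Publish(ss => ss.Zip(ss.Skip(1), (m, n) => m + n))
            // Each use of source subscribes separately.
            : source.Zip(source.Skip(1), (m, n) => m + n);

        query.Subscribe(_ => { });
        return subscriptions;
    }
}
```

`PublishDemo.CountSubscriptions(true)` returns 1, while `PublishDemo.CountSubscriptions(false)` returns 2. With a hot source such as a Subject the difference is less visible, but sharing one subscription still keeps both branches of the query in step.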

subject.OnNext("A");
subject.OnNext("A");
subject.OnNext("A");
subject.OnNext("B");
subject.OnNext("B");
subject.OnNext("B");
subject.OnNext("A");
subject.OnNext("A");
subject.OnNext("A");
subject.OnCompleted();

I get these values:

A 
A 
B 
B 
A 
I found a solution that meets my (rather limited) requirements:

    var subject = new Subject<string>();
    var distinct = subject.DistinctUntilChanged();

    var combinedLatesDistinct = Observable.CombineLatest(distinct,subject,Selector).DistinctUntilChanged();

    Observable.Merge(distinct,combinedLatesDistinct).Subscribe(i => Console.WriteLine($"Result: {i}"));

    subject.OnNext("A");
    subject.OnNext("A");
    subject.OnNext("A");
    subject.OnNext("A");
    subject.OnNext("B");
    subject.OnNext("B");
    subject.OnNext("B");
    subject.OnNext("C");
    subject.OnNext("B");
    subject.OnNext("B");
    subject.OnNext("B");
    subject.OnNext("B");
    subject.OnNext("A");
    subject.OnNext("A");
    subject.OnNext("A");
    subject.OnNext("A");
    subject.OnNext("A");
    subject.OnNext("A");
    subject.OnCompleted();

    Console.ReadLine();

The result is:

Result: A
Result: A
Result: B
Result: B
Result: C
Result: C
Result: B
Result: B
Result: A
Result: A

Items that occur only once are duplicated, but that is fine for me.
