BehaviorSubject 中元素的顺序导致输出与预期的顺序相反

问题描述

下面的代码用于生成事件,但在下文所述的情况下,这些事件的发出顺序是颠倒的:

// Feed carries the incoming values; levels holds the current threshold set.
var Feed = new Subject<double>();
var levels = new BehaviorSubject<double[]>(new[] { 400.0,500.0,600.0,700.0 });

// For each threshold set published on levels, build a detection pipeline over Feed.
// Switch() then follows only the pipeline of the most recent threshold set.
levels
    .Select(thresholds => Feed
        .Buffer(2,1)                  // buffers of 2 values, advancing 1 value at a time
        .Where(x => x.Count == 2)     // ignore buffers that do not contain a full pair
        // NOTE: LevelsCrossed is not sorted here, which is the ordering problem this question is about.
        .Select(x => new { LevelsCrossed = thresholds.GetCrossovers(x[0],x[1]),PrevIoUs = x[0],Current = x[1] })
        .Where(x => x.LevelsCrossed.Any())
        .SelectMany(x => x.LevelsCrossed.Select(level => new ThresholdCrossedEvent(level,x.PrevIoUs,x.Current))))
    .Switch()
    // The Rx.NET operator is PascalCase; `distinctUntilChanged` does not exist and would not compile.
    .DistinctUntilChanged(x => x.Threshold)
    .Subscribe(x => Console.WriteLine(JsonConvert.SerializeObject(x)));

Feed.OnNext(520.0);
Feed.OnNext(400.0);
Feed.OnNext(450.0);
Feed.OnNext(650.0);

用于检测阈值穿越的扩展方法如下:

/// <summary>
/// Extension helpers for working with an array of threshold levels.
/// </summary>
public static class ThresholdExtensions
{
    /// <summary>
    /// Returns the levels that lie between <paramref name="prevIoUs"/> and
    /// <paramref name="current"/> (both ends inclusive), whichever of the two is larger.
    /// </summary>
    /// <param name="self">The threshold levels to test.</param>
    /// <param name="prevIoUs">The earlier value of the pair.</param>
    /// <param name="current">The later value of the pair.</param>
    /// <returns>The matching levels, in the order they appear in <paramref name="self"/>.</returns>
    public static IEnumerable<double> GetCrossovers(this double[] self,double prevIoUs,double current)
    {
        return from level in self
               where (prevIoUs <= level && level <= current)    // crossed while moving up
                  || (current <= level && level <= prevIoUs)    // crossed while moving down
               select level;
    }
}

前两个发出的对象顺序颠倒了,而后三个发出的对象顺序是正确的:

// the order of first 2 emitted objects should be reversed
{"Threshold":400.0,"PrevIoUs":520.0,"Current":400.0,"SlopeDirection":-1}
{"Threshold":500.0,"PrevIoUs":520.0,"Current":400.0,"SlopeDirection":-1}

// the below ordered is OK
{"Threshold":400.0,"PrevIoUs":400.0,"Current":450.0,"SlopeDirection":1}
{"Threshold":500.0,"PrevIoUs":450.0,"Current":650.0,"SlopeDirection":1}
{"Threshold":600.0,"PrevIoUs":450.0,"Current":650.0,"SlopeDirection":1}

前 2 个元素的输出顺序颠倒,与 levels 这个 BehaviorSubject 中阈值的排列顺序有关——但如果把该排列顺序反过来,又会导致后 3 个元素的顺序颠倒。

如何修改 DetectThresholds,使其在输入的数据流(Feed)穿越各个阈值时,按正确的顺序发出事件对象?

上述问题的完整可运行示例如下:

using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;

namespace ConsoleApp1
{
    /// <summary>
    /// Describes one threshold level being crossed while moving from a previous value to a current value.
    /// </summary>
    public class ThresholdCrossedEvent
    {
        /// <summary>
        /// Creates an event for <paramref name="level"/> crossed between two consecutive values.
        /// </summary>
        /// <param name="level">The threshold level that was crossed.</param>
        /// <param name="prevIoUs">The value before the crossing.</param>
        /// <param name="current">The value after the crossing.</param>
        public ThresholdCrossedEvent(double level,double prevIoUs,double current)
        {
            // The prevIoUs parameter was missing from the signature although it is assigned below.
            Threshold = level;
            PrevIoUs = prevIoUs;
            Current = current;
        }

        public double Threshold { get; set; }
        public double PrevIoUs { get; set; }
        public double Current { get; set; }

        /// <summary>1 when Current is greater than or equal to PrevIoUs, otherwise -1.</summary>
        public int SlopeDirection => Current >= PrevIoUs ? 1 : -1;
    }

    /// <summary>
    /// Extension helpers for working with an array of threshold levels.
    /// </summary>
    public static class ThresholdExtensions
    {
        /// <summary>
        /// Returns the levels lying between <paramref name="prevIoUs"/> and
        /// <paramref name="current"/>, both ends inclusive, in either direction of travel.
        /// </summary>
        /// <param name="self">The threshold levels to test.</param>
        /// <param name="prevIoUs">The earlier value of the pair.</param>
        /// <param name="current">The later value of the pair.</param>
        /// <returns>The matching levels, in the order they appear in <paramref name="self"/>.</returns>
        public static IEnumerable<double> GetCrossovers(this double[] self,double prevIoUs,double current)
        {
            // The prevIoUs parameter was missing from the signature although the predicate uses it.
            return self
                .Where(level => level >= prevIoUs && level <= current || level <= prevIoUs && level >= current);
        }
    }

    /// <summary>
    /// Console demo that reproduces the ordering problem: crossed levels are emitted in the
    /// order they appear in the thresholds array, regardless of the direction of travel.
    /// </summary>
    class Program
    {

        static void Main(string[] args)
        {
            var Feed = new Subject<double>();
            // All four levels are needed: the output shown for this example includes 500 and 600.
            var levels = new BehaviorSubject<double[]>(new[] { 400.0,500.0,600.0,700.0 });

            // For each threshold set, build a detection pipeline over Feed; Switch() follows the latest one.
            levels
                .Select(thresholds => Feed
                    .Buffer(2,1)                  // buffers of 2 values, advancing 1 value at a time
                    .Where(x => x.Count == 2)     // ignore buffers that do not contain a full pair
                    // Intentionally unsorted: this is the behaviour the question asks about.
                    .Select(x => new { LevelsCrossed = thresholds.GetCrossovers(x[0],x[1]),PrevIoUs = x[0],Current = x[1] })
                    .Where(x => x.LevelsCrossed.Any())
                    .SelectMany(x => x.LevelsCrossed.Select(level => new ThresholdCrossedEvent(level,x.PrevIoUs,x.Current))))
                .Switch()
                // The Rx.NET operator is PascalCase.
                .DistinctUntilChanged(x => x.Threshold)
                .Subscribe(x => Console.WriteLine(JsonConvert.SerializeObject(x)));

            Feed.OnNext(520.0);
            Feed.OnNext(400.0);
            Feed.OnNext(450.0);
            Feed.OnNext(650.0);

            Console.ReadKey();
        }
    }
}

解决方法

需要根据数值的变化方向对穿越的阈值进行排序:LevelsCrossed = thresholds.GetCrossovers(x[0],x[1]).OrderByWithDirection(x => x,x[0] > x[1])

/// <summary>
/// Ordering helpers that pick ascending or descending order from a boolean flag.
/// </summary>
static class OrderByExtensions
{
    /// <summary>
    /// Sorts an in-memory sequence by <paramref name="keySelector"/>.
    /// </summary>
    /// <param name="source">The sequence to sort.</param>
    /// <param name="keySelector">Extracts the sort key from an element.</param>
    /// <param name="descending">True for descending order, false for ascending.</param>
    public static IOrderedEnumerable<TSource> OrderByWithDirection<TSource,TKey>
           (this IEnumerable<TSource> source,Func<TSource,TKey> keySelector,bool descending)
    {
        if (descending)
        {
            return source.OrderByDescending(keySelector);
        }

        return source.OrderBy(keySelector);
    }

    /// <summary>
    /// Sorts a queryable by <paramref name="keySelector"/>.
    /// </summary>
    /// <param name="source">The queryable to sort.</param>
    /// <param name="keySelector">Expression that extracts the sort key from an element.</param>
    /// <param name="descending">True for descending order, false for ascending.</param>
    public static IOrderedQueryable<TSource> OrderByWithDirection<TSource,TKey>
        (this IQueryable<TSource> source,Expression<Func<TSource,TKey>> keySelector,bool descending)
    {
        if (descending)
        {
            return source.OrderByDescending(keySelector);
        }

        return source.OrderBy(keySelector);
    }
}

// For each threshold set, build a detection pipeline over feed; Switch() follows the latest one.
levels
    .Select(thresholds => feed
        .Buffer(2,1)                  // buffers of 2 values, advancing 1 value at a time
        .Where(x => x.Count == 2)     // ignore buffers that do not contain a full pair
        .Select(x => new {
                       // Sort the crossed levels in the direction of travel:
                       // descending when the value fell (x[0] > x[1]), ascending otherwise.
                       LevelsCrossed = thresholds.GetCrossovers(x[0],x[1]).OrderByWithDirection(level => level,x[0] > x[1]),
                       Previous = x[0],
                       Current = x[1] })
        .Where(x => x.LevelsCrossed.Any())
        .SelectMany(x => x.LevelsCrossed.Select(level => new ThresholdCrossedEvent(level,x.Previous,x.Current))))
    .Switch()
    .DistinctUntilChanged(x => x.Threshold)
    .Subscribe(x => Console.WriteLine(JsonConvert.SerializeObject(x)));

使用 { 400.0,500.0,600.0,700.0 } 的级别,输出现在是正确的:

{"Threshold":500.0,"Previous":520.0,"Current":400.0,"SlopeDirection":-1}
{"Threshold":400.0,"Previous":520.0,"Current":400.0,"SlopeDirection":-1}
{"Threshold":500.0,"Previous":450.0,"Current":750.0,"SlopeDirection":1}
{"Threshold":600.0,"Previous":450.0,"Current":750.0,"SlopeDirection":1}
{"Threshold":700.0,"Previous":450.0,"Current":750.0,"SlopeDirection":1}

完整的可运行示例如下:

using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Reactive.Linq;
using System.Reactive.Subjects;

namespace ConsoleApp1
{
    /// <summary>
    /// Describes one threshold level being crossed while moving from a previous value to a current value.
    /// </summary>
    public class ThresholdCrossedEvent
    {
        /// <summary>
        /// Creates an event for <paramref name="level"/> crossed between two consecutive values.
        /// </summary>
        /// <param name="level">The threshold level that was crossed.</param>
        /// <param name="previous">The value before the crossing.</param>
        /// <param name="current">The value after the crossing.</param>
        public ThresholdCrossedEvent(double level,double previous,double current)
        {
            this.Threshold = level;
            this.Previous = previous;
            this.Current = current;
        }

        // Declaration order is kept as-is: serializers commonly emit properties in this order.
        public double Threshold { get; set; }
        public double Previous { get; set; }
        public double Current { get; set; }

        /// <summary>1 when Current is greater than or equal to Previous, otherwise -1.</summary>
        public int SlopeDirection
        {
            get
            {
                if (Current >= Previous)
                {
                    return 1;
                }

                return -1;
            }
        }
    }

    /// <summary>
    /// Extension helpers for working with an array of threshold levels.
    /// </summary>
    public static class ThresholdExtensions
    {
        /// <summary>
        /// Returns the levels lying between <paramref name="previous"/> and
        /// <paramref name="current"/>, both ends inclusive, in either direction of travel.
        /// </summary>
        /// <param name="self">The threshold levels to test.</param>
        /// <param name="previous">The earlier value of the pair.</param>
        /// <param name="current">The later value of the pair.</param>
        /// <returns>The matching levels, in the order they appear in <paramref name="self"/>.</returns>
        public static IEnumerable<double> GetCrossovers(this double[] self,double previous,double current)
        {
            // The previous parameter was missing from the signature although the predicate uses it.
            return self
                .Where(level => level >= previous && level <= current || level <= previous && level >= current);
        }
    }

    /// <summary>
    /// Ordering helpers that pick ascending or descending order from a boolean flag.
    /// </summary>
    static class OrderByExtensions
    {
        /// <summary>
        /// Sorts an in-memory sequence by <paramref name="keySelector"/>.
        /// </summary>
        /// <param name="source">The sequence to sort.</param>
        /// <param name="keySelector">Extracts the sort key from an element.</param>
        /// <param name="descending">True for descending order, false for ascending.</param>
        public static IOrderedEnumerable<TSource> OrderByWithDirection<TSource,TKey>
               (this IEnumerable<TSource> source,Func<TSource,TKey> keySelector,bool descending)
        {
            // keySelector was missing from the signature although the body passes it on.
            return descending ? source.OrderByDescending(keySelector)
                              : source.OrderBy(keySelector);
        }

        /// <summary>
        /// Sorts a queryable by <paramref name="keySelector"/>.
        /// </summary>
        /// <param name="source">The queryable to sort.</param>
        /// <param name="keySelector">Expression that extracts the sort key from an element.</param>
        /// <param name="descending">True for descending order, false for ascending.</param>
        public static IOrderedQueryable<TSource> OrderByWithDirection<TSource,TKey>
            (this IQueryable<TSource> source,Expression<Func<TSource,TKey>> keySelector,bool descending)
        {
            // keySelector was missing here as well.
            return descending ? source.OrderByDescending(keySelector)
                              : source.OrderBy(keySelector);
        }
    }

    /// <summary>
    /// Console demo of the corrected pipeline: crossed levels are sorted in the direction
    /// of travel before events are emitted.
    /// </summary>
    class Program
    {

        static void Main(string[] args)
        {
            var feed = new Subject<double>();
            // All four levels are needed: the output shown for this example includes 500 and 600.
            var levels = new BehaviorSubject<double[]>(new[] { 400.0,500.0,600.0,700.0 });

            // For each threshold set, build a detection pipeline over feed; Switch() follows the latest one.
            levels
                .Select(thresholds => feed
                    .Buffer(2,1)                  // buffers of 2 values, advancing 1 value at a time
                    .Where(x => x.Count == 2)     // ignore buffers that do not contain a full pair
                    .Select(x => new {
                        // Descending when the value fell (x[0] > x[1]), ascending otherwise.
                        LevelsCrossed = thresholds.GetCrossovers(x[0],x[1]).OrderByWithDirection(level => level,x[0] > x[1]),
                        Previous = x[0],
                        Current = x[1] })
                    .Where(x => x.LevelsCrossed.Any())
                    .SelectMany(x => x.LevelsCrossed.Select(level => new ThresholdCrossedEvent(level,x.Previous,x.Current))))
                .Switch()
                .DistinctUntilChanged(x => x.Threshold)
                .Subscribe(x => Console.WriteLine(JsonConvert.SerializeObject(x)));

            feed.OnNext(520.0);
            feed.OnNext(400.0);
            feed.OnNext(450.0);
            feed.OnNext(750.0);

            Console.ReadKey();
        }
    }
}

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...