如何根据属性压缩 2 个序列zip、join

问题描述

我想根据类似于在使用枚举时连接它们的共同属性来压缩 2 个序列的项目。我如何才能通过第二次测试?

using NUnit.Framework;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Threading.Tasks;

public class SequenceTests
{
    private class Entry
    {
        public Entry(DateTime timestamp,string value)
        {
            Timestamp = timestamp;
            Value = value;
        }

        public DateTime Timestamp { get; }

        public string Value { get; }
    }

    private readonly IEnumerable<Entry> Tasks = new List<Entry>
    {
        new Entry(new DateTime(2021,6,6),"Do homework"),new Entry(new DateTime(2021,7),"Buy groceries"),// <-- This date is also in the People collection!
        new Entry(new DateTime(2021,8),"Walk the dog"),};

    private readonly IEnumerable<Entry> People = new List<Entry>
    {
        new Entry(new DateTime(2021,4),"Peter"),5),"Jane"),"Paul"),// <-- This date is also in the Tasks collection!
        new Entry(new DateTime(2021,9),"Mary"),};

    private class Assignment
    {
        public string Task { get; set; }

        public string Person { get; set; }
    }

    [Test]
    public void Join_two_collections_should_succeed()
    {
        var assignments = Tasks
            .Join(People,task => task.Timestamp,person => person.Timestamp,(task,person) => new Assignment { Task = task.Value,Person = person.Value });

        Assert.AreEqual(1,assignments.Count());
        Assert.AreEqual("Buy groceries",assignments.First().Task);
        Assert.AreEqual("Paul",assignments.First().Person);
    }

    [Test]
    public async Task Zip_two_sequences_should_succeed()
    {
        var tasks = Observable.ToObservable(Tasks);
        var people = Observable.ToObservable(People);

        var sequence = tasks
            .Zip(people)
            .Select(pair => new Assignment { Task = pair.First.Value,Person = pair.Second.Value });

        var assignments = await sequence.ToList();

        Assert.AreEqual(1,assignments.Count);
        Assert.AreEqual("Buy groceries",assignments.First().Person);
    }
}

解决方法

这里有一个自定义的 Join 运算符,可以用来解决这个问题。它基于 MergeGroupByUntilSelectMany 运算符:

/// <summary>
/// Correlates the elements of two sequences based on matching keys. Results are
/// produced for all combinations of correlated elements that have an overlapping
/// duration.
/// </summary>
public static IObservable<TResult> Join<TLeft,TRight,TKey,TResult>(
    this IObservable<TLeft> left,IObservable<TRight> right,Func<TLeft,TKey> leftKeySelector,Func<TRight,TKey> rightKeySelector,TResult> resultSelector,TimeSpan? keyDuration = null,IEqualityComparer<TKey> keyComparer = null)
{
    // Arguments validation omitted
    keyComparer ??= EqualityComparer<TKey>.Default;
    IObservable<long> GetGroupDuration() => keyDuration.HasValue ?
        Observable.Timer(keyDuration.Value) : Observable.Never<long>();
    return left
        .Select(x => (x,(TRight)default,Type: 1,Key: leftKeySelector(x)))
        .Merge(right.Select(x => ((TLeft)default,x,Type: 2,Key: rightKeySelector(x))))
        .GroupByUntil(e => e.Key,_ => GetGroupDuration(),keyComparer)
        .Select(g => (
            g.Where(e => e.Type == 1).Select(e => e.Item1),g.Where(e => e.Type == 2).Select(e => e.Item2).Replay().AutoConnect(0)
        ))
        .SelectMany(g => g.Item1.SelectMany(_ => g.Item2,resultSelector));
}

用法示例:

IObservable<Assignment> sequence = tasks
    .Join(people,t => t.Timestamp,p => p.Timestamp,(t,p) => new Assignment { Task = t.Value,Person = p.Value });

应该注意,如果不缓冲两个源序列之一产生的所有元素,就无法保证 100% 正确性地解决此问题。在上述实现中,缓冲由 ReplayAutoConnect 运算符在 right 序列上执行。显然,如果缓冲序列包含无限元素,这将无法很好地扩展。

如果为了可扩展性而牺牲绝对正确性是可以接受的,可选的 keyDuration 参数可用于配置存储的密​​钥(及其关联元素)可以在内存中保留的最大持续时间。如果 leftright 序列生成具有此键的新元素,则可能会重新生成已过期的键。

上述实现对于包含大量元素的序列表现相当好。在我的电脑中加入两个相同大小的序列,每个序列有 100,000 个元素,大约需要 8 秒。

,

我不喜欢任何已发布的答案。它们都是同一主题的变体:将两个序列的所有成员无限期地保留在内存中,并在新的左元素进入时遍历整个右序列,并在新的右元素进入时增量检查左键。 两个答案您无限期地O(L + R)记忆并且是O(R * L)时间复杂度(其中L和R是左右序列的大小)。

如果我们正在处理集合(或可枚举),那将是一个足够的答案。但我们不是:我们正在处理 observables,答案应该承认这一点。实际用例之间可能存在很大的时间间隔。这个问题是作为一个可枚举的测试用例提出的。如果它只是一个可枚举,正确的答案是转换回可枚举并使用 Linq 的 Join。如果可能有一个长时间运行且有时间间隔的进程,那么答案应该承认您可能只想加入在某个时间段内发生的元素,从而释放进程中的内存。

这满足测试答案,同时允许一个时间框:

var sequence = tasks.Join(people,_ => Observable.Timer(TimeSpan.FromSeconds(.5)),p) => (task: t,person: p)
    )
    .Where(t => t.person.Timestamp == t.task.Timestamp)
    .Select(t => new Assignment { Task = t.task.Value,Person = t.person.Value });

这会为每个元素创建一个 0.5 秒的窗口,这意味着如果左元素和右元素在 0.5 秒内弹出,它们将匹配。 0.5 秒后,每个元素从内存中释放。如果,无论出于何种原因,您不想从内存中释放并无限期地将所有对象保留在内存中,这就足够了:

var sequence = tasks.Join(people,_ => Observable.Never<Unit>(),Person = t.person.Value });
,

可观察的 bugfix/1601 运算符的工作方式与可枚举版本相同。你在第一次测试中没有使用它,所以它不像你在这里需要的运算符。

您需要的只是 Zip 运算符。

试试这个查询:

SelectMany

这适用于您的测试。

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...