EventHandler to Reactive Extensions, where the event handler has a transient lifestyle

Question

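I need to bridge my messaging framework so that when an event arrives (a TimeAggregate, which has members named Reading and Stream), I can pass it into Reactive Extensions and do some fancy things with it: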

public class TestHandler : ITopicNotificationHandler<TimeAggregate>
{
    public TestHandler(/* singleton variables are injected here */)
    {
    }

    public async Task Handle(TimeAggregate notification, string topic, CancellationToken cancellationToken)
    {
        // notification contains TimeAggregate.Reading (which is a decimal)
        // and TimeAggregate.Stream (which is a string, e.g. Office1, OfficeA etc.)
        // I want to perform an average on TimeAggregate.Reading but split by TimeAggregate.Stream
    }
}

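How would I link the two (the event handler and Reactive Extensions) and split the stream so that each TimeAggregate.Stream key gets its own running average of Reading (an observable dictionary of averages)?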

Lifetime details

There is one more technical aspect: the event handler (TestHandler) is constructed from scratch on every call, so the lifetime of TestHandler is transient.

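I could work around this by creating a static registration manager (either injected through IoC into the TestHandler constructor, or simply accessed directly as a static).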

Answer

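One approach is to inject a service with singleton lifetime that the event handler can call. The service can use a Subject to turn the handler calls into an observable, and expose an observable of running-average messages. (The examples here call the grouping key Group rather than Stream.)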

public class MonitoringService : IMonitoringService
{
    private readonly Subject<TimeAggregate> _subject;

    // Calculate tuples of (key, average)
    public IObservable<(string group, decimal average)> Averages => _subject
        // Group by key
        .GroupBy(s => s.Group)
        .SelectMany(g => g
            // Project each item to its reading (x avoids shadowing the outer g)
            .Select(x => x.Reading)
            // Collect element count and running total
            .Scan((elements: 0, total: (decimal)0), (agg, v) => (agg.elements + 1, agg.total + v))
            // Calculate running average
            .Select(t => t.total / t.elements)
            // Associate key and average for SelectMany
            .Select(average => (group: g.Key, average)));

    public MonitoringService()
    {
        _subject = new Subject<TimeAggregate>();
    }
    public void PostNotification(TimeAggregate notification)
    {
        _subject.OnNext(notification);
    }
}

It can then be used like this:

var monitoringService = new MonitoringService();
var handler = new TestHandler(monitoringService);

var dict = new Dictionary<string,decimal>();
monitoringService.Averages.Subscribe<(string group,decimal average)>(t =>
{
    // Do something with running average. In this case populate a dictionary
    dict[t.group] = t.average;
});

[Image: dictionary of averages]
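Because the handler is transient while the service has to be shared, the two lifetimes could be wired up in the container roughly as follows. This is a minimal sketch that assumes Microsoft.Extensions.DependencyInjection as the IoC container (the question does not say which container is in use). The stub types stand in for the real ones, and the Registration class and the Monitoring property are hypothetical names introduced only for illustration.

```csharp
using System;
using Microsoft.Extensions.DependencyInjection;

// Stub stand-ins for the real types, so the sketch compiles on its own.
public class TimeAggregate { }
public interface ITopicNotificationHandler<T> { }
public interface IMonitoringService { }
public sealed class MonitoringService : IMonitoringService { }

public sealed class TestHandler : ITopicNotificationHandler<TimeAggregate>
{
    // Hypothetical property, exposed only so the shared instance can be inspected.
    public IMonitoringService Monitoring { get; }

    public TestHandler(IMonitoringService monitoring) => Monitoring = monitoring;
}

// Hypothetical helper that builds the container.
public static class Registration
{
    public static ServiceProvider Build()
    {
        var services = new ServiceCollection();

        // One MonitoringService (and therefore one Subject) for the whole application.
        services.AddSingleton<IMonitoringService, MonitoringService>();

        // A fresh TestHandler per resolution, matching the transient lifestyle.
        services.AddTransient<ITopicNotificationHandler<TimeAggregate>, TestHandler>();

        return services.BuildServiceProvider();
    }
}
```

With this registration, every newly constructed handler should receive the same MonitoringService instance, so state kept in the service survives across handler instances.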

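Note: if you want to start calculating averages before any observer is attached, you may want to look into connectable observables (Publish etc.), and you would want to avoid a get-only property that rebuilds the query on every access like the one above, and instead assign it once in the constructor. Keep in mind, though, that this approach only pushes an update when an average changes, so if messages are not chatty, a late subscriber will have to wait for the next message before it sees an average. For example: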

public class MonitoringService : IMonitoringService
{
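    private readonly Subject<TimeAggregate> _subject;

    // Tuples of (key, average), built once in the constructor
    public IObservable<(string group, decimal average)> Averages { get; }

    public MonitoringService()
    {
        _subject = new Subject<TimeAggregate>();
        Averages = _subject
            // Group by key
            .GroupBy(s => s.Group)
            .SelectMany(g => g
                .Select(x => x.Reading)
                // Collect element count and running total
                .Scan((elements: 0, total: (decimal)0), (agg, v) => (agg.elements + 1, agg.total + v))
                // Calculate running average
                .Select(t => t.total / t.elements)
                // Associate key and average for SelectMany
                .Select(average => (group: g.Key, average)))
            .Publish()
            // Connect immediately
            .AutoConnect(0);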
    }
    public void PostNotification(TimeAggregate notification)
    {
        _subject.OnNext(notification);
    }
}
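To make the late-subscriber behaviour from the note concrete, here is a small sketch built on the same Publish/AutoConnect(0) pipeline. It assumes a System.Reactive version that provides AutoConnect; EagerMonitoringService and LateSubscriberDemo are hypothetical names used only for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public class TimeAggregate
{
    public string Group { get; set; }
    public decimal Reading { get; set; }
}

// Same shape as the constructor-assigned MonitoringService above.
public sealed class EagerMonitoringService
{
    private readonly Subject<TimeAggregate> _subject = new Subject<TimeAggregate>();

    public IObservable<(string group, decimal average)> Averages { get; }

    public EagerMonitoringService()
    {
        Averages = _subject
            .GroupBy(s => s.Group)
            .SelectMany(g => g
                .Select(x => x.Reading)
                // Running (count, total) per group
                .Scan((elements: 0, total: 0m), (agg, v) => (agg.elements + 1, agg.total + v))
                .Select(t => (group: g.Key, average: t.total / t.elements)))
            .Publish()
            // Connect right away so the running totals accumulate without observers
            .AutoConnect(0);
    }

    public void PostNotification(TimeAggregate notification) => _subject.OnNext(notification);
}

public static class LateSubscriberDemo
{
    public static List<(string group, decimal average)> Run()
    {
        var service = new EagerMonitoringService();
        var seen = new List<(string group, decimal average)>();

        // Two readings arrive before anyone subscribes.
        service.PostNotification(new TimeAggregate { Group = "Test1", Reading = 100 });
        service.PostNotification(new TimeAggregate { Group = "Test1", Reading = 200 });

        // The late subscriber gets nothing on subscription; earlier averages are not replayed.
        using (service.Averages.Subscribe(t => seen.Add(t)))
        {
            // The next message produces an update that includes the earlier readings.
            service.PostNotification(new TimeAggregate { Group = "Test1", Reading = 300 });
        }

        return seen;
    }
}
```

Calling LateSubscriberDemo.Run() should yield a single update for Test1 whose average covers all three readings (200), which shows both halves of the note: the totals accumulate before subscription, but the subscriber only learns the average once another message arrives.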

Full example:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    class Program
    {
        public static async Task Main()
        {
            var monitoringService = new MonitoringService();
            var handler = new TestHandler(monitoringService);

            var dict = new Dictionary<string,decimal>();
            monitoringService.Averages.Subscribe<(string group,decimal average)>(t =>
            {
                // Do something with running average. In this case populate a dictionary
                dict[t.group] = t.average;
            });

            await handler.Handle(new TimeAggregate
            {
                Group = "Test1", Reading = 100
            }, "Test", CancellationToken.None);

            await handler.Handle(new TimeAggregate
            {
                Group = "Test1", Reading = 200
            }, "Test", CancellationToken.None);

            await handler.Handle(new TimeAggregate
            {
                Group = "Test2", Reading = 300
            }, "Test", CancellationToken.None);
        }  
    }

    public class MonitoringService : IMonitoringService
    {
        private readonly Subject<TimeAggregate> _subject;

        // Calculate tuples of (key, average)
        public IObservable<(string group, decimal average)> Averages => _subject
            // Group by key
            .GroupBy(s => s.Group)
            .SelectMany(g => g
                // Project each item to its reading (x avoids shadowing the outer g)
                .Select(x => x.Reading)
                // Collect element count and running total
                .Scan((elements: 0, total: (decimal)0), (agg, v) => (agg.elements + 1, agg.total + v))
                // Calculate running average
                .Select(t => t.total / t.elements)
                // Associate key and average for SelectMany
                .Select(average => (group: g.Key, average)));

        public MonitoringService()
        {
            _subject = new Subject<TimeAggregate>();
        }
        public void PostNotification(TimeAggregate notification)
        {
            _subject.OnNext(notification);
        }
    }

    public class TestHandler : ITopicNotificationHandler<TimeAggregate>
    {
        private readonly IMonitoringService _monitoringService;

        public TestHandler(IMonitoringService monitoringService)
        {
            _monitoringService = monitoringService;
        }

        public Task Handle(TimeAggregate notification,string topic,CancellationToken cancellationToken)
        {
            _monitoringService.PostNotification(notification);
            return Task.CompletedTask;
        }
    }

    public interface IMonitoringService
    {
        void PostNotification(TimeAggregate notification);
        IObservable<(string group,decimal average)> Averages { get; }
    }

    public class TimeAggregate
    {
        public string Group { get; set; }
        public decimal Reading { get; set; }
    }

    public interface ITopicNotificationHandler<T>
    {
    }
}