问题描述
我需要桥接我的消息传递框架,以便在事件发生(TimeAggregate包含名为Reading
和Stream
的成员)时,我可以传递到Reactive Extensions中并做一些花哨的事情:
public class TestHandler : ITopicNotificationHandler<TimeAggregate>
{
public TestHandler(/* singleton variables are injected here */)
{
}
public async Task Handle(TimeAggregate notification,string topic,CancellationToken cancellationToken)
{
// notification contains TimeAggregate.Reading (which is a decimal)
// and TimeAggregate.Stream (which is a string,i.e. Office1,OfficeA etc)
// I want to perform an average on TimeAggregate.Reading but split by TimeAggregate.Reading
}
}
我将如何链接两者(事件处理程序和Reactive Extension)并拆分,以便每个TimeAggregate.Stream键分别对每个Reading名称进行平均(可观察到的平均字典)?
终身详细信息
还有一个技术方面,每次调用事件处理程序(TestHandler
)都是从头开始构建的,则TestHandler的生存期是短暂的。
我可以通过制作一个静态注册管理器来解决这个问题(将IoC注入到TestHandler构造函数中,或者我们直接将其称为static)。
解决方法
一种方法是注入具有单例生存期的服务,该服务可以在事件处理程序中调用。它可以使用Subject
生成可观察值,并公开/利用可观察的运行中平均消息:
public class MonitoringService : IMonitoringService
{
private Subject<TimeAggregate> _subject;
// Calculate tuples of (key,average)
public IObservable<(string,decimal)> Averages => _subject
// Group by key
.GroupBy(s => s.Group)
.SelectMany(g => g
.Select(g => g.Reading)
// Collect element count and running total
.Scan((elements: 0,total: (decimal)0),(agg,v) => (agg.elements + 1,agg.total + v))
// Calculate running average
.Select(t => t.total / t.elements)
// Associate key and average for SelectMany
.Select(average => (g.Key,average)));
public MonitoringService()
{
_subject = new Subject<TimeAggregate>();
}
public void PostNotification(TimeAggregate notification)
{
_subject.OnNext(notification);
}
}
然后可以这样使用:
var monitoringService = new MonitoringService();
var handler = new TestHandler(monitoringService);
var dict = new Dictionary<string,decimal>();
monitoringService.Averages.Subscribe<(string group,decimal average)>(t =>
{
// Do something with running average. In this case populate a dictionary
dict[t.group] = t.average;
});
注意:如果您想在附加观察者之前开始计算平均值,则可能需要研究可连接对象(Publish
等),并且您希望避免使用get仅生成这样的属性,而是在构造函数中分配一次。但是请记住,这种方法只会将更新推送到平均值,因此,如果邮件不是闲聊的话,则平均邮件将被延迟。
例如
public class MonitoringService : IMonitoringService
{
private Subject<TimeAggregate> _subject;
// Calculate tuples of (key,decimal)> Averages { get; }
public MonitoringService()
{
_subject = new Subject<TimeAggregate>();
Averages = _subject
// Group by key
.GroupBy(s => s.Group)
.SelectMany(g => g
.Select(g => g.Reading)
// Collect element count and running total
.Scan((elements: 0,average)))
.Publish()
// Connect immediately
.AutoConnect(0);
}
public void PostNotification(TimeAggregate notification)
{
_subject.OnNext(notification);
}
}
完整示例:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleApp1
{
class Program
{
public static async Task Main()
{
var monitoringService = new MonitoringService();
var handler = new TestHandler(monitoringService);
var dict = new Dictionary<string,decimal>();
monitoringService.Averages.Subscribe<(string group,decimal average)>(t =>
{
// Do something with running average. In this case populate a dictionary
dict[t.group] = t.average;
});
await handler.Handle(new TimeAggregate
{
Group = "Test1",Reading = 100
},"Test",CancellationToken.None);
await handler.Handle(new TimeAggregate
{
Group = "Test1",Reading = 200
},CancellationToken.None);
await handler.Handle(new TimeAggregate
{
Group = "Test2",Reading = 300
},CancellationToken.None);
}
}
public class MonitoringService : IMonitoringService
{
private Subject<TimeAggregate> _subject;
// Calculate tuples of (key,average)
public IObservable<(string,decimal)> Averages => _subject
// Group by key
.GroupBy(s => s.Group)
.SelectMany(g => g
.Select(g => g.Reading)
// Collect element count and running total
.Scan((elements: 0,agg.total + v))
// Calculate running average
.Select(t => t.total / t.elements)
// Associate key and average for SelectMany
.Select(average => (g.Key,average)));
public MonitoringService()
{
_subject = new Subject<TimeAggregate>();
}
public void PostNotification(TimeAggregate notification)
{
_subject.OnNext(notification);
}
}
public class TestHandler : ITopicNotificationHandler<TimeAggregate>
{
private readonly IMonitoringService _monitoringService;
public TestHandler(IMonitoringService monitoringService)
{
_monitoringService = monitoringService;
}
public Task Handle(TimeAggregate notification,string topic,CancellationToken cancellationToken)
{
_monitoringService.PostNotification(notification);
return Task.CompletedTask;
}
}
public interface IMonitoringService
{
void PostNotification(TimeAggregate notification);
IObservable<(string group,decimal average)> Averages { get; }
}
public class TimeAggregate
{
public string Group { get; set; }
public decimal Reading { get; set; }
}
public interface ITopicNotificationHandler<T>
{
}
}