c# – 来自StackExchange Redis Pub Sub订阅的可观察流

目的:

我正在使用StackExchange Redis Client.我的目标是从客户端公开的Pub Sub Subscriber创建一个Observable流,然后可以支持Observables的1-n订阅,每个订阅都有自己的LINQ过滤器. (发布按计划运行,问题纯粹围绕特定频道上的事件流订阅.)

背景:

我使用Redis Pub Sub作为事件源CQRS应用程序的一部分.具体用例是将事件发布给多个订阅者,然后更新各种读取模型,发送电子邮件等.

这些订阅者中的每一个都需要过滤他们处理的事件类型,为此我希望使用Rx .Net(Reactive Extensions)和LINQ,
在事件流上提供过滤条件,以有效地处理仅对感兴趣的事件做出反应.使用这种方法消除了使用事件总线实现注册处理程序的需要,并允许我通过部署1-n微服务向系统添加新投影,每个微服务都有1-n Observables使用自己的特定过滤器订阅事件流.

我做了什么:

1)我创建了一个继承自ObservableBase的类,重写了SubscribeCore方法,该方法接收来自Observables的订阅请求,将它们存储在ConcurrentDictionary中,并且当每个Redis通知从通道到达时,循环通过已注册的Observable订阅者并调用它们的OnNext方法传递RedisValue.

2)我创建了一个Subject,它也接受来自Observables的订阅,并调用它们的OnNext方法.同样,对象的使用似乎被许多人所厌恶.

问题:

我尝试过的方法功能(至少表面上看),具有不同的性能水平,但感觉像是一个黑客,而且我没有按照预期的方式使用Rx.

我看到许多评论认为应尽可能使用内置的Observable方法,例如Observable.FromEvent,但这似乎与StackExchange Redis客户端订阅API无关,至少对我来说是这样.

我也理解接收流并转发到多个观察者的首选方法是使用ConnectableObservable,它似乎是为我面临的场景设计的(每个微服务将在内部订阅1-n Observables).目前,我无法理解如何将ConnectableObservable连接到StackExchange Redis的通知,或者它是否提供了对Observable的真正好处.

更新:

尽管在我的场景中完成不是问题(Disposal很好),但错误处理很重要;例如隔离在一个订户中检测到的错误,以防止所有订阅终止.

解决方法

这是一个可用于创建IObservable< RedisValue>的扩展方法.来自ISubscriber和RedisChannel:
public static IObservable<RedisValue> WhenMessageReceived(this ISubscriber subscriber,RedisChannel channel)
{
    return Observable.Create<RedisValue>(async (obs,ct) =>
    {
        await subscriber.SubscribeAsync(channel,(_,message) =>
        {
            obs.OnNext(message);
        }).ConfigureAwait(false);

        return Disposable.Create(() => subscriber.Unsubscribe(channel));
    });
}

由于没有完成Redis通道,因此生成的IObservable永远不会完成,但是您可以删除IDisposable订阅以取消订阅Redis通道(这将由许多Rx操作符自动完成).

用法可能是这样的:

var subscriber = connectionMultiplexer.GetSubscriber();

var gotMessage = await subscriber.WhenMessageReceived("my_channel")
    .AnyAsync(msg => msg == "expected_message")
    .ToTask()
    .ConfigureAwait(false);

或者根据你的例子:

var subscriber = connectionMultiplexer.GetSubscriber();

var sendEmailEvents = subscriber.WhenMessageReceived("my_channel")
    .Select(msg => ParseEventFromMessage(msg))
    .Where(evt => evt.Type == EventType.SendEmails);

await sendEmailEvents.ForEachAsync(evt => 
{
    SendEmails(evt);
}).ConfigureAwait(false);

其他微服务可能会有不同的过滤.

相关文章

项目中经常遇到CSV文件的读写需求,其中的难点主要是CSV文件...
简介 本文的初衷是希望帮助那些有其它平台视觉算法开发经验的...
这篇文章主要简单记录一下C#项目的dll文件管理方法,以便后期...
在C#中的使用JSON序列化及反序列化时,推荐使用Json.NET——...
事件总线是对发布-订阅模式的一种实现,是一种集中式事件处理...
通用翻译API的HTTPS 地址为https://fanyi-api.baidu.com/api...