使用 redis pubsup 订阅 Graphql.net

问题描述

我在 .net core 2.1 后端使用 graphql.net 订阅。基本上,订阅与消息服务通信。为了确保消息传递给调用者,将我的消息服务创建为单例。许多 Hangfire 作业需要使用此消息服务。每个作业都从 DI 获取消息服务。我需要的是将 Redis 用于 PubsUp。这样,我就不需要单例消息服务了。

订阅

public void Activate(ObjectGraphType objectGraph,IHostingEnvironment env,IServiceProvider sp)
        {
            objectGraph.AddField(new EventStreamFieldType
            {
                Name = "messageSubscription",Type = typeof(MyMessageGType),Resolver = new FuncFieldResolver<MyMessage>(context => context.source as MyMessage),Arguments = new QueryArguments(
                    new QueryArgument<NonNullGraphType<StringGraphType>> { Name = "operationId" },new QueryArgument<NonNullGraphType<CommandWatchModeGType>> { Name = "watchMode" }
                ),Subscriber = new EventStreamResolver<MyMessage>(context =>
                {
                    var subscriptionService = (IMyMessageService)sp.GetService(typeof(IMyMessageService));
                    var operationId = context.GetArgument<string>("operationId");
                    var watchMode = context.GetArgument<CommandWatchMode>("watchMode");

                    return subscriptionService.GetMessages(operationId,watchMode);
                })
            });
        }

消息服务,

public class MyMessageService: IMyMessageService
    {    
        private readonly ConcurrentDictionary<string,IObservable<MyMessage>> Subscriptions = 
            new ConcurrentDictionary<string,IObservable<MyMessage>>();

        private readonly ISubject<MyMessage> _messageStream = new ReplaySubject<MyMessage>(0);

        public MyMessage AddBscmessage(MyMessage message)
        {
            _messageStream.OnNext(message);
            return message;
        }

        public IObservable<MyMessage> GetMessages(string operationId,CommandWatchMode watchMode)
        {
            var key = $"{operationId}_{watchMode}";

            if (!Subscriptions.ContainsKey(key))
            {
                var mess = _messageStream
                    .Where(message =>
                        message.OperationId == operationId 
                    ).Select(s => s)
                    .AsObservable();

                Subscriptions.TryAdd(key,mess);
                return mess;
            }
            else
            {
                return Subscriptions[key];
            }
        }
    }

DI 注册

services.AddSingleton<IBscAfadExecuteOtherService,BscAfadExecuteOtherService>();

这种场景下如何使用Redis

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)