问题描述
我在 .net core 2.1 后端使用 graphql.net 订阅。基本上,订阅与消息服务通信。为了确保消息传递给调用者,将我的消息服务创建为单例。许多 Hangfire 作业需要使用此消息服务。每个作业都从 DI 获取消息服务。我需要的是将 Redis 用于 PubsUp。这样,我就不需要单例消息服务了。
订阅;
public void Activate(ObjectGraphType objectGraph,IHostingEnvironment env,IServiceProvider sp)
{
objectGraph.AddField(new EventStreamFieldType
{
Name = "messageSubscription",Type = typeof(MyMessageGType),Resolver = new FuncFieldResolver<MyMessage>(context => context.source as MyMessage),Arguments = new QueryArguments(
new QueryArgument<NonNullGraphType<StringGraphType>> { Name = "operationId" },new QueryArgument<NonNullGraphType<CommandWatchModeGType>> { Name = "watchMode" }
),Subscriber = new EventStreamResolver<MyMessage>(context =>
{
var subscriptionService = (IMyMessageService)sp.GetService(typeof(IMyMessageService));
var operationId = context.GetArgument<string>("operationId");
var watchMode = context.GetArgument<CommandWatchMode>("watchMode");
return subscriptionService.GetMessages(operationId,watchMode);
})
});
}
消息服务,
public class MyMessageService: IMyMessageService
{
private readonly ConcurrentDictionary<string,IObservable<MyMessage>> Subscriptions =
new ConcurrentDictionary<string,IObservable<MyMessage>>();
private readonly ISubject<MyMessage> _messageStream = new ReplaySubject<MyMessage>(0);
public MyMessage AddBscmessage(MyMessage message)
{
_messageStream.OnNext(message);
return message;
}
public IObservable<MyMessage> GetMessages(string operationId,CommandWatchMode watchMode)
{
var key = $"{operationId}_{watchMode}";
if (!Subscriptions.ContainsKey(key))
{
var mess = _messageStream
.Where(message =>
message.OperationId == operationId
).Select(s => s)
.AsObservable();
Subscriptions.TryAdd(key,mess);
return mess;
}
else
{
return Subscriptions[key];
}
}
}
DI 注册;
services.AddSingleton<IBscAfadExecuteOtherService,BscAfadExecuteOtherService>();
这种场景下如何使用Redis?
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)