问题描述
我正在尝试使用Async
的{{1}}版,但是我很难确定如何使用Reactive Extensions进行会话管理。
这里有一些背景:
- 一个人可以使用
CimSession.CreateAsync
创建一个CIM会话,并返回Microsoft.Management.Infrastructure
-
IObservable
返回一个CimAsyncResult<T>
,它包装了CimAsyncDelegatedObservable<T>
-
Cimsession.CreateAsync
does not disposeCimSession
随时随地,因此,在不再需要CimAsyncDelegatedobservable<T>
时,必须手动处理。{li> - 获得
Cimsession
的句柄之后,我试图使用QueryInstancesAsync
(例如Cimsession
)运行WQL查询
如果session.QueryInstancesAsync(@"root\cimv2","WQL","SELECT Username FROM Win32_ComputerSystem")
返回了Cimsession
而不是Task
,则代码将非常简单:
Observable
但是由于using var session = Cimsession.CreateAsync(".");
var results = session.QueryInstancesAsync(@"root\cimv2","SELECT Username FROM Win32_ComputerSystem");
方法是使用Async
s实现的,因此我不确定如何将其转换为Observable
s的惯用用法。当然,我可以使用Observable
将方法转换为Task
,但是我想学习如何正确使用Observables。
总而言之,我的问题是:
- 从
System.Reactive.Linq.Observable.ForEachAsync
到Cimsession
的绑定寿命 - 如果没有订阅者,如何确保
Observable
被处置?我是否应该强制至少一个订户但具有工厂方法? - 当
Cimsession
的结果到达时,如何处置可观察到的Cimsession
?
解决方法
界面有点古怪,但是尝试一下,让我知道它是否有效:
IObservable<string> query =
from session in CimSession.CreateAsync(".")
from x in Observable.Using(
() => session,s => s.QueryInstancesAsync(@"root\cimv2","WQL","SELECT Username FROM Win32_ComputerSystem"))
from y in Observable.Using(
() => x,z => Observable.Return(z.GetCimSessionComputerName()))
select y;
如果可行,我建议将其包装在单独的类中:
public static class CimSessionEx
{
public static IObservable<T> CreateObservable<T>(string computerName,string namespaceName,string queryDialect,string queryExpression,Func<CimInstance,IObservable<T>> factory) =>
from session in CimSession.CreateAsync(computerName)
from instance in Observable.Using(() => session,s => s.QueryInstancesAsync(namespaceName,queryDialect,queryExpression))
from y in Observable.Using(() => instance,factory)
select y;
}
然后您可以像这样使用它:
IObservable<string> query =
CimSessionEx.CreateObservable(
".",@"root\cimv2","SELECT Username FROM Win32_ComputerSystem",i => Observable.Return(i.GetCimSessionComputerName()));