反应性扩展和IDisposable管理

问题描述

我正在尝试使用Async的{​​{1}}版,但是我很难确定如何使用Reactive Extensions进行会话管理。

这里有一些背景:

如果session.QueryInstancesAsync(@"root\cimv2","WQL","SELECT Username FROM Win32_ComputerSystem")返回了Cimsession而不是Task,则代码将非常简单:

Observable

但是由于using var session = Cimsession.CreateAsync("."); var results = session.QueryInstancesAsync(@"root\cimv2","SELECT Username FROM Win32_ComputerSystem"); 方法是使用Async s实现的,因此我不确定如何将其转换为Observable s的惯用用法。当然,我可以使用Observable方法转换为Task,但是我想学习如何正确使用Observables。

总而言之,我的问题是:

  • System.Reactive.Linq.Observable.ForEachAsyncCimsession的绑定寿命
  • 如果没有订阅者,如何确保Observable被处置?我是否应该强制至少一个订户但具有工厂方法
  • Cimsession的结果到达时,如何处置可观察到的Cimsession

解决方法

界面有点古怪,但是尝试一下,让我知道它是否有效:

IObservable<string> query =
    from session in CimSession.CreateAsync(".")
    from x in Observable.Using(
        () => session,s => s.QueryInstancesAsync(@"root\cimv2","WQL","SELECT Username FROM Win32_ComputerSystem"))
    from y in Observable.Using(
        () => x,z => Observable.Return(z.GetCimSessionComputerName()))
    select y;

如果可行,我建议将其包装在单独的类中:

public static class CimSessionEx
{
    public static IObservable<T> CreateObservable<T>(string computerName,string namespaceName,string queryDialect,string queryExpression,Func<CimInstance,IObservable<T>> factory) =>
        from session in CimSession.CreateAsync(computerName)
        from instance in Observable.Using(() => session,s => s.QueryInstancesAsync(namespaceName,queryDialect,queryExpression))
        from y in Observable.Using(() => instance,factory)
        select y;
}

然后您可以像这样使用它:

IObservable<string> query =
    CimSessionEx.CreateObservable(
        ".",@"root\cimv2","SELECT Username FROM Win32_ComputerSystem",i => Observable.Return(i.GetCimSessionComputerName()));

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...