问题描述
更新:查看底部的示例
我需要在班级之间发消息。发布者将无限期地循环,调用某种方法来获取数据,然后将该调用的结果传递到OnNext
中。订阅者可以有很多,但是只能有一个IObservable和一项长期运行的任务。这是一个实现。
using Microsoft.VisualStudio.TestTools.UnitTesting;
using System;
using System.Diagnostics;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;
namespace UnitTestProject1
{
[TestClass]
public class UnitTest1
{
private static string GetSomeData() => "Hi";
[TestMethod]
public async Task RunMessagingAsync()
{
var subject = new Subject<string>();
//Create a class and inject the subject as IObserver
new Publisher(subject);
//Create a class and inject the subject as IObservable
new Subscriber(subject,1.ToString());
new Subscriber(subject,2.ToString());
new Subscriber(subject,3.ToString());
//Run the loop for 3 seconds
await Task.Delay(3000);
}
class Publisher
{
public Publisher(IObserver<string> observer)
{
Task.Run(async () =>
{
//Loop forever
while (true)
{
//Get some data,publish it with OnNext and wait 500 milliseconds
observer.OnNext(GetSomeData());
await Task.Delay(500);
}
});
}
}
class Subscriber
{
public string Name;
//Listen for OnNext and write to the debug window when it happens
public Subscriber(IObservable<string> observable,string name)
{
Name = name;
var disposable = observable.Subscribe((s) => Debug.WriteLine($"Name: {Name} Message: {s}"));
}
}
}
}
输出:
名称:1条消息:嗨
名称:2条消息:嗨
名称:3则消息:您好
名称:1条消息:嗨
名称:2条消息:嗨
名称:3则消息:您好
这很好。请注意,只有一个IObserver
发送消息,但是所有订阅都接收到该消息。 但是,如何分隔IObservable
和IObserver
??它们像Subject
一样粘在一起。这是另一种方法。
[TestMethod]
public async Task RunMessagingAsync2()
{
var observers = new List<IObserver<string>>();
var observable = Observable.Create(
(IObserver<string> observer) =>
{
observers.Add(observer);
Task.Run(async () =>
{
while (true)
{
try
{
observer.OnNext(GetSomeData());
}
catch (Exception ex)
{
observer.OnError(ex);
}
await Task.Delay(500);
}
});
return disposable.Create(() => { });
});
//Create a class and inject the subject as IObservable
new Subscriber(observable);
new Subscriber(observable);
//Run the loop for 10 seconds
await Task.Delay(10000);
Assert.IsTrue(ReferenceEquals(observers[0],observers[1]));
}
这里的问题是,这将创建两个单独的Task
和两个单独的IObserver
。每个订阅都会创建一个新的IObserver。您可以确认,因为此处的Assert
失败。这对我来说真的没有任何意义。根据我对反应式编程的了解,我不希望这里的Subscribe
方法每次都会创建一个新的IObserver
。签出this gist。这是对Observable.Create example的略微修改。它显示了Subscribe方法如何导致每次调用时创建IObserver。 如何在不使用Subject
的情况下实现第一个示例的功能?
这是另一种根本不使用Reactive UI的方法...您可以根据需要从发布者创建Subject
,但这不是必需的。
using Microsoft.VisualStudio.TestTools.UnitTesting;
using System;
using System.Diagnostics;
using System.Threading.Tasks;
namespace UnitTestProject1
{
[TestClass]
public class UnitTest1
{
private static string GetSomeData() => "Hi";
class Publisher
{
public Publisher(Action<string> onNext)
{
Task.Run(async () =>
{
//Loop forever
while (true)
{
//Get some data,publish it with OnNext and wait 500 milliseconds
onNext(GetSomeData());
await Task.Delay(500);
}
});
}
}
class Subscriber
{
//Listen for OnNext and write to the debug window when it happens
public void ReceiveMessage(string message) => Debug.WriteLine(message);
}
[TestMethod]
public async Task RunMessagingAsync()
{
//Create a class and inject the subject as IObservable
var subscriber = new Subscriber();
//Create a class and inject the subject as IObserver
new Publisher(subscriber.ReceiveMessage);
//Run the loop for 10 seconds
await Task.Delay(10000);
}
}
}
最后,我应该补充说ReactiveUI曾经有一个MessageBus class。我不确定是否将其删除,但不再建议使用。他们建议我们改用什么?
工作示例
此版本正确。我想我现在唯一要问的是如何用Observable.Create
来做到这一点? Observable.Create
的问题在于它为每个订阅运行操作。那不是预期的功能。无论有多少个订阅,这里运行时间很长的任务只会运行一次。
using Microsoft.VisualStudio.TestTools.UnitTesting;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Reactive.disposables;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace UnitTestProject1
{
class Subscriber
{
public string Name;
//Listen for OnNext and write to the debug window when it happens
public Subscriber(IObservable<string> observable,string name)
{
Name = name;
var disposable = observable.Subscribe((s) => Debug.WriteLine($"Name: {Name} Message: {s}"));
}
}
internal class BasicObservable<T> : IObservable<T>
{
List<IObserver<T>> _observers = new List<IObserver<T>>();
public BasicObservable(
Func<T> getData,TimeSpan? interval = null,CancellationToken cancellationToken = default
) =>
Task.Run(async () =>
{
while (!cancellationToken.IsCancellationRequested)
{
try
{
await Task.Delay(interval ?? new TimeSpan(0,1));
var data = getData();
_observers.ForEach(o => o.OnNext(data));
}
catch (Exception ex)
{
_observers.ForEach(o => o.OnError(ex));
}
}
_observers.ForEach(o => o.OnCompleted());
},cancellationToken);
public Idisposable Subscribe(IObserver<T> observer)
{
_observers.Add(observer);
return disposable.Create(observer,(o) => _observers.Remove(o));
}
}
public static class ObservableExtensions
{
public static IObservable<T> CreateObservable<T>(
this Func<T> getData,CancellationToken cancellationToken = default)
=> new BasicObservable<T>(getData,default,cancellationToken);
public static IObservable<T> CreateObservable<T>(
this Func<T> getData,interval,cancellationToken);
}
[TestClass]
public class UnitTest1
{
string GetData() => "Hi";
[TestMethod]
public async Task Messaging()
{
var cancellationSource = new CancellationTokenSource();
var cancellationToken = cancellationSource.Token;
Func<string> getData = GetData;
var publisher = getData.CreateObservable(cancellationToken);
new Subscriber(publisher,"One");
new Subscriber(publisher,"Two");
for (var i = 0; true; i++)
{
if (i >= 5)
{
cancellationSource.Cancel();
}
await Task.Delay(1000);
}
}
}
}
解决方法
首先,您必须熟悉"cold" and "hot" observables的理论。这是Introduction to RX的定义。
- 冷是被动序列,可根据请求(订阅时)开始生成通知。
- 热门是活动的序列,无论订阅如何,都会产生通知。
您想要的是一个热观测值,而问题是Observable.Create
方法创建了冷观测值。但是,您可以使用Publish
运算符使任何可观察到的热点。该运算符提供了一种方法,可以让多个独立的观察者共享单个基础订阅。示例:
int index = 0;
var coldObservable = Observable.Create<int>(observer =>
{
_ = Task.Run(async () =>
{
while (true)
{
observer.OnNext(++index);
await Task.Delay(1000);
}
});
return Disposable.Empty;
});
IConnectableObservable<int> hotObservable = coldObservable.Publish();
hotObservable.Connect(); // Causes the start of the loop
hotObservable.Subscribe(s => Console.WriteLine($"Observer A received #{s}"));
hotObservable.Subscribe(s => Console.WriteLine($"Observer B received #{s}"));
在调用coldObservable
方法时,将订阅由Observable.Create
创建的hotObservable.Connect
,然后,由该单个订阅生成的所有通知都将传播到{{1}的所有订阅者}。
输出:
hotObservable
重要提示:以上示例的目的是演示Observer A received #1
Observer B received #1
Observer A received #2
Observer B received #2
Observer A received #3
Observer B received #3
Observer A received #4
Observer B received #4
Observer A received #5
Observer B received #5
Observer A received #6
Observer B received #6
...
运算符,而不是作为高质量RX代码的示例。它的问题之一是,从理论上讲,在连接到源之后订阅观察者成为可能,因此第一通知不会发送给部分或全部观察者,因为它可能是在其订阅之前创建的。换句话说,这是比赛条件。
还有另一种管理Publish
(运营商RefCount
)的生存期的方法:
返回一个可观察序列,只要该观察序列至少有一个订阅,该序列就一直与源保持连接。
IConnectableObservable
通过这种方式,您无需手动var hotObservable = coldObservable.Publish().RefCount();
。该连接在第一次订阅时自动发生,而在最后一次取消订阅时自动解除。
我将其添加为答案是因为我觉得Christian在他的答案中发布的代码很危险,因为它混合了Tasks和Rx并存在竞争条件。
这是解决大多数这些问题的替代方法:
public class UnitTest1
{
private string GetData() => "Hi";
private IDisposable Subscriber(IObservable<string> observable,string name) =>
observable.Subscribe(s => Debug.WriteLine($"Name: {name} Message: {s}"));
public async Task Messaging()
{
var coldObservable =
Observable
.Timer(TimeSpan.Zero,TimeSpan.FromSeconds(1.0))
.Select(_ => GetData());
var publisher = coldObservable.Publish();
var subscriptions =
new CompositeDisposable(
Subscriber(publisher,"One"),Subscriber(publisher,"Two"),publisher.Connect());
await Task.Delay(TimeSpan.FromSeconds(5.0));
subscriptions.Dispose();
}
}
不过,更好的是,我将考虑以这种方式进行操作:
public class UnitTest1
{
private string GetData() => "Hi";
private IObservable<string> Subscriber(IObservable<string> observable,string name) =>
observable.Select(s => $"Name: {name} Message: {s}");
public async Task Messaging()
{
var coldObservable =
Observable
.Timer(TimeSpan.Zero,TimeSpan.FromSeconds(1.0))
.Select(_ => GetData())
.Do(_ => Debug.WriteLine("Called GetData()"))
.Publish(published =>
Observable
.Merge(
Subscriber(published,Subscriber(published,"Two")))
.TakeUntil(Observable.Timer(TimeSpan.FromSeconds(5.0)))
.Do(x => Debug.WriteLine(x));
await coldObservable;
}
}
始终最好将内置运算符用于Rx,而不是将混合方法用于任务。
,由于上面的回答,我最终无需实现IObservable
就获得了期望的结果。西奥多是正确的。答案是使用IObservable
方法将Publish()
转换为hot。
我写了一篇有关here
的文章虽然这可行,但上面谜题的答案要好得多。
using Microsoft.VisualStudio.TestTools.UnitTesting;
using System;
using System.Diagnostics;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace Observables
{
class Subscriber
{
public string Name;
//Listen for OnNext and write to the debug window when it happens
public Subscriber(IObservable<string> observable,string name)
{
Name = name;
observable.Subscribe(s => Debug.WriteLine($"Name: {Name} Message: {s}"));
}
}
[TestClass]
public class UnitTest1
{
static string GetData() => "Hi";
[TestMethod]
public async Task Messaging()
{
var cancellationSource = new CancellationTokenSource();
var cancellationToken = cancellationSource.Token;
var coldObservable = Observable.Create<string>(observer =>
{
_ = Task.Run(async () =>
{
while (!cancellationToken.IsCancellationRequested)
{
var data = GetData();
observer.OnNext(data);
await Task.Delay(1000);
}
},cancellationToken);
return Disposable.Empty;
});
var publisher = coldObservable.Publish();
var connection = publisher.Connect();
new Subscriber(publisher,"One");
new Subscriber(publisher,"Two");
for (var i = 0; i < 5; i++)
{
if (i == 4)
{
cancellationSource.Cancel();
}
await Task.Delay(1000);
}
connection.Dispose();
}
}
}