问题描述
我正在研究用IAsyncEnumerable替换某些常规C#事件模式实例的影响。这可以通过IAsyncEnumerable的延迟实例化/激活并缓存该引用以供所有调用者/侦听器使用的方式来完成。一些快速测试(请参见下文)显示了该方法的有效性,但是我还没有看到其他以此方式在线使用IAsyncEnumerable的示例。
我意识到这并不是为IAsyncEnumerable创建的,在这种情况下,大多数人会主张使用ReactiveX(https://github.com/dotnet/reactive)。但是,我很乐意分析为什么有人愿意或不想这样做(而不是仅仅通过Rx来做到这一点)。我在下面提供了一些示例。我的候选事件模式替换是一个更多的事件流(例如,从串行连接或UDP套接字产生的反序列化消息等)
示例1:
class Program
{
public static async Task Main( string[] args )
{
// Cache an async enumerable (imagine stream of events e.g. deserialized messages arriving)
var asyncEnumerable = GetNumbersAsync( 10 );
// Now multiple consumers want to respond to incoming events/messages (in this case just numbers)
await Task.WhenAll( Task.Run( () => ProcessNumbersAsync( 1,asyncEnumerable ) ),Task.Run( () => ProcessNumbersAsync( 2,asyncEnumerable ) ) );
Console.WriteLine( "DONE!");
}
private static async Task ProcessNumbersAsync( int id,IAsyncEnumerable<int> numbers )
{
await foreach ( var n in numbers )
Console.WriteLine( $"{id}: Processing {n}" );
}
private static async IAsyncEnumerable<int> GetNumbersAsync( int maxnumber )
{
// This would really be async read operations from a remote source
for ( var i = 0; i < maxnumber; i++ )
{
await Task.Delay( 100 );
yield return i;
}
}
}
1: Processing 0
2: Processing 0
2: Processing 1
1: Processing 1
2: Processing 2
1: Processing 2
2: Processing 3
1: Processing 3
2: Processing 4
1: Processing 4
2: Processing 5
1: Processing 5
1: Processing 6
2: Processing 6
1: Processing 7
2: Processing 7
2: Processing 8
1: Processing 8
2: Processing 9
1: Processing 9
DONE!
前面的示例将每个使用者置于不同的线程上,但基于上下文(也许是WPF应用程序),同一线程上可能有多个使用者(对于IEnumerable无法实现,但通过IAsyncEnumerable打开门)。以下是控制台应用程序中的内容,但可以想象在WPF应用程序的UI线程上创建的生产者和使用者。
示例2:
class Program
{
public static async Task Main( string[] args )
{
var producer = new Producer();
var consumer1 = new Consumer( 1,producer );
var consumer2 = new Consumer( 2,producer );
var consumer3 = new Consumer( 3,producer );
await Task.WhenAll( consumer1.ConsumeMessagesAsync(),consumer2.ConsumeMessagesAsync(),Task.Run( async () => await consumer3.ConsumeMessagesAsync() ) );
Console.WriteLine( "DONE!");
}
// Singleton producer
private interface IProducer
{
IAsyncEnumerable<int> GetMessagesAsync();
}
// Transient consumer
private interface IConsumer
{
Task ConsumeMessagesAsync();
}
private class Producer : IProducer
{
private const int _maxFakeMessages = 10;
private readonly object _mutex = new Object();
private IAsyncEnumerable<int> _actualIncomingMessagesEnumerable;
public IAsyncEnumerable<int> GetMessagesAsync()
{
// Todo: use AsyncEx AsyncLock
lock ( _mutex )
{
if ( _actualIncomingMessagesEnumerable == null)
_actualIncomingMessagesEnumerable = ReadIncomingMessagesAsync();
}
return _actualIncomingMessagesEnumerable;
}
private static async IAsyncEnumerable<int> ReadIncomingMessagesAsync()
{
for ( var i = 0; i < _maxFakeMessages; i++ )
{
await Task.Delay( 100 );
yield return i;
}
}
}
private class Consumer
{
private readonly int _id;
private readonly IProducer _producer;
public Consumer( int id,IProducer producer )
{
_id = id;
_producer = producer;
}
public async Task ConsumeMessagesAsync()
{
await foreach( var n in _producer.GetMessagesAsync() )
Console.WriteLine( $"{_id}: Processing {n}" );
}
}
}
1: Processing 0
2: Processing 0
3: Processing 0
2: Processing 1
3: Processing 1
1: Processing 1
2: Processing 2
1: Processing 2
3: Processing 2
1: Processing 3
2: Processing 3
3: Processing 3
1: Processing 4
2: Processing 4
3: Processing 4
3: Processing 5
1: Processing 5
2: Processing 5
1: Processing 6
3: Processing 6
2: Processing 6
2: Processing 7
1: Processing 7
3: Processing 7
3: Processing 8
2: Processing 8
1: Processing 8
1: Processing 9
3: Processing 9
2: Processing 9
DONE!
这种模式固有的一个好处是,消费者/调用者可以将其回调/类型T处理代码出现在自己的SynchronizationContext中。通常,来自SerialPort或Timer或其他源的事件可能发生在后台线程上,并且用户(尤其是在UI线程上)可能需要执行自己的同步。在这种情况下,UI线程上的使用者始终可以在UI线程上执行其代码,而控制台应用程序中的用户将在线程池上执行代码。
我想念什么吗?
解决方法
我们对第一个示例的GetNumbersAsync
方法的“事件源”的实现进行一些更改:
private static int _current = 0;
private static async IAsyncEnumerable<int> GetNumbersAsync(int maxNumber)
{
// This would really be async read operations from a remote source
for (var i = 0; i < maxNumber; i++)
{
await Task.Delay(100);
yield return Interlocked.Increment(ref _current);
}
}
以下是此更改后的输出:
1: Processing 1
2: Processing 2
2: Processing 4
1: Processing 3
2: Processing 5
1: Processing 6
1: Processing 8
2: Processing 7
2: Processing 9
1: Processing 10
1: Processing 12
2: Processing 11
1: Processing 14
2: Processing 13
1: Processing 15
2: Processing 16
1: Processing 17
2: Processing 18
1: Processing 19
2: Processing 20
每个消费者都收到不同的“事件”!
尽管示例中的IAsyncEnumerable
是单个缓存的实例,但是每次您尝试使用await foreach
语句枚举时,都会创建一个新的IAsyncEnumerator
,其寿命与此相关。具体的枚举。 IAsyncEnumerator
既不是线程安全的也不是可重用的,并且如果您尝试缓存一个并在使用者之间共享它,并且每个使用者在不同步的情况下调用其MoveNextAsync
方法,您将得到未定义的行为。 / p>
如果您希望可以随时安全地订阅/取消订阅IAsyncEnumerable
的源,并将所有消息传播到可能以不同的速度消耗它们的订阅者,那么这与缓存{{ 1}}由C#迭代器创建(一种包含IAsyncEnumerable
语句的方法)。您可以找到yield
here的实现。
您正在寻找类似频道的内容。