可以为多个使用者缓存IAsyncEnumerable吗?

问题描述

我正在研究用IAsyncEnumerable替换某些常规C#事件模式实例的影响。这可以通过IAsyncEnumerable的延迟实例化/激活并缓存该引用以供所有调用者/侦听器使用的方式来完成。一些快速测试(请参见下文)显示了该方法的有效性,但是我还没有看到其他以此方式在线使用IAsyncEnumerable的示例。

我意识到这并不是为IAsyncEnumerable创建的,在这种情况下,大多数人会主张使用ReactiveX(https://github.com/dotnet/reactive)。但是,我很乐意分析为什么有人愿意或不想这样做(而不是仅仅通过Rx来做到这一点)。我在下面提供了一些示例。我的候选事件模式替换是一个更多的事件流(例如,从串行连接或UDP套接字产生的反序列化消息等)

示例1:

  class Program
  {
    public static async Task Main( string[] args )
    {
      // Cache an async enumerable (imagine stream of events e.g. deserialized messages arriving)
      var asyncEnumerable = GetNumbersAsync( 10 );

      // Now multiple consumers want to respond to incoming events/messages (in this case just numbers)
      await Task.WhenAll( Task.Run( () => ProcessNumbersAsync( 1,asyncEnumerable ) ),Task.Run( () => ProcessNumbersAsync( 2,asyncEnumerable ) ) );

      Console.WriteLine( "DONE!");
    }
    
    private static async Task ProcessNumbersAsync( int id,IAsyncEnumerable<int> numbers )
    {
      await foreach ( var n in numbers )
        Console.WriteLine( $"{id}: Processing {n}" );
    }

    private static async IAsyncEnumerable<int> GetNumbersAsync( int maxnumber )
    {
      // This would really be async read operations from a remote source
      for ( var i = 0; i < maxnumber; i++ )
      {
        await Task.Delay( 100 );
        yield return i;
      }
    }
  }

这将产生我希望作为该模式的用户输出

1: Processing 0
2: Processing 0
2: Processing 1
1: Processing 1
2: Processing 2
1: Processing 2
2: Processing 3
1: Processing 3
2: Processing 4
1: Processing 4
2: Processing 5
1: Processing 5
1: Processing 6
2: Processing 6
1: Processing 7
2: Processing 7
2: Processing 8
1: Processing 8
2: Processing 9
1: Processing 9
DONE!

前面的示例将每个使用者置于不同的线程上,但基于上下文(也许是WPF应用程序),同一线程上可能有多个使用者(对于IEnumerable无法实现,但通过IAsyncEnumerable打开门)。以下是控制台应用程序中的内容,但可以想象在WPF应用程序的UI线程上创建的生产者和使用者。

示例2:

  class Program
  {
    public static async Task Main( string[] args )
    {
      var producer = new Producer();
      var consumer1 = new Consumer( 1,producer );
      var consumer2 = new Consumer( 2,producer );
      var consumer3 = new Consumer( 3,producer );

      await Task.WhenAll( consumer1.ConsumeMessagesAsync(),consumer2.ConsumeMessagesAsync(),Task.Run( async () => await consumer3.ConsumeMessagesAsync() ) );

      Console.WriteLine( "DONE!");
    }

    // Singleton producer
    private interface IProducer
    {
      IAsyncEnumerable<int> GetMessagesAsync();
    }

    // Transient consumer
    private interface IConsumer
    {
      Task ConsumeMessagesAsync();
    }

    private class Producer : IProducer
    {
      private const int _maxFakeMessages = 10;
      private readonly object _mutex = new Object();

      private IAsyncEnumerable<int> _actualIncomingMessagesEnumerable;
      
      public IAsyncEnumerable<int> GetMessagesAsync()
      {
        // Todo: use AsyncEx AsyncLock
        lock ( _mutex )
        {
          if ( _actualIncomingMessagesEnumerable == null)
            _actualIncomingMessagesEnumerable = ReadIncomingMessagesAsync();
        }

        return _actualIncomingMessagesEnumerable;
      }

      private static async IAsyncEnumerable<int> ReadIncomingMessagesAsync()
      {
        for ( var i = 0; i < _maxFakeMessages; i++ )
        {
          await Task.Delay( 100 );
          yield return i;
        }
      }
    }

    private class Consumer
    {
      private readonly int _id;
      private readonly IProducer _producer;
      public Consumer( int id,IProducer producer )
      {
        _id = id;
        _producer = producer;
      }

      public async Task ConsumeMessagesAsync()
      {
        await foreach( var n in _producer.GetMessagesAsync() )
          Console.WriteLine( $"{_id}: Processing {n}" );
      }
    }
  }

同样,作为用户我想要的输出

1: Processing 0
2: Processing 0
3: Processing 0
2: Processing 1
3: Processing 1
1: Processing 1
2: Processing 2
1: Processing 2
3: Processing 2
1: Processing 3
2: Processing 3
3: Processing 3
1: Processing 4
2: Processing 4
3: Processing 4
3: Processing 5
1: Processing 5
2: Processing 5
1: Processing 6
3: Processing 6
2: Processing 6
2: Processing 7
1: Processing 7
3: Processing 7
3: Processing 8
2: Processing 8
1: Processing 8
1: Processing 9
3: Processing 9
2: Processing 9
DONE!

这种模式固有的一个好处是,消费者/调用者可以将其回调/类型T处理代码出现在自己的SynchronizationContext中。通常,来自SerialPort或Timer或其他源的事件可能发生在后台线程上,并且用户(尤其是在UI线程上)可能需要执行自己的同步。在这种情况下,UI线程上的使用者始终可以在UI线程上执行其代码,而控制台应用程序中的用户将在线程池上执行代码

我想念什么吗?

解决方法

我们对第一个示例的GetNumbersAsync方法的“事件源”的实现进行一些更改:

private static int _current = 0;
private static async IAsyncEnumerable<int> GetNumbersAsync(int maxNumber)
{
    // This would really be async read operations from a remote source
    for (var i = 0; i < maxNumber; i++)
    {
        await Task.Delay(100);
        yield return Interlocked.Increment(ref _current);
    }
}

以下是此更改后的输出:

1: Processing 1
2: Processing 2
2: Processing 4
1: Processing 3
2: Processing 5
1: Processing 6
1: Processing 8
2: Processing 7
2: Processing 9
1: Processing 10
1: Processing 12
2: Processing 11
1: Processing 14
2: Processing 13
1: Processing 15
2: Processing 16
1: Processing 17
2: Processing 18
1: Processing 19
2: Processing 20

每个消费者都收到不同的“事件”!

尽管示例中的IAsyncEnumerable是单个缓存的实例,但是每次您尝试使用await foreach语句枚举时,都会创建一个新的IAsyncEnumerator,其寿命与此相关。具体的枚举。 IAsyncEnumerator既不是线程安全的也不是可重用的,并且如果您尝试缓存一个并在使用者之间共享它,并且每个使用者在不同步的情况下调用其MoveNextAsync方法,您将得到未定义的行为。 / p>

如果您希望可以随时安全地订阅/取消订阅IAsyncEnumerable的源,并将所有消息传播到可能以不同的速度消耗它们的订阅者,那么这与缓存{{ 1}}由C#迭代器创建(一种包含IAsyncEnumerable语句的方法)。您可以找到yield here的实现。

,

您正在寻找类似频道的内容。

An Introduction to System.Threading.Channels

Working with Channels in .NET

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...