在 .NET 中并行消费消息的有效方法

问题描述

我有一个消息流,根据一些标准,我希望每个消费者都能够并行处理其中的一些。每个消费者都应该能够动态订阅和取消订阅

enter image description here

关于输入的更多信息:

  • 我每秒收到大约 500 条消息
  • 我有大约 15000 个消费者

到目前为止,我有几个解决方案:

  1. 活动。
public class Message
{
    public Message(int id,string data)
    {
        Id = id;
        Data = data;
    }

    public int Id { get; }

    public string Data { get; }
}

public class Consumersdispatcher
{
    public event EventHandler<Message> MessageReceived;

    public Consumersdispatcher(int id)
    {
        Id = id;
    }

    public int Id { get; }

    public void OnMessageReceived(Message message)
    {
        if (MessageReceived == null)
        {
            return;
        }

        var delegates = MessageReceived.GetInvocationList();

        Parallel.ForEach(delegates,d => d.DynamicInvoke(this,message));
    }
}

public class Consumer
{
    private readonly ICollection<Consumersdispatcher> _dispatchers;

    public Consumer(int id,string name)
    {
        Id = id;
        Name = name;
        _dispatchers = new List<Consumersdispatcher>();
    }

    public int Id { get; }

    public string Name { get; }

    public void Subscribe(Consumersdispatcher dispatcher)
    {
        if (_dispatchers.Any(m => m.Id == dispatcher.Id))
        {
            return;
        }

        _dispatchers.Add(dispatcher);
        dispatcher.MessageReceived += Foo;
    }

    private void Foo(object sender,Message message)
    {
        // process message
        Console.WriteLine($"{DateTime.Now} | Consumer: {Name} {Id} | Message: {message.Id} {message.Data} |#thread {Thread.CurrentThread.ManagedThreadId}");

        Thread.Sleep(1 * 1000);
    }
}

// Usage
 var consumersdispatcher = new Consumersdispatcher(1);
 Consumer consumer1 = new Consumer(1,"A");
 consumer1.Subscribe(consumersdispatcher);
 Consumer consumer2 = new Consumer(2,"B");
 consumer2.Subscribe(consumersdispatcher);
 Consumer consumer3 = new Consumer(3,"C");

 var consumersdispatcher1 = new Consumersdispatcher(2);

            for (int i = 0; i < 20; i++)
            {
                if (i % 2 == 0)
                {
                    var message = new Message(1,$"data {i}");
                    consumersdispatcher.OnMessageReceived(message);
                    continue;
                }

                var message1 = new Message(2,$"data {i}");
                consumersdispatcher1.OnMessageReceived(message1);
            }

  1. “消息分发器”
public class Messagedispatcher

{
    private List<Consumer> _consumers;

    public Messagedispatcher(List<Consumer> consumers)
    {
        _consumers = consumers;
    }

    public void dispatch(Message message)
    {
        IEnumerable<Consumer> consumers = _consumers.Where(a => a.Messages.Any(x => x.Id == message.Id));

        Parallel.ForEach(consumers,c => c.Foo(message));
    }
}

  1. Actor 模型(Akka.NET 或 Microsoft Orleans)

结论

  • 如果我处理事件,我会将我的对象(我不喜欢)结合起来,据我所知 DynamicInvoke() 使用延迟绑定,这可能很慢也可能不会(我有做一些性能测试)。
  • 第二个解决方案看起来比第一个慢得多。
  • Actor 模型看起来正是我需要的。每个消费者都有自己的本地队列,并且并行执行。问题是我没有使用任何 actor 模型,据我所知有很多配置(应该为 Kubernetes 配置)和努力。

有人可以建议我一个更优雅的解决方案吗?

问候

解决方法

这看起来是 TPL Dataflow 库的一个很好的用例。它提供了一个基于 actor 的编程模型,但比 Akka.NET 或 Microsoft Orleans 更轻量级。您可以通过为每个消费者提供委托并将它们链接在一起,提供过滤委托来创建几个内置的数据流块。每个块都有自己的队列,您可以对其进行配置。一切都在内存中运行。

Rx.NET 是另一种选择。

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...