问题描述
我有一个消息流,根据一些标准,我希望每个消费者都能够并行处理其中的一些。每个消费者都应该能够动态订阅和取消订阅。
关于输入的更多信息:
- 我每秒收到大约 500 条消息
- 我有大约 15000 个消费者
- 活动。
public class Message
{
public Message(int id,string data)
{
Id = id;
Data = data;
}
public int Id { get; }
public string Data { get; }
}
public class Consumersdispatcher
{
public event EventHandler<Message> MessageReceived;
public Consumersdispatcher(int id)
{
Id = id;
}
public int Id { get; }
public void OnMessageReceived(Message message)
{
if (MessageReceived == null)
{
return;
}
var delegates = MessageReceived.GetInvocationList();
Parallel.ForEach(delegates,d => d.DynamicInvoke(this,message));
}
}
public class Consumer
{
private readonly ICollection<Consumersdispatcher> _dispatchers;
public Consumer(int id,string name)
{
Id = id;
Name = name;
_dispatchers = new List<Consumersdispatcher>();
}
public int Id { get; }
public string Name { get; }
public void Subscribe(Consumersdispatcher dispatcher)
{
if (_dispatchers.Any(m => m.Id == dispatcher.Id))
{
return;
}
_dispatchers.Add(dispatcher);
dispatcher.MessageReceived += Foo;
}
private void Foo(object sender,Message message)
{
// process message
Console.WriteLine($"{DateTime.Now} | Consumer: {Name} {Id} | Message: {message.Id} {message.Data} |#thread {Thread.CurrentThread.ManagedThreadId}");
Thread.Sleep(1 * 1000);
}
}
// Usage
var consumersdispatcher = new Consumersdispatcher(1);
Consumer consumer1 = new Consumer(1,"A");
consumer1.Subscribe(consumersdispatcher);
Consumer consumer2 = new Consumer(2,"B");
consumer2.Subscribe(consumersdispatcher);
Consumer consumer3 = new Consumer(3,"C");
var consumersdispatcher1 = new Consumersdispatcher(2);
for (int i = 0; i < 20; i++)
{
if (i % 2 == 0)
{
var message = new Message(1,$"data {i}");
consumersdispatcher.OnMessageReceived(message);
continue;
}
var message1 = new Message(2,$"data {i}");
consumersdispatcher1.OnMessageReceived(message1);
}
- “消息分发器”
public class Messagedispatcher
{
private List<Consumer> _consumers;
public Messagedispatcher(List<Consumer> consumers)
{
_consumers = consumers;
}
public void dispatch(Message message)
{
IEnumerable<Consumer> consumers = _consumers.Where(a => a.Messages.Any(x => x.Id == message.Id));
Parallel.ForEach(consumers,c => c.Foo(message));
}
}
- Actor 模型(Akka.NET 或 Microsoft Orleans)
结论
- 如果我处理事件,我会将我的对象(我不喜欢)结合起来,据我所知 DynamicInvoke() 使用延迟绑定,这可能很慢也可能不会(我有做一些性能测试)。
- 第二个解决方案看起来比第一个慢得多。
- Actor 模型看起来正是我需要的。每个消费者都有自己的本地队列,并且并行执行。问题是我没有使用任何 actor 模型,据我所知有很多配置(应该为 Kubernetes 配置)和努力。
问候
解决方法
这看起来是 TPL Dataflow 库的一个很好的用例。它提供了一个基于 actor 的编程模型,但比 Akka.NET 或 Microsoft Orleans 更轻量级。您可以通过为每个消费者提供委托并将它们链接在一起,提供过滤委托来创建几个内置的数据流块。每个块都有自己的队列,您可以对其进行配置。一切都在内存中运行。
Rx.NET 是另一种选择。