在运行时为Rebus生成消息处理程序

问题描述

我要使用Rebus订阅和处理Rabbitmq中的消息时遇到了一个问题。第三方程序集中定义了多种消息类型,新的消息类型将定期添加到该程序集中。

我需要以某种方式使Rebus订阅并处理所有这些消息类型,并将它们转发(发布)到另一个Rabbitmq实例。我的服务本质上是转发消息,并在这样做时添加自定义的重载标头。

问题是我不想为每种消息类型生成处理程序类(因为功能是相同的,而不管消息类型如何)。我也不想每次在第三方程序集中添加新消息类型时更新代码(编写新的处理程序类)。

我尝试使用TypeBuilder为通过反射找到的每种类型动态创建消息处理程序类,但是感觉有点混乱,所以我希望有另一种方法吗?

下面的代码概述了即使代码未编译,我希望达到的目标。

public void SubscribeAndHandleMessages()
        {
            // These types will be determined runtime by using reflection but thats omitted for clarity
            var messageTypes = new List<Type>(){typeof(MessageA),typeof(MessageB)}; 

            var activator = new BuiltinHandlerActivator();

            Configure.With(activator)
                .Transport(t => t.UseRabbitMq(_rabbitConnectionString,"MyQueue"))
                .Start();

            //Subscribe and register handlers
            foreach (var type in messageTypes)
            {
                activator.Bus.Subscribe(type); //This works,I can see the queue subscribing to the correct topics
                activator.Handle<type>(async (bus,context,message) => //This doesnt work since type is not kNown at compile time
                {
                    //Forwarding to another rabbit instance,same handling for all types of messages
                });
            }
        }

解决方法

建立必要的订阅后,您只需要能够处理收到的所有消息。

使用Rebus的最佳方法是避免普通的消息处理管道(反序列化=>查找处理程序=>调度),而是以原始格式(即,以“传输消息”格式)处理消息。

您可以使用Rebus的传输消息转发功能来做到这一点。有了它,一个100%的通用消息处理程序可能看起来像这样:

Configure.With(activator)
    .Transport(t => t.UseInMemoryTransport(new InMemNetwork(),"router-tjek"))
    .Routing(r => r.AddTransportMessageForwarder(async transportMessage =>
    {
        var headers = transportMessage.Headers; //< Dictionary<string,string>
        var body = transportMessage.Body;       //< byte[]

        // handle the message here,e.g.
        // by deserializing the body into a JObject,// storing the bytes in a database,or by
        // forwarding the message to another queue
        return // appropriate forward action here
    }))
    .Start();

您可以在此处了解更多信息:Transport message forwarding

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...