适用于所有使用者/所有消息类型的大众运输过滤器

问题描述

我正在尝试创建一个将对所有消息类型执行的过滤器。理想情况下,您只需要注册一次过滤器,而不必为每个使用者进行过滤。 (我也想在发布方面做同样的事情)。我需要它在生命周期范围内。只是从标头中弹出一个值,并将其分配给我的DI容器将提供的Lifetime范围内的对象(发布端执行相反操作)

我在中间件上观看了克里斯·帕特森(Chris Patterson)的抽搐视频,我想快要在38分钟左右就可以了,但是他为特定消费者注册了过滤器。在消费者方面,我认为我需要一个ConsumContext过滤器,但是我只是不知道如何以将其用于所有消费者的方式注册该过滤器。我正在使用MT 7和Autofac。谁能给我展示一些示例代码,说明如何注册适用于所有使用者的作用域筛选器? (而且如果非常不同,那么哪一种适用于所有发布商)?

解决方法

您是否检查过文档?它具有configuration example,并且在端点级别和使用者级别上配置了重试。

Bus.Factory.CreateUsingInMemory(cfg =>
{
    cfg.ReceiveEndpoint("input-queue",e =>
    {
        e.UseMessageRetry(r => 
        {
            r.Immediate(5);
            r.Handle<DataException>(x => x.Message.Contains("SQL"));
        });
        e.Consumer<MyConsumer>(c => c.UseMessageRetry(r => 
            {
                r.Interval(10,TimeSpan.FromMilliseconds(200));
                r.Ignore<ArgumentNullException>();
                r.Ignore<DataException>(x => x.Message.Contains("SQL"));
            });
        );
    });
});
,

如果需要使用期限范围内的过滤器,则需要使用作用域过滤器(需要MassTransit v7)。这将为任何使用者注册筛选器,以便执行该筛选器。您确实需要使过滤器具有通用性,并以T作为消息类型,可以选择使用还是忽略。

public class MyFilter<T> :
    IFilter<ConsumeContext<T>>
    where T : class
{
    SomeScopedObject _obj;

    public MyFilter(SomeScopedObject obj) 
    {
        _obj = obj;
    }
  
    public async Task Send(ConsumeContext<T> context,IPipe<ConsumeContext<T>> next)
    {
        // do your thing with _obj

        await next.Send(context);
    }

    public void Probe(ProbeContext context)
    {
    }
}

然后,在接收端点上,在使用方之前配置过滤器。

e.UseConsumeFilter(typeof(MyFilter<>));

这将为每个使用者/消息配置在该使用者的容器范围内执行的过滤器版本。

您可以对发布/发送执行相同的操作。

Documentation在网站上。