问题描述
我正在尝试创建一个将对所有消息类型执行的过滤器。理想情况下,您只需要注册一次过滤器,而不必为每个使用者进行过滤。 (我也想在发布方面做同样的事情)。我需要它在生命周期范围内。只是从标头中弹出一个值,并将其分配给我的DI容器将提供的Lifetime范围内的对象(发布端执行相反操作)
我在中间件上观看了克里斯·帕特森(Chris Patterson)的抽搐视频,我想快要在38分钟左右就可以了,但是他为特定消费者注册了过滤器。在消费者方面,我认为我需要一个ConsumContext过滤器,但是我只是不知道如何以将其用于所有消费者的方式注册该过滤器。我正在使用MT 7和Autofac。谁能给我展示一些示例代码,说明如何注册适用于所有使用者的作用域筛选器? (而且如果非常不同,那么哪一种适用于所有发布商)?
解决方法
您是否检查过文档?它具有configuration example,并且在端点级别和使用者级别上配置了重试。
Bus.Factory.CreateUsingInMemory(cfg =>
{
cfg.ReceiveEndpoint("input-queue",e =>
{
e.UseMessageRetry(r =>
{
r.Immediate(5);
r.Handle<DataException>(x => x.Message.Contains("SQL"));
});
e.Consumer<MyConsumer>(c => c.UseMessageRetry(r =>
{
r.Interval(10,TimeSpan.FromMilliseconds(200));
r.Ignore<ArgumentNullException>();
r.Ignore<DataException>(x => x.Message.Contains("SQL"));
});
);
});
});
,
如果需要使用期限范围内的过滤器,则需要使用作用域过滤器(需要MassTransit v7)。这将为任何使用者注册筛选器,以便执行该筛选器。您确实需要使过滤器具有通用性,并以T
作为消息类型,可以选择使用还是忽略。
public class MyFilter<T> :
IFilter<ConsumeContext<T>>
where T : class
{
SomeScopedObject _obj;
public MyFilter(SomeScopedObject obj)
{
_obj = obj;
}
public async Task Send(ConsumeContext<T> context,IPipe<ConsumeContext<T>> next)
{
// do your thing with _obj
await next.Send(context);
}
public void Probe(ProbeContext context)
{
}
}
然后,在接收端点上,在使用方之前配置过滤器。
e.UseConsumeFilter(typeof(MyFilter<>));
这将为每个使用者/消息配置在该使用者的容器范围内执行的过滤器版本。
您可以对发布/发送执行相同的操作。
Documentation在网站上。