问题描述
我在做什么: 我有一个微服务,它是订单处理系统的一部分,它不断消耗来自 RabbitMQ 的订单消息,我需要将它们临时保存在我的微服务数据库中,直到它们被处理(接受/拒绝)。订单消息被转发到具有两个分支的 TPL 数据流管道。
第一个用于处理“创建的”订单。 创建订单时,我会执行一些操作,例如订单的统计信息、验证、持久保存到数据库以及通过 SignalR 通知用户。 (这个分支我有 5 个块,有些包括 I/O 网络调用)。
第二个分支(2 个区块)用于接受/拒绝的订单。当我收到状态为已接受/拒绝的订单时,我需要将其从我的数据库中删除,并通过 SignalR 通知用户。
“已创建”和“已接受/拒绝”订单的消息仅在其状态属性上有所不同。 根据订单状态,通过 TPL Dataflow 的链接谓词转发到两个分支中的每一个。
我的问题: 有时,拒绝/接受订单的消息会在创建相同订单的消息后 50-150 毫秒到达。通常,50-150 ms 在计算中是很多时间,但第一个数据流分支依赖于对其他服务的外部调用,这可能会导致处理延迟。
我想确保我已经完全处理了状态为“已创建”的消息,并且仅在此之后处理同一订单的消息被“接受/拒绝”。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
namespace ConsoleApp12
{
public static class Program
{
static void Main(string[] args)
{
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
var executionoptions = new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 1,// potentially would be increased
BoundedCapacity = 50
};
var deserialize = new TransformBlock<OrderStatus,Order>(o =>
{
return new Order { Status = o };
},executionoptions);
#region Creted order
var b11 = new TransformBlock<Order,Order>(async o =>
{
await Task.Delay(15); // do something
return o;
},executionoptions);
var b12 = new TransformBlock<Order,executionoptions);
var b13 = new TransformBlock<Order,Order>(async o =>
{
await Task.Delay(15);
Console.WriteLine("Saved In DB");
return o;
},executionoptions);
var b14 = new ActionBlock<Order>(async o =>
{
await Task.Delay(5);
Console.WriteLine("SignalR order created");
},executionoptions);
#endregion
#region Accepted/Declined
var b21 = new ActionBlock<Order>(async o =>
{
await Task.Delay(5);
Console.WriteLine("Deleted from DB");
},executionoptions);
var b22 = new ActionBlock<Order>(async o =>
{
await Task.Delay(5);
Console.WriteLine("SignalR order deleted");
},executionoptions);
#endregion
var deleteFromDbAndSignalRInParallelJob = new List<ITargetBlock<Order>> { b21,b22 }.CreateGuaranteedbroadcastBlock();
deserialize.LinkTo(b11,linkOptions,x => x.Status == OrderStatus.Created);
b11.LinkTo(b12,linkOptions);
b12.LinkTo(b13,linkOptions);
b13.LinkTo(b14,linkOptions);
deserialize.LinkTo(deleteFromDbAndSignalRInParallelJob,linkOptions);
deserialize.Post(OrderStatus.Created);
Thread.Sleep(30); // delay between messaged
deserialize.Post(OrderStatus.Declined);
Console.ReadKey();
}
}
class Order
{
public OrderStatus Status { get; init; }
}
enum OrderStatus
{
Created = 1,Declined = 2
}
public static class DataflowExtensions
{
public static ITargetBlock<T> CreateGuaranteedbroadcastBlock<T>(this IEnumerable<ITargetBlock<T>> targets)
{
var targetsList = targets.ToList();
return new ActionBlock<T>(async item =>
{
var tasks = targetsList.Select(t => t.SendAsync(item));
await Task.WhenAll(tasks);
},new ExecutionDataflowBlockOptions { BoundedCapacity = 100 });
}
}
}
这是一个带有简化模型和逻辑的示例,模拟“创建的”订单分支需要更多时间来完成。 输出为:
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)