c# 同步异步消息处理 - TPL 数据流管道

问题描述

我在做什么: 我有一个微服务,它是订单处理系统的一部分,它不断消耗来自 RabbitMQ 的订单消息,我需要将它们临时保存在我的微服务数据库中,直到它们被处理(接受/拒绝)。订单消息被转发到具有两个分支的 TPL 数据流管道。

一个用于处理“创建的”订单。 创建订单时,我会执行一些操作,例如订单的统计信息、验证、持久保存到数据库以及通过 SignalR 通知用户。 (这个分支我有 5 个块,有些包括 I/O 网络调用)。

第二个分支(2 个区块)用于接受/拒绝的订单。当我收到状态为已接受/拒绝的订单时,我需要将其从我的数据库删除,并通过 SignalR 通知用户

“已创建”和“已接受/拒绝”订单的消息仅在其状态属性上有所不同。 根据订单状态,通过 TPL Dataflow 的链接谓词转发到两个分支中的每一个

我的问题: 有时,拒绝/接受订单的消息会在创建相同订单的消息后 50-150 毫秒到达。通常,50-150 ms 在计算中是很多时间,但第一个数据流分支依赖于对其他服务的外部调用,这可能会导致处理延迟。

我想确保我已经完全处理了状态为“已创建”的消息,并且仅在此之后处理同一订单的消息被“接受/拒绝”。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace ConsoleApp12
{
    public static class Program
    {
        static void Main(string[] args)
        {
            var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
            var executionoptions = new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 1,// potentially would be increased
                BoundedCapacity = 50
            };
            var deserialize = new TransformBlock<OrderStatus,Order>(o =>
            {
                return new Order { Status = o };
            },executionoptions);

            #region Creted order
            var b11 = new TransformBlock<Order,Order>(async o =>
            {
                await Task.Delay(15); // do something
                return o;
            },executionoptions);
            var b12 = new TransformBlock<Order,executionoptions);
            var b13 = new TransformBlock<Order,Order>(async o =>
            {
                await Task.Delay(15);
                Console.WriteLine("Saved In DB");
                return o;
            },executionoptions);
            var b14 = new ActionBlock<Order>(async o =>
            {
                await Task.Delay(5);
                Console.WriteLine("SignalR order created");
            },executionoptions);
            #endregion

            #region Accepted/Declined
            var b21 = new ActionBlock<Order>(async o =>
            {
                await Task.Delay(5);
                Console.WriteLine("Deleted from DB");
            },executionoptions);
            var b22 = new ActionBlock<Order>(async o =>
            {
                await Task.Delay(5);
                Console.WriteLine("SignalR order deleted");
            },executionoptions);
            #endregion

            var deleteFromDbAndSignalRInParallelJob = new List<ITargetBlock<Order>> { b21,b22 }.CreateGuaranteedbroadcastBlock();

            deserialize.LinkTo(b11,linkOptions,x => x.Status == OrderStatus.Created);
            b11.LinkTo(b12,linkOptions);
            b12.LinkTo(b13,linkOptions);
            b13.LinkTo(b14,linkOptions);

            deserialize.LinkTo(deleteFromDbAndSignalRInParallelJob,linkOptions);

            deserialize.Post(OrderStatus.Created);
            Thread.Sleep(30); // delay between messaged
            deserialize.Post(OrderStatus.Declined);

            Console.ReadKey();
        }
    }
    class Order
    {
        public OrderStatus Status { get; init; }
    }
    enum OrderStatus
    {
        Created = 1,Declined = 2
    }
    public static class DataflowExtensions
    {
        public static ITargetBlock<T> CreateGuaranteedbroadcastBlock<T>(this IEnumerable<ITargetBlock<T>> targets)
        {
            var targetsList = targets.ToList();
            return new ActionBlock<T>(async item =>
            {
                var tasks = targetsList.Select(t => t.SendAsync(item));
                await Task.WhenAll(tasks);
            },new ExecutionDataflowBlockOptions { BoundedCapacity = 100 });
        }
    }
}

这是一个带有简化模型和逻辑的示例,模拟“创建的”订单分支需要更多时间来完成。 输出为:

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)