如何推迟事件触发/接收

问题描述

总和:如何延迟天蓝色事件网格中的事件触发或事件接收?

我设计的系统需要对低频对象状态(创建,开始,检查启动状态中的长时间状态,结束)做出反应。看起来像是事件处理的候选者。我想用azure函数实现它... 问题是我需要对处于启动状态的10分钟(可配置)的对象做出反应。事件发生后10分钟做出反应。如何延迟事件起火?如何安排事件触发?如何等待事件? 我正在寻找不会消耗我的付费资源(函数处理时间)的解决方案。 知道如何解决这个问题吗?谢谢。

功能1:

  • 时间触发(轮询)
  • 检查条件
    • 可以创建OBJECT创建的事件

功能2:

  • http触发
  • 用户从UI启动OBJECT
    • 创建由OBJECT启动的事件,但从现在开始10分钟触发该事件? (使用功能代码进行睡眠/计时器将消耗10分钟的资源,并且成本太高)

功能3:???

  • ???
  • 检查状态并检测10分钟的延迟
    • 触发OBJECT-10分钟启动的事件

...

解决方法

在事件发生后10分钟做出反应。

首先,我将强调事件应该代表过去发生的事情,而不是将来发生的事情。事件的期望是通知发生了什么。

第二,如果您对需要发生的事情有期望,那么命令胜于事件。在这种情况下,您希望工作在10分钟内在实际可用的状态下在Blob上进行。我将通过向队列(Azure服务总线)发送包含在Blob上工作所需的信息的延迟消息来响应事件。这样,您可以通过准备将来/延迟的工作项来响应事件。

,

如何延迟天蓝色事件网格中的事件触发或事件接收?

Azure事件网格(AEG)并未内置此功能,但是使用Azure Service Bus(ASB)实体来扩展延迟(计划的)消息非常容易,如Sean的答案中所述。

以下屏幕片段显示了具有延迟的推拉式订户的概念:

enter image description here

事件消息被推送到ASB主题中,并根据其订阅规则将事件消息作为计划消息转发到队列实体。

主题订阅需要设置以下属性:

  • ForwardTo

    name of the queue/topic entity
    
  • $默认规则

    过滤器:

    1=1 
    

    操作(例如10分钟):

    SET sys.TimeToLive = '00:10:00';
    SET EnqueuedTimeUtc = sys.EnqueuedTimeUtc;
    SET ScheduledEnqueueTimeUtc = sys.ExpiresAtUtc;
    SET sys.ScheduledEnqueueTimeUtc = sys.ExpiresAtUtc;
    SET sys.Label = 'Delay';
    SET sys.TimeToLive = '01:00:00';
    

目的地队列:

 EnableDeadLetteringOnMessageExpiration = true

基于上述设置,队列中的计划消息必须在TTL之内使用,例如“ 01:00:00”,否则该消息将发送到DLQ。在Sean的评论中有更多详细信息。

使用 ServiceBusTrigger 函数,可以像AEG用户一样以透明的方式从队列中提取延迟的事件消息。

在这种情况下,将延迟的事件发送回AEG以进行 Fan-Out 分发并使用 Push-and-Push 模式时,以下示例显示此ServiceBusTrigger的实现,其输出绑定到AEG自定义主题:

run.csx:

#r "Newtonsoft.Json"
#r "Microsoft.Azure.EventGrid"
#r "Microsoft.Azure.ServiceBus"


using System.Net;
using System.Text;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Primitives;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using Microsoft.Azure.EventGrid.Models;
using Microsoft.Azure.ServiceBus;


public static async Task Run(Message queueItem,IAsyncCollector<EventGridEvent> outputEvents,ILogger log)
{   
    string jsontext = JToken.Parse(Encoding.UTF8.GetString(queueItem.Body)).ToString(Formatting.Indented); 
    log.LogInformation(jsontext);

    EventGridEvent eventGridEvent = JsonConvert.DeserializeObject<EventGridEvent>(jsontext);
    eventGridEvent.Topic = null;
    eventGridEvent.Subject += "/delayed";
    await outputEvents.AddAsync(eventGridEvent);

    await Task.CompletedTask;
}

function.json:

{
  "bindings": [
    {
      "name": "queueItem","type": "serviceBusTrigger","direction": "in","queueName": "aeg","connection": "rk2016_SERVICEBUS"
    },{
      "type": "eventGrid","direction": "out","name": "outputEvents","topicEndpointUri": "AEG_TOPIC_XX_ENDPOINT","topicKeySetting": "AEG_TOPIC_XX_KEY"
    }
  ]
}

如您所见,subject属性已使用后缀 / delayed 进行了修改,以实现过滤目的,例如避免循环等。

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...