问题描述
总和:如何延迟天蓝色事件网格中的事件触发或事件接收?
我设计的系统需要对低频对象状态(创建,开始,检查启动状态中的长时间状态,结束)做出反应。看起来像是事件处理的候选者。我想用azure函数实现它... 问题是我需要对处于启动状态的10分钟(可配置)的对象做出反应。事件发生后10分钟做出反应。如何延迟事件起火?如何安排事件触发?如何等待事件? 我正在寻找不会消耗我的付费资源(函数处理时间)的解决方案。 知道如何解决这个问题吗?谢谢。功能1:
- 时间触发(轮询)
- 检查条件
- 可以创建OBJECT创建的事件
功能2:
- http触发
- 用户从UI启动OBJECT
- 创建由OBJECT启动的事件,但从现在开始10分钟触发该事件? (使用功能代码进行睡眠/计时器将消耗10分钟的资源,并且成本太高)
功能3:???
- ???
- 检查状态并检测10分钟的延迟
- 触发OBJECT-10分钟启动的事件
...
解决方法
在事件发生后10分钟做出反应。
首先,我将强调事件应该代表过去发生的事情,而不是将来发生的事情。事件的期望是通知发生了什么。
第二,如果您对需要发生的事情有期望,那么命令胜于事件。在这种情况下,您希望工作在10分钟内在实际可用的状态下在Blob上进行。我将通过向队列(Azure服务总线)发送包含在Blob上工作所需的信息的延迟消息来响应事件。这样,您可以通过准备将来/延迟的工作项来响应事件。
,如何延迟天蓝色事件网格中的事件触发或事件接收?
Azure事件网格(AEG)并未内置此功能,但是使用Azure Service Bus(ASB)实体来扩展延迟(计划的)消息非常容易,如Sean的答案中所述。
以下屏幕片段显示了具有延迟的推拉式订户的概念:
事件消息被推送到ASB主题中,并根据其订阅规则将事件消息作为计划消息转发到队列实体。
主题订阅需要设置以下属性:
-
ForwardTo
name of the queue/topic entity
-
$默认规则
过滤器:
1=1
操作(例如10分钟):
SET sys.TimeToLive = '00:10:00'; SET EnqueuedTimeUtc = sys.EnqueuedTimeUtc; SET ScheduledEnqueueTimeUtc = sys.ExpiresAtUtc; SET sys.ScheduledEnqueueTimeUtc = sys.ExpiresAtUtc; SET sys.Label = 'Delay'; SET sys.TimeToLive = '01:00:00';
目的地队列:
EnableDeadLetteringOnMessageExpiration = true
基于上述设置,队列中的计划消息必须在TTL之内使用,例如“ 01:00:00”,否则该消息将发送到DLQ。在Sean的评论中有更多详细信息。
使用 ServiceBusTrigger 函数,可以像AEG用户一样以透明的方式从队列中提取延迟的事件消息。
在这种情况下,将延迟的事件发送回AEG以进行 Fan-Out 分发并使用 Push-and-Push 模式时,以下示例显示此ServiceBusTrigger的实现,其输出绑定到AEG自定义主题:
run.csx:
#r "Newtonsoft.Json"
#r "Microsoft.Azure.EventGrid"
#r "Microsoft.Azure.ServiceBus"
using System.Net;
using System.Text;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Primitives;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using Microsoft.Azure.EventGrid.Models;
using Microsoft.Azure.ServiceBus;
public static async Task Run(Message queueItem,IAsyncCollector<EventGridEvent> outputEvents,ILogger log)
{
string jsontext = JToken.Parse(Encoding.UTF8.GetString(queueItem.Body)).ToString(Formatting.Indented);
log.LogInformation(jsontext);
EventGridEvent eventGridEvent = JsonConvert.DeserializeObject<EventGridEvent>(jsontext);
eventGridEvent.Topic = null;
eventGridEvent.Subject += "/delayed";
await outputEvents.AddAsync(eventGridEvent);
await Task.CompletedTask;
}
function.json:
{
"bindings": [
{
"name": "queueItem","type": "serviceBusTrigger","direction": "in","queueName": "aeg","connection": "rk2016_SERVICEBUS"
},{
"type": "eventGrid","direction": "out","name": "outputEvents","topicEndpointUri": "AEG_TOPIC_XX_ENDPOINT","topicKeySetting": "AEG_TOPIC_XX_KEY"
}
]
}
如您所见,subject属性已使用后缀 / delayed 进行了修改,以实现过滤目的,例如避免循环等。