控制台应用程序的Azure事件网格订阅

问题描述

我想在C#控制台应用程序中订阅Azure事件网格,实际上我正在实现eShopContainer项目中的EventBus示例,我需要订阅一个主题并收听消息,处理和打印发送的消息在另一个实现EventBus的C#控制台应用程序之前。因此,¿该如何使用C#控制台应用程序来做到这一点?

这是我的azure门户,邮件存储在队列存储中:

azure portal subscriptions

这是所有消息所在的队列:

all messages

所以,我需要订阅并获取所有消息!

解决方法

基本上,有三种方法可以在Azure事件网格模型中使用控制台订阅服务器。下图显示了它们:

enter image description here

请注意,在我的hybrid connection中使用了ngrok tunnelAzure Event Grid Tester。看看它们的实现。

以下代码段是在控制台应用程序中使用HybridConnectionListener的示例:

using Microsoft.Azure.Relay;
using Newtonsoft.Json.Linq;
using System;
using System.Configuration;
using System.IO;
using System.Linq;
using System.Net;
using System.Threading.Tasks;

namespace ConsoleApp3
{
    class Program
    {
        static async Task Main(string[] args)
        {
            string connectionString = ConfigurationManager.AppSettings["HybridConnection"];
            HybridConnectionListener listener = null;

            try
            {
                listener = new HybridConnectionListener(connectionString);
                listener.Connecting += (o,hce) =>
                {
                    Console.ForegroundColor = ConsoleColor.White;
                    Console.WriteLine($"[{DateTime.Now.ToLocalTime().ToString("yyyy-MM-ddTHH:MM:ss.fff")}] HybridConnection: Connecting,listener:{listener.Address}");
                    Console.ResetColor();
                };
                listener.Online += (o,hce) =>
                {
                    Console.ForegroundColor = ConsoleColor.Green;
                    Console.WriteLine($"[{DateTime.Now.ToLocalTime().ToString("yyyy-MM-ddTHH:MM:ss.fff")}] HybridConnection: Online,listener:{listener.Address}");
                    Console.ResetColor();
                };
                listener.Offline += (o,hce) =>
                {
                    Console.ForegroundColor = ConsoleColor.Blue;
                    Console.WriteLine($"[{DateTime.Now.ToLocalTime().ToString("yyyy-MM-ddTHH:MM:ss.fff")}] HybridConnection: Offline,listener:{listener.Address}");
                    Console.ResetColor();
                };

                listener.RequestHandler = (context) =>
                {
                    try
                    {
                        if (!context.Request.Headers.AllKeys.Contains("Aeg-Event-Type",StringComparer.OrdinalIgnoreCase) || !string.Equals(context.Request.Headers["Aeg-Event-Type"],"Notification",StringComparison.CurrentCultureIgnoreCase))
                            throw new Exception("Received message is not for EventGrid subscriber");

                        string jsontext = null;
                        using (var reader = new StreamReader(context.Request.InputStream))
                        {
                            var jtoken = JToken.Parse(reader.ReadToEnd());
                            if (jtoken is JArray)
                                jsontext = jtoken.SingleOrDefault<JToken>().ToString(Newtonsoft.Json.Formatting.Indented);
                            else if (jtoken is JObject)
                                jsontext = jtoken.ToString(Newtonsoft.Json.Formatting.Indented);
                            else if (jtoken is JValue)
                                throw new Exception($"The payload (JValue) is not accepted. JValue={jtoken.ToString(Newtonsoft.Json.Formatting.None)}");
                        }

                        Console.ForegroundColor = ConsoleColor.DarkYellow;
                        Console.WriteLine($"[{DateTime.Now.ToLocalTime().ToString("yyyy-MM-ddTHH:MM:ss.fff")}] Headers: {string.Join(" | ",context.Request.Headers.AllKeys.Where(i => i.StartsWith("aeg-") || i.StartsWith("Content-Type")).Select(i => $"{i}={context.Request.Headers[i]}"))}");
                        Console.ForegroundColor = ConsoleColor.Yellow;
                        Console.WriteLine($"{jsontext}");
                                             
                    }
                    catch (Exception ex)
                    {
                        Console.ForegroundColor = ConsoleColor.Red;
                        Console.WriteLine($"[{DateTime.Now.ToLocalTime().ToString("yyyy-MM-ddTHH:MM:ss.fff")}] HybridConnection: Message processing failed - {ex.Message}");
                    }
                    finally
                    {
                        context.Response.StatusCode = HttpStatusCode.NoContent;
                        context.Response.Close();
                        Console.ResetColor();
                    }
                };
                await listener.OpenAsync(TimeSpan.FromSeconds(60));
            }
            catch (Exception ex)
            {
                Console.ForegroundColor = ConsoleColor.Red;
                Console.WriteLine($"[{DateTime.Now.ToLocalTime().ToString("yyyy-MM-ddTHH:MM:ss.fff")}] Open HybridConnection failed - {ex.Message}");
                Console.ResetColor();
            }

            Console.ReadLine();

            if(listener != null)
                await listener.CloseAsync();
        }
    }
}

使用AEG订阅中的Hybrid Connection作为事件处理程序的目的地,所有事件将被传递到控制台应用程序,如以下屏幕片段所示:

enter image description here

更新:

以下示例显示了订户的实现,该订户的输出绑定到signalR服务。在这种情况下,我们将需要构建两个HttpTrigger函数,一个用于订阅者,另一个用于signalR客户端,以获取特定userId的url和访问令牌:

enter image description here

  1. HttpTriggerGetSignalRinfo 函数:

run.csx:

#r "Microsoft.Azure.WebJobs.Extensions.SignalRService"

using System;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Primitives;
using Microsoft.Azure.WebJobs.Extensions.SignalRService;

public static async Task<IActionResult> Run(HttpRequest req,SignalRConnectionInfo connectionInfo,ILogger log)
{
    log.LogInformation($"Info.Url={connectionInfo.Url}");

    return new OkObjectResult(new 
    { 
        url = connectionInfo.Url,accessToken = connectionInfo.AccessToken,}); 
}

function.json:

{
  "bindings": [
    {
      "authLevel": "function","name": "req","type": "httpTrigger","direction": "in","methods": [
        "get"
      ]
    },{
      "type": "signalRConnectionInfo","name": "connectionInfo","hubName": "%AzureSignalRHubName%","connectionStringSetting": "AzureSignalRConnectionString","userId": "{query.userid}","direction": "in"
    },{
      "name": "$return","type": "http","direction": "out"
    }
  ]
}
  1. signalR客户端-控制台应用程序:

     using Microsoft.AspNetCore.SignalR.Client;
     using Newtonsoft.Json;
     using Newtonsoft.Json.Linq;
     using System;
     using System.Configuration;
     using System.Net.Http;
     using System.Threading.Tasks;
    
     namespace ConsoleApp4
     {
         class Program
         {
             static async Task Main(string[] args)
             {
                 HubConnection connection = null;
                 string userId = ConfigurationManager.AppSettings.Get("userId");
                 string signalRInfo = ConfigurationManager.AppSettings.Get("signalRInfo");
    
                 try
                 {
                     using (var client = new HttpClient())
                     {
                         var rsp = await client.GetAsync($"{signalRInfo}&userid={userId}");
                         string jsontext = await rsp.Content.ReadAsStringAsync();
                         var info = JsonConvert.DeserializeAnonymousType(jsontext,new { url = "",accessToken = "" });
    
                         connection = new HubConnectionBuilder()
                             .WithUrl(info.url,option =>
                             {
                             option.AccessTokenProvider = () =>
                                 {
                                     return Task.FromResult(info.accessToken);
                                 };
                             }).Build();
    
                         Console.ForegroundColor = ConsoleColor.Green;
                         Console.WriteLine($"[{DateTime.Now.ToLocalTime().ToString("yyyy-MM-ddTHH:MM:ss.fff")}] SignalR Client on {info.url}/users/{userId}");
                         Console.ResetColor();
                     }
    
                     connection.On<string,string>("SendMessage",(string headers,string message) =>
                     {
                         Console.ForegroundColor = ConsoleColor.DarkYellow;
                         Console.WriteLine($"[{DateTime.Now.ToLocalTime().ToString("yyyy-MM-ddTHH:MM:ss.fff")}] {headers}");
                         Console.ForegroundColor = ConsoleColor.Yellow;
                         Console.WriteLine($"{JToken.Parse(message).ToString(Formatting.Indented)}");
                         Console.ResetColor();
                     });
    
                     await connection.StartAsync();              
                 }
                 catch (Exception ex)
                 {
                     Console.ForegroundColor = ConsoleColor.Red;
                     Console.WriteLine($"[{DateTime.Now.ToLocalTime().ToString("yyyy-MM-ddTHH:MM:ss.fff")}] Open HybridConnection failed - {ex.Message}");
                     Console.ResetColor();
                 }
                 Console.ReadLine();
                 if (connection != null)
                     await connection.StopAsync();
             }       
         }
     }
    
  2. HttpTriggerSendMsgToSignalR 函数-订户

run.csx:

#r "Microsoft.Azure.WebJobs.Extensions.SignalRService"
#r "Newtonsoft.Json"

using System.Net;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Primitives;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using Microsoft.Azure.WebJobs.Extensions.SignalRService;

public static async Task<IActionResult> Run(HttpRequest req,IAsyncCollector<SignalRMessage> signalRMessages,ILogger log)
{   
    string headers = string.Join(" | ",req.Headers.Where(h => h.Key.StartsWith("aeg-") || h.Key.StartsWith("Content-Type")).Select(i => $"{i.Key}={i.Value.First()}")); 
    log.LogInformation($"Method: {req.Method} Headers: {headers}");    
          
    if (req.Method == HttpMethod.Options.ToString())
    {
        log.LogInformation("CloudEventSchema validation");               
        req.HttpContext.Response.Headers.Add("Webhook-Allowed-Origin",req.Headers["WebHook-Request-Origin"].FirstOrDefault()?.Trim());
        return (ActionResult)new OkResult();
    }
    
    var jtoken = JToken.Parse(await new StreamReader(req.Body).ReadToEndAsync());
    string eventTypeHeader = req.Headers["aeg-event-type"].FirstOrDefault()?.Trim(); 

    if(eventTypeHeader == "SubscriptionValidation") 
    {       
        if(jtoken is JArray)
            jtoken = jtoken.SingleOrDefault<JToken>();

        if(jtoken["eventType"].Value<string>() == "Microsoft.EventGrid.SubscriptionValidationEvent")
        {
            log.LogInformation("EventGridSchema validation");
            return (ActionResult)new OkObjectResult(new { validationResponse = ((dynamic)jtoken["data"]["validationCode"])});         
        }           
        return new BadRequestObjectResult($"Not valid event schema");
    }   
    else if(eventTypeHeader == "Notification") 
    {          
        await signalRMessages.AddAsync(
            new SignalRMessage
            {
                // the message will only be sent to these user IDs or if this property not exit,the bindig path will be used it
                Target = "SendMessage",Arguments = new[] { headers,jtoken.ToString() }
            });        
        return (ActionResult)new OkResult();  
    }
     
    return new BadRequestObjectResult($"{eventTypeHeader} is not a valid type");
}

function.json:

{
  "bindings": [
    {
      "authLevel": "function","methods": [
        "options","post"
      ]
    },{
      "type": "signalR","name": "signalRMessages","hubName": "%AzureSignalRHubName%/users/{query.userid}","direction": "out"
    },"direction": "out"
    }
  ]
}

请注意,webhook事件处理程序用于订户是出于两个原因,例如传递CloudEvent消息和通过url查询字符串参数来配置signalR客户端userId。

  1. 在控制台应用程序上显示 userid = abcd 的事件:

enter image description here

请注意,与混合连接相反,signalR客户端实例允许针对同一用户标识多播消息,在混合连接中,消息在侦听器实例之间保持平衡。

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...