问题描述
我想在C#控制台应用程序中订阅Azure事件网格。实际上,我正在实现eShopContainer项目中的EventBus示例:我需要订阅一个主题并侦听消息,处理并打印此前由另一个同样实现了EventBus的C#控制台应用程序发送的消息。那么,该如何使用C#控制台应用程序来做到这一点?
这是我的Azure门户,消息存储在队列存储中:
这是所有消息所在的队列:
所以,我需要订阅并获取所有消息!
解决方法
基本上,在Azure事件网格模型中使用控制台订阅者(subscriber)有三种方式。下图展示了这些方式:
请注意,我的Azure Event Grid Tester中使用了Hybrid Connection和ngrok tunnel,可以参考它们的实现。
以下代码段是在控制台应用程序中使用HybridConnectionListener的示例:
using Microsoft.Azure.Relay;
using Newtonsoft.Json.Linq;
using System;
using System.Configuration;
using System.IO;
using System.Linq;
using System.Net;
using System.Threading.Tasks;
namespace ConsoleApp3
{
    /// <summary>
    /// Console subscriber: listens on an Azure Relay Hybrid Connection and prints
    /// every Event Grid notification it receives until the user presses Enter.
    /// </summary>
    class Program
    {
        // "HH:mm" is hours:minutes. The previous "HH:MM" printed the month number
        // in the minutes position.
        private const string TimestampFormat = "yyyy-MM-ddTHH:mm:ss.fff";

        /// <summary>
        /// Opens the listener configured by the "HybridConnection" app setting,
        /// waits for Enter, then closes the listener.
        /// </summary>
        /// <param name="args">Unused command-line arguments.</param>
        static async Task Main(string[] args)
        {
            string connectionString = ConfigurationManager.AppSettings["HybridConnection"];
            HybridConnectionListener listener = null;
            try
            {
                listener = new HybridConnectionListener(connectionString);

                // Connection state changes are reported in distinct colors.
                listener.Connecting += (o, hce) =>
                    WriteStamped(ConsoleColor.White, $"HybridConnection: Connecting,listener:{listener.Address}");
                listener.Online += (o, hce) =>
                    WriteStamped(ConsoleColor.Green, $"HybridConnection: Online,listener:{listener.Address}");
                listener.Offline += (o, hce) =>
                    WriteStamped(ConsoleColor.Blue, $"HybridConnection: Offline,listener:{listener.Address}");

                listener.RequestHandler = (context) =>
                {
                    try
                    {
                        // The header indexer yields null for a missing header, so one
                        // comparison covers both "absent" and "not a notification".
                        string eventType = context.Request.Headers["Aeg-Event-Type"];
                        if (!string.Equals(eventType, "Notification", StringComparison.OrdinalIgnoreCase))
                        {
                            throw new InvalidOperationException("Received message is not for EventGrid subscriber");
                        }

                        string jsontext;
                        using (var reader = new StreamReader(context.Request.InputStream))
                        {
                            jsontext = FormatPayload(JToken.Parse(reader.ReadToEnd()));
                        }

                        var shownHeaders = context.Request.Headers.AllKeys
                            .Where(IsDisplayedHeader)
                            .Select(key => $"{key}={context.Request.Headers[key]}");
                        WriteStamped(ConsoleColor.DarkYellow, $"Headers: {string.Join(" | ", shownHeaders)}");

                        Console.ForegroundColor = ConsoleColor.Yellow;
                        Console.WriteLine($"{jsontext}");
                    }
                    catch (Exception ex)
                    {
                        WriteStamped(ConsoleColor.Red, $"HybridConnection: Message processing failed - {ex.Message}");
                    }
                    finally
                    {
                        // Always answer 204, even for rejected or unparsable requests.
                        context.Response.StatusCode = HttpStatusCode.NoContent;
                        context.Response.Close();
                        Console.ResetColor();
                    }
                };

                await listener.OpenAsync(TimeSpan.FromSeconds(60));
            }
            catch (Exception ex)
            {
                WriteStamped(ConsoleColor.Red, $"Open HybridConnection failed - {ex.Message}");
            }

            // Keep the process alive until the user presses Enter.
            Console.ReadLine();
            if (listener != null)
            {
                await listener.CloseAsync();
            }
        }

        /// <summary>Writes one timestamp-prefixed line in the given color, then resets the console color.</summary>
        private static void WriteStamped(ConsoleColor color, string message)
        {
            Console.ForegroundColor = color;
            Console.WriteLine($"[{DateTime.Now.ToString(TimestampFormat)}] {message}");
            Console.ResetColor();
        }

        /// <summary>
        /// True for the headers echoed to the console: names starting with "aeg-" or "Content-Type".
        /// Compared case-insensitively because HTTP header names are case-insensitive.
        /// </summary>
        private static bool IsDisplayedHeader(string name)
        {
            return name != null
                && (name.StartsWith("aeg-", StringComparison.OrdinalIgnoreCase)
                    || name.StartsWith("Content-Type", StringComparison.OrdinalIgnoreCase));
        }

        /// <summary>
        /// Renders the request payload as indented JSON.
        /// A one-element array is unwrapped to that element; an empty array or a batch of
        /// several events is printed as the whole array (previously these cases failed).
        /// </summary>
        /// <exception cref="InvalidOperationException">The payload is a bare JSON value.</exception>
        private static string FormatPayload(JToken payload)
        {
            switch (payload)
            {
                case JArray events when events.Count == 1:
                    return events[0].ToString(Newtonsoft.Json.Formatting.Indented);
                case JValue scalar:
                    throw new InvalidOperationException($"The payload (JValue) is not accepted. JValue={scalar.ToString(Newtonsoft.Json.Formatting.None)}");
                default:
                    return payload.ToString(Newtonsoft.Json.Formatting.Indented);
            }
        }
    }
}
使用AEG订阅中的Hybrid Connection作为事件处理程序的目的地,所有事件将被传递到控制台应用程序,如以下屏幕片段所示:
更新:
以下示例显示了订户的实现,该订户的输出绑定到signalR服务。在这种情况下,我们将需要构建两个HttpTrigger函数,一个用于订阅者,另一个用于signalR客户端,以获取特定userId的url和访问令牌:
- HttpTriggerGetSignalRinfo 函数:
run.csx:
#r "Microsoft.Azure.WebJobs.Extensions.SignalRService"
using System;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Primitives;
using Microsoft.Azure.WebJobs.Extensions.SignalRService;
/// <summary>
/// HTTP-triggered function that hands the SignalR endpoint URL and access token
/// from the input binding back to the caller.
/// </summary>
/// <param name="req">Incoming HTTP request (not read by the body).</param>
/// <param name="connectionInfo">Connection info supplied by the signalRConnectionInfo input binding.</param>
/// <param name="log">Function logger.</param>
/// <returns>200 OK whose body carries <c>url</c> and <c>accessToken</c>.</returns>
public static async Task<IActionResult> Run(HttpRequest req,SignalRConnectionInfo connectionInfo,ILogger log)
{
    string endpoint = connectionInfo.Url;
    log.LogInformation($"Info.Url={endpoint}");

    // Property names are part of the response contract read by the client.
    var payload = new
    {
        url = endpoint,
        accessToken = connectionInfo.AccessToken,
    };
    return new OkObjectResult(payload);
}
function.json:
{
"bindings": [
{
"authLevel": "function","name": "req","type": "httpTrigger","direction": "in","methods": [
"get"
]
},{
"type": "signalRConnectionInfo","name": "connectionInfo","hubName": "%AzureSignalRHubName%","connectionStringSetting": "AzureSignalRConnectionString","userId": "{query.userid}","direction": "in"
},{
"name": "$return","type": "http","direction": "out"
}
]
}
-
signalR客户端-控制台应用程序:
using Microsoft.AspNetCore.SignalR.Client;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using System;
using System.Configuration;
using System.Net.Http;
using System.Threading.Tasks;

namespace ConsoleApp4
{
    /// <summary>
    /// SignalR console client: fetches the hub URL and access token from the
    /// "signalRInfo" endpoint for the configured user id, connects, and prints every
    /// "SendMessage" invocation until the user presses Enter.
    /// </summary>
    class Program
    {
        // "HH:mm" is hours:minutes. The previous "HH:MM" printed the month number
        // in the minutes position.
        private const string TimestampFormat = "yyyy-MM-ddTHH:mm:ss.fff";

        /// <summary>Entry point; reads "userId" and "signalRInfo" from the app settings.</summary>
        /// <param name="args">Unused command-line arguments.</param>
        static async Task Main(string[] args)
        {
            HubConnection connection = null;
            string userId = ConfigurationManager.AppSettings.Get("userId");
            string signalRInfo = ConfigurationManager.AppSettings.Get("signalRInfo");
            try
            {
                using (var client = new HttpClient())
                {
                    // Escape the user id so reserved characters cannot break the query string.
                    var rsp = await client.GetAsync($"{signalRInfo}&userid={Uri.EscapeDataString(userId ?? string.Empty)}");

                    // Fail with the HTTP status instead of trying to parse an error page as JSON.
                    rsp.EnsureSuccessStatusCode();

                    string jsontext = await rsp.Content.ReadAsStringAsync();
                    var info = JsonConvert.DeserializeAnonymousType(jsontext, new { url = "", accessToken = "" });
                    if (info == null || string.IsNullOrEmpty(info.url))
                    {
                        throw new InvalidOperationException("The signalRInfo endpoint returned no connection url");
                    }

                    connection = new HubConnectionBuilder()
                        .WithUrl(info.url, option =>
                        {
                            option.AccessTokenProvider = () => Task.FromResult(info.accessToken);
                        })
                        .Build();

                    WriteStamped(ConsoleColor.Green, $"SignalR Client on {info.url}/users/{userId}");
                }

                connection.On<string, string>("SendMessage", (string headers, string message) =>
                {
                    WriteStamped(ConsoleColor.DarkYellow, headers);
                    Console.ForegroundColor = ConsoleColor.Yellow;
                    Console.WriteLine($"{JToken.Parse(message).ToString(Formatting.Indented)}");
                    Console.ResetColor();
                });

                await connection.StartAsync();
            }
            catch (Exception ex)
            {
                // The previous text said "HybridConnection", copied from the relay sample.
                WriteStamped(ConsoleColor.Red, $"Open SignalR connection failed - {ex.Message}");
            }

            // Keep the process alive until the user presses Enter.
            Console.ReadLine();
            if (connection != null)
            {
                await connection.StopAsync();
            }
        }

        /// <summary>Writes one timestamp-prefixed line in the given color, then resets the console color.</summary>
        private static void WriteStamped(ConsoleColor color, string message)
        {
            Console.ForegroundColor = color;
            Console.WriteLine($"[{DateTime.Now.ToString(TimestampFormat)}] {message}");
            Console.ResetColor();
        }
    }
}
-
HttpTriggerSendMsgToSignalR 函数-订户
run.csx:
#r "Microsoft.Azure.WebJobs.Extensions.SignalRService"
#r "Newtonsoft.Json"
using System.Net;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Primitives;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using Microsoft.Azure.WebJobs.Extensions.SignalRService;
/// <summary>
/// Webhook subscriber: answers the CloudEvents OPTIONS handshake and the Event Grid
/// subscription-validation event, and forwards notifications to SignalR as a
/// "SendMessage" invocation carrying the header summary and the JSON payload.
/// </summary>
/// <param name="req">Incoming webhook request.</param>
/// <param name="signalRMessages">SignalR output binding collector.</param>
/// <param name="log">Function logger.</param>
/// <returns>
/// 200 OK for a handled handshake or notification; 400 Bad Request for an unparsable
/// body, an invalid validation event, or an unknown aeg-event-type.
/// </returns>
public static async Task<IActionResult> Run(HttpRequest req,IAsyncCollector<SignalRMessage> signalRMessages,ILogger log)
{
    // Header names are case-insensitive, so match them that way. FirstOrDefault
    // tolerates a header that carries no value.
    string headers = string.Join(" | ", req.Headers
        .Where(h => h.Key.StartsWith("aeg-", StringComparison.OrdinalIgnoreCase)
                 || h.Key.StartsWith("Content-Type", StringComparison.OrdinalIgnoreCase))
        .Select(h => $"{h.Key}={h.Value.FirstOrDefault()}"));
    log.LogInformation($"Method: {req.Method} Headers: {headers}");

    if (string.Equals(req.Method, HttpMethod.Options.ToString(), StringComparison.OrdinalIgnoreCase))
    {
        // Echo the requesting origin back to complete the handshake.
        log.LogInformation("CloudEventSchema validation");
        req.HttpContext.Response.Headers.Add("Webhook-Allowed-Origin", req.Headers["WebHook-Request-Origin"].FirstOrDefault()?.Trim());
        return new OkResult();
    }

    JToken jtoken;
    try
    {
        jtoken = JToken.Parse(await new StreamReader(req.Body).ReadToEndAsync());
    }
    catch (JsonReaderException ex)
    {
        // An empty or malformed body is a client error, not a function crash.
        return new BadRequestObjectResult($"Request body is not valid JSON: {ex.Message}");
    }

    string eventTypeHeader = req.Headers["aeg-event-type"].FirstOrDefault()?.Trim();

    if (string.Equals(eventTypeHeader, "SubscriptionValidation", StringComparison.OrdinalIgnoreCase))
    {
        // The event may arrive wrapped in a one-element array; unwrap it.
        // Any other shape leaves validationEvent null and falls through to 400.
        JToken candidate = jtoken is JArray batch
            ? (batch.Count == 1 ? batch[0] : null)
            : jtoken;
        var validationEvent = candidate as JObject;

        string eventType = (string)validationEvent?["eventType"];
        string validationCode = (string)(validationEvent?["data"] as JObject)?["validationCode"];

        if (eventType == "Microsoft.EventGrid.SubscriptionValidationEvent" && validationCode != null)
        {
            log.LogInformation("EventGridSchema validation");
            return new OkObjectResult(new { validationResponse = validationCode });
        }
        return new BadRequestObjectResult($"Not valid event schema");
    }

    if (string.Equals(eventTypeHeader, "Notification", StringComparison.OrdinalIgnoreCase))
    {
        await signalRMessages.AddAsync(
            new SignalRMessage
            {
                // No user id is set on the message itself; per the original note, the
                // binding path is then used to pick the recipients.
                Target = "SendMessage",
                Arguments = new[] { headers, jtoken.ToString() }
            });
        return new OkResult();
    }

    return new BadRequestObjectResult($"{eventTypeHeader} is not a valid type");
}
function.json:
{
"bindings": [
{
"authLevel": "function","name": "req","type": "httpTrigger","direction": "in","methods": [
"options","post"
]
},{
"type": "signalR","name": "signalRMessages","hubName": "%AzureSignalRHubName%/users/{query.userid}","direction": "out"
},{
"name": "$return","type": "http","direction": "out"
}
]
}
请注意,订阅者之所以使用webhook事件处理程序,有两个原因:一是可以传递CloudEvent消息,二是可以通过url查询字符串参数来配置signalR客户端的userId。
- 在控制台应用程序上显示 userid = abcd 的事件:
请注意,与混合连接不同,signalR客户端实例允许向同一用户标识多播消息;而在混合连接中,消息会在各侦听器实例之间进行负载均衡分发。