c# - Azure Event Grid subscription from a console application

Tags: c# azure-eventgrid

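I want to subscribe to Azure Event Grid from a C# console application. I am actually implementing the EventBus sample from the eShopContainer project: I need to subscribe to a topic, listen for messages, and then process and print the messages that were previously sent by another C# console application that implements the EventBus. How can I do this with a C# console application?

This is my Azure portal; the messages are stored in Queue storage: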

[image: azure portal subscriptions]

This is the queue that holds all the messages:

[image: all messages]

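So I need to subscribe and receive all of the messages!

Best answer

Basically, there are three ways to use a console subscriber in the Azure Event Grid model. The following picture shows them: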

[image]

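Note that the hybrid connection and the ngrok tunnel are both used in my Azure Event Grid Tester. Have a look at their implementation.

The following code snippet is an example of using the HybridConnectionListener in a console application: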

using Microsoft.Azure.Relay;
using Newtonsoft.Json.Linq;
using System;
using System.Configuration;
using System.IO;
using System.Linq;
using System.Net;
using System.Threading.Tasks;

namespace ConsoleApp3
{
    class Program
    {
        static async Task Main(string[] args)
        {
            string connectionString = ConfigurationManager.AppSettings["HybridConnection"];
            HybridConnectionListener listener = null;

            try
            {
                listener = new HybridConnectionListener(connectionString);
                listener.Connecting += (o, hce) =>
                {
                    Console.ForegroundColor = ConsoleColor.White;
                    Console.WriteLine($"[{DateTime.Now.ToLocalTime().ToString("yyyy-MM-ddTHH:mm:ss.fff")}] HybridConnection: Connecting, listener:{listener.Address}");
                    Console.ResetColor();
                };
                listener.Online += (o, hce) =>
                {
                    Console.ForegroundColor = ConsoleColor.Green;
                    Console.WriteLine($"[{DateTime.Now.ToLocalTime().ToString("yyyy-MM-ddTHH:mm:ss.fff")}] HybridConnection: Online, listener:{listener.Address}");
                    Console.ResetColor();
                };
                listener.Offline += (o, hce) =>
                {
                    Console.ForegroundColor = ConsoleColor.Blue;
                    Console.WriteLine($"[{DateTime.Now.ToLocalTime().ToString("yyyy-MM-ddTHH:mm:ss.fff")}] HybridConnection: Offline, listener:{listener.Address}");
                    Console.ResetColor();
                };

                listener.RequestHandler = (context) =>
                {
                    try
                    {
                        if (!context.Request.Headers.AllKeys.Contains("Aeg-Event-Type", StringComparer.OrdinalIgnoreCase) || !string.Equals(context.Request.Headers["Aeg-Event-Type"], "Notification", StringComparison.CurrentCultureIgnoreCase))
                            throw new Exception("Received message is not for EventGrid subscriber");

                        string jsontext = null;
                        using (var reader = new StreamReader(context.Request.InputStream))
                        {
                            var jtoken = JToken.Parse(reader.ReadToEnd());
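                            // The Event Grid schema delivers a JSON array; with batching enabled it can hold
                            // several events, so print the whole array rather than assuming a single element.
                            if (jtoken is JArray)
                                jsontext = jtoken.ToString(Newtonsoft.Json.Formatting.Indented);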
                            else if (jtoken is JObject)
                                jsontext = jtoken.ToString(Newtonsoft.Json.Formatting.Indented);
                            else if (jtoken is JValue)
                                throw new Exception($"The payload (JValue) is not accepted. JValue={jtoken.ToString(Newtonsoft.Json.Formatting.None)}");
                        }

                        Console.ForegroundColor = ConsoleColor.DarkYellow;
                        Console.WriteLine($"[{DateTime.Now.ToLocalTime().ToString("yyyy-MM-ddTHH:mm:ss.fff")}] Headers: {string.Join(" | ", context.Request.Headers.AllKeys.Where(i => i.StartsWith("aeg-") || i.StartsWith("Content-Type")).Select(i => $"{i}={context.Request.Headers[i]}"))}");
                        Console.ForegroundColor = ConsoleColor.Yellow;
                        Console.WriteLine($"{jsontext}");
                                             
                    }
                    catch (Exception ex)
                    {
                        Console.ForegroundColor = ConsoleColor.Red;
                        Console.WriteLine($"[{DateTime.Now.ToLocalTime().ToString("yyyy-MM-ddTHH:mm:ss.fff")}] HybridConnection: Message processing failed - {ex.Message}");
                    }
                    finally
                    {
                        context.Response.StatusCode = HttpStatusCode.NoContent;
                        context.Response.Close();
                        Console.ResetColor();
                    }
                };
                await listener.OpenAsync(TimeSpan.FromSeconds(60));
            }
            catch (Exception ex)
            {
                Console.ForegroundColor = ConsoleColor.Red;
                Console.WriteLine($"[{DateTime.Now.ToLocalTime().ToString("yyyy-MM-ddTHH:mm:ss.fff")}] Open HybridConnection failed - {ex.Message}");
                Console.ResetColor();
            }

            Console.ReadLine();

            if(listener != null)
                await listener.CloseAsync();
        }
    }
}

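With the hybrid connection set as the event handler destination of the AEG subscription, all events are delivered to the console application, as shown in the following screen snippet: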

[image]
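A delivery in the Event Grid schema is a JSON array, and it may hold more than one event when batching is enabled on the subscription. As a minimal sketch (not part of the original answer), the helper below accepts either an array or a single object and lists the id, eventType and subject of each event. It uses System.Text.Json rather than Newtonsoft.Json so that it needs no extra package; the class name EventGridPayload and the sample event types are made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

public static class EventGridPayload
{
    // Returns one (Id, EventType, Subject) tuple per event in the payload.
    // Accepts a JSON array of events or a single event object.
    public static List<(string Id, string EventType, string Subject)> Read(string json)
    {
        var result = new List<(string Id, string EventType, string Subject)>();
        using (JsonDocument doc = JsonDocument.Parse(json))
        {
            JsonElement root = doc.RootElement;
            if (root.ValueKind == JsonValueKind.Array)
            {
                foreach (JsonElement e in root.EnumerateArray())
                    result.Add(ToTuple(e));
            }
            else if (root.ValueKind == JsonValueKind.Object)
            {
                result.Add(ToTuple(root));
            }
            else
            {
                throw new FormatException($"Unexpected payload kind: {root.ValueKind}");
            }
        }
        return result;
    }

    private static (string Id, string EventType, string Subject) ToTuple(JsonElement e) =>
        (GetString(e, "id"), GetString(e, "eventType"), GetString(e, "subject"));

    // Returns the named string property, or null when it is missing or not a string.
    private static string GetString(JsonElement e, string name) =>
        e.TryGetProperty(name, out JsonElement v) && v.ValueKind == JsonValueKind.String
            ? v.GetString()
            : null;
}

public static class Program
{
    public static void Main()
    {
        // Hypothetical batch of two events; the event types are illustrative only.
        string batch =
            "[{\"id\":\"1\",\"eventType\":\"OrderCreated\",\"subject\":\"orders/1\",\"data\":{}}," +
            "{\"id\":\"2\",\"eventType\":\"OrderPaid\",\"subject\":\"orders/1\",\"data\":{}}]";

        foreach (var ev in EventGridPayload.Read(batch))
            Console.WriteLine($"{ev.Id} {ev.EventType} {ev.Subject}");
    }
}
```

Inside a RequestHandler, the string passed to Read would be the text read from context.Request.InputStream. Events in the CloudEvents schema use different property names (for example type instead of eventType), so this sketch would return null for those fields.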

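Update:

The following example shows an implementation of a subscriber with an output binding to the SignalR Service. In this scenario we need to build two HttpTrigger functions: one for the subscriber, and another that the SignalR client calls to obtain the url and access token for a specific userId: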

[image]

  1. HttpTriggerGetSignalRinfo function:

run.csx:

#r "Microsoft.Azure.WebJobs.Extensions.SignalRService"

using System;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Primitives;
using Microsoft.Azure.WebJobs.Extensions.SignalRService;

public static async Task<IActionResult> Run(HttpRequest req, SignalRConnectionInfo connectionInfo, ILogger log)
{
    log.LogInformation($"Info.Url={connectionInfo.Url}");

    return new OkObjectResult(new 
    { 
        url = connectionInfo.Url, 
        accessToken = connectionInfo.AccessToken,
    }); 
}

function.json:

{
  "bindings": [
    {
      "authLevel": "function",
      "name": "req",
      "type": "httpTrigger",
      "direction": "in",
      "methods": [
        "get"
      ]
    },
    {
      "type": "signalRConnectionInfo",
      "name": "connectionInfo",
      "hubName": "%AzureSignalRHubName%",
      "connectionStringSetting": "AzureSignalRConnectionString",
      "userId": "{query.userid}",
      "direction": "in"
    },
    {
      "name": "$return",
      "type": "http",
      "direction": "out"
    }
  ]
}
  • SignalR client - console application:

     using Microsoft.AspNetCore.SignalR.Client;
     using Newtonsoft.Json;
     using Newtonsoft.Json.Linq;
     using System;
     using System.Configuration;
     using System.Net.Http;
     using System.Threading.Tasks;
    
     namespace ConsoleApp4
     {
         class Program
         {
             static async Task Main(string[] args)
             {
                 HubConnection connection = null;
                 string userId = ConfigurationManager.AppSettings.Get("userId");
                 string signalRInfo = ConfigurationManager.AppSettings.Get("signalRInfo");
    
                 try
                 {
                     using (var client = new HttpClient())
                     {
                         var rsp = await client.GetAsync($"{signalRInfo}&userid={userId}");
                         string jsontext = await rsp.Content.ReadAsStringAsync();
                         var info = JsonConvert.DeserializeAnonymousType(jsontext, new { url = "", accessToken = "" });
    
                         connection = new HubConnectionBuilder()
                             .WithUrl(info.url, option =>
                             {
                             option.AccessTokenProvider = () =>
                                 {
                                     return Task.FromResult(info.accessToken);
                                 };
                             }).Build();
    
                         Console.ForegroundColor = ConsoleColor.Green;
                         Console.WriteLine($"[{DateTime.Now.ToLocalTime().ToString("yyyy-MM-ddTHH:mm:ss.fff")}] SignalR Client on {info.url}/users/{userId}");
                         Console.ResetColor();
                     }
    
                     connection.On<string, string>("SendMessage", (string headers, string message) =>
                     {
                         Console.ForegroundColor = ConsoleColor.DarkYellow;
                         Console.WriteLine($"[{DateTime.Now.ToLocalTime().ToString("yyyy-MM-ddTHH:mm:ss.fff")}] {headers}");
                         Console.ForegroundColor = ConsoleColor.Yellow;
                         Console.WriteLine($"{JToken.Parse(message).ToString(Formatting.Indented)}");
                         Console.ResetColor();
                     });
    
                     await connection.StartAsync();              
                 }
                 catch (Exception ex)
                 {
                     Console.ForegroundColor = ConsoleColor.Red;
                     Console.WriteLine($"[{DateTime.Now.ToLocalTime().ToString("yyyy-MM-ddTHH:mm:ss.fff")}] Open SignalR connection failed - {ex.Message}");
                     Console.ResetColor();
                 }
                 Console.ReadLine();
                 if (connection != null)
                     await connection.StopAsync();
             }       
         }
     }
    
  • HttpTriggerSendMsgToSignalR function - subscriber

  • run.csx:

    #r "Microsoft.Azure.WebJobs.Extensions.SignalRService"
    #r "Newtonsoft.Json"
    
    using System.Net;
    using Microsoft.AspNetCore.Mvc;
    using Microsoft.Extensions.Primitives;
    using Newtonsoft.Json;
    using Newtonsoft.Json.Linq;
    using Microsoft.Azure.WebJobs.Extensions.SignalRService;
    
    public static async Task<IActionResult> Run(HttpRequest req, IAsyncCollector<SignalRMessage> signalRMessages, ILogger log)
    {   
        string headers = string.Join(" | ", req.Headers.Where(h => h.Key.StartsWith("aeg-") || h.Key.StartsWith("Content-Type")).Select(i => $"{i.Key}={i.Value.First()}")); 
        log.LogInformation($"Method: {req.Method} Headers: {headers}");    
              
        if (req.Method == HttpMethod.Options.ToString())
        {
            log.LogInformation("CloudEventSchema validation");               
            req.HttpContext.Response.Headers.Add("Webhook-Allowed-Origin", req.Headers["WebHook-Request-Origin"].FirstOrDefault()?.Trim());
            return (ActionResult)new OkResult();
        }
        
        var jtoken = JToken.Parse(await new StreamReader(req.Body).ReadToEndAsync());
        string eventTypeHeader = req.Headers["aeg-event-type"].FirstOrDefault()?.Trim(); 
    
        if(eventTypeHeader == "SubscriptionValidation") 
        {       
            if(jtoken is JArray)
                jtoken = jtoken.SingleOrDefault<JToken>();
    
            if(jtoken["eventType"].Value<string>() == "Microsoft.EventGrid.SubscriptionValidationEvent")
            {
                log.LogInformation("EventGridSchema validation");
                return (ActionResult)new OkObjectResult(new { validationResponse = ((dynamic)jtoken["data"]["validationCode"])});         
            }           
            return new BadRequestObjectResult($"Not valid event schema");
        }   
        else if(eventTypeHeader == "Notification") 
        {          
            await signalRMessages.AddAsync(
                new SignalRMessage
                {
                    // UserId is not set here; when that property is absent, the user from the binding path (see function.json) is used
                    Target = "SendMessage",
                    Arguments = new[] { headers, jtoken.ToString() }
                });        
            return (ActionResult)new OkResult();  
        }
         
        return new BadRequestObjectResult($"{eventTypeHeader} is not a valid type");
    }
    

    function.json:

    {
      "bindings": [
        {
          "authLevel": "function",
          "name": "req",
          "type": "httpTrigger",
          "direction": "in",
          "methods": [
            "options",
            "post"
          ]
        },
        {
          "type": "signalR",
          "name": "signalRMessages",
          "hubName": "%AzureSignalRHubName%/users/{query.userid}",
          "connectionStringSetting": "AzureSignalRConnectionString",
          "direction": "out"
        },
        {
          "name": "$return",
          "type": "http",
          "direction": "out"
        }
      ]
    }
    

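    Note that the Webhook event handler is used for the subscriber for two reasons: it can deliver CloudEvent messages, and it allows the SignalR client userId to be configured through a url query string parameter.

  • Events for userid=abcd shown in the console application:
  • [image]

    Note that SignalR client instances allow messages to be multicast to every client with the same userId, in contrast to the hybrid connection, where messages are balanced between listener instances.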
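The subscriber function above answers the Event Grid schema handshake by echoing data.validationCode back as validationResponse. As a rough sketch of just that step, separated from the Azure Functions plumbing, the helper below builds the response body from a request body. It uses System.Text.Json so that it is self-contained; the class and method names (EventGridHandshake, BuildValidationResponse) and the validation code "abc" are assumptions for illustration, not part of the original answer.

```csharp
using System;
using System.Text.Json;

public static class EventGridHandshake
{
    public const string ValidationEventType = "Microsoft.EventGrid.SubscriptionValidationEvent";

    // Returns the JSON body to send back with HTTP 200 for a validation request,
    // or null when the body is not a single subscription validation event.
    public static string BuildValidationResponse(string requestBody)
    {
        using (JsonDocument doc = JsonDocument.Parse(requestBody))
        {
            JsonElement ev = doc.RootElement;

            // The Event Grid schema wraps the event in an array with one element.
            if (ev.ValueKind == JsonValueKind.Array)
            {
                if (ev.GetArrayLength() != 1) return null;
                ev = ev[0];
            }
            if (ev.ValueKind != JsonValueKind.Object) return null;

            if (!ev.TryGetProperty("eventType", out JsonElement type) ||
                type.ValueKind != JsonValueKind.String ||
                type.GetString() != ValidationEventType)
                return null;

            if (!ev.TryGetProperty("data", out JsonElement data) ||
                data.ValueKind != JsonValueKind.Object ||
                !data.TryGetProperty("validationCode", out JsonElement code) ||
                code.ValueKind != JsonValueKind.String)
                return null;

            return JsonSerializer.Serialize(new { validationResponse = code.GetString() });
        }
    }
}

public static class Program
{
    public static void Main()
    {
        // Hypothetical validation request body with a made-up validation code.
        string body =
            "[{\"eventType\":\"Microsoft.EventGrid.SubscriptionValidationEvent\"," +
            "\"data\":{\"validationCode\":\"abc\"}}]";

        Console.WriteLine(EventGridHandshake.BuildValidationResponse(body));
        // prints {"validationResponse":"abc"}
    }
}
```

In the function, a null result would map to the BadRequestObjectResult branch and a non-null result to the OkObjectResult branch. The CloudEvents handshake (the OPTIONS request answered with the Webhook-Allowed-Origin header) is a separate path and is not covered by this sketch.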

    Source: the Stack Overflow question "c# - Azure Event Grid subscription from a console application": https://stackoverflow.com/questions/64764117/
