我想在 C# 控制台应用程序中订阅 Azure 事件网格。实际上我正在实现 eShopContainer 项目中的 EventBus 示例:我需要订阅一个主题并监听消息,处理并打印此前由另一个实现了 EventBus 的 C# 控制台应用程序发送的消息。那么,如何使用 C# 控制台应用程序来做到这一点?
这是我的 Azure 门户,消息存储在队列存储中:
这是所有消息所在的队列:
所以,我需要订阅并获取所有消息!
最佳答案
基本上,可以通过三种方式在 Azure 事件网格模型中使用控制台订阅者。下图显示了它们:
请注意,我的 Azure Event Grid Tester 使用了混合连接(hybrid connection)和 ngrok 隧道(ngrok tunnel),可以参考它们的实现。
以下代码片段是在控制台应用程序中使用 HybridConnectionListener 的示例:
using Microsoft.Azure.Relay;
using Newtonsoft.Json.Linq;
using System;
using System.Configuration;
using System.IO;
using System.Linq;
using System.Net;
using System.Threading.Tasks;
namespace ConsoleApp3
{
    /// <summary>
    /// Console subscriber that receives Azure Event Grid notifications through a
    /// <see cref="HybridConnectionListener"/> and prints each one to the console.
    /// </summary>
    class Program
    {
        // "mm" is minutes. The original format used "MM" (month) in the time part,
        // so every timestamp showed the month where the minutes belong.
        private const string TimestampFormat = "yyyy-MM-ddTHH:mm:ss.fff";

        /// <summary>
        /// Opens the listener configured by the "HybridConnection" app setting, prints
        /// incoming events until Enter is pressed, then closes the listener.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static async Task Main(string[] args)
        {
            string connectionString = ConfigurationManager.AppSettings["HybridConnection"];
            HybridConnectionListener listener = null;
            try
            {
                listener = new HybridConnectionListener(connectionString);

                listener.Connecting += (o, hce) =>
                    WriteStatus(ConsoleColor.White, $"HybridConnection: Connecting, listener:{listener.Address}");
                listener.Online += (o, hce) =>
                    WriteStatus(ConsoleColor.Green, $"HybridConnection: Online, listener:{listener.Address}");
                listener.Offline += (o, hce) =>
                    WriteStatus(ConsoleColor.Blue, $"HybridConnection: Offline, listener:{listener.Address}");

                listener.RequestHandler = HandleRequest;

                await listener.OpenAsync(TimeSpan.FromSeconds(60));
            }
            catch (Exception ex)
            {
                WriteStatus(ConsoleColor.Red, $"Open HybridConnection failed - {ex.Message}");
            }

            // Keep the process alive until the user presses Enter.
            Console.ReadLine();

            if (listener != null)
            {
                await listener.CloseAsync();
            }
        }

        /// <summary>
        /// Handles one relayed HTTP request: checks that it is an Event Grid notification,
        /// prints its headers and JSON payload, and always responds with 204 No Content.
        /// </summary>
        /// <param name="context">The relayed request/response pair.</param>
        private static void HandleRequest(RelayedHttpListenerContext context)
        {
            try
            {
                // Ordinal comparison: header values are protocol tokens, not linguistic text
                // (a culture-sensitive ignore-case compare can mismatch "Notification",
                // e.g. under Turkish casing rules).
                string eventType = context.Request.Headers["Aeg-Event-Type"];
                if (!string.Equals(eventType, "Notification", StringComparison.OrdinalIgnoreCase))
                {
                    throw new InvalidOperationException("Received message is not for EventGrid subscriber");
                }

                string jsontext = ReadPayload(context.Request.InputStream);

                // Same case-insensitive matching as the event-type check above.
                string headers = string.Join(
                    " | ",
                    context.Request.Headers.AllKeys
                        .Where(key => key.StartsWith("aeg-", StringComparison.OrdinalIgnoreCase)
                                   || key.StartsWith("Content-Type", StringComparison.OrdinalIgnoreCase))
                        .Select(key => $"{key}={context.Request.Headers[key]}"));

                Console.ForegroundColor = ConsoleColor.DarkYellow;
                Console.WriteLine($"[{Timestamp()}] Headers: {headers}");
                Console.ForegroundColor = ConsoleColor.Yellow;
                Console.WriteLine(jsontext);
            }
            catch (Exception ex)
            {
                Console.ForegroundColor = ConsoleColor.Red;
                Console.WriteLine($"[{Timestamp()}] HybridConnection: Message processing failed - {ex.Message}");
            }
            finally
            {
                // The response is sent whether or not processing succeeded.
                context.Response.StatusCode = HttpStatusCode.NoContent;
                context.Response.Close();
                Console.ResetColor();
            }
        }

        /// <summary>
        /// Reads the request body and returns it as indented JSON.
        /// </summary>
        /// <param name="body">The request input stream; it is disposed by this method.</param>
        /// <returns>
        /// The single event when the payload is a one-element array, otherwise the whole
        /// payload, formatted with indentation.
        /// </returns>
        /// <exception cref="InvalidDataException">The payload is a bare JSON value.</exception>
        private static string ReadPayload(Stream body)
        {
            using (var reader = new StreamReader(body))
            {
                JToken payload = JToken.Parse(reader.ReadToEnd());

                if (payload is JValue)
                {
                    throw new InvalidDataException(
                        $"The payload (JValue) is not accepted. JValue={payload.ToString(Newtonsoft.Json.Formatting.None)}");
                }

                // A one-element array is unwrapped as before. Empty arrays and batches of
                // several events are printed whole instead of throwing
                // (SingleOrDefault returned null / threw in those cases).
                if (payload is JArray events && events.Count == 1)
                {
                    payload = events[0];
                }

                return payload.ToString(Newtonsoft.Json.Formatting.Indented);
            }
        }

        /// <summary>Writes a timestamped status line in the given color, then resets the console color.</summary>
        private static void WriteStatus(ConsoleColor color, string message)
        {
            Console.ForegroundColor = color;
            Console.WriteLine($"[{Timestamp()}] {message}");
            Console.ResetColor();
        }

        /// <summary>Returns the current local time formatted with <see cref="TimestampFormat"/>.</summary>
        private static string Timestamp() => DateTime.Now.ToString(TimestampFormat);
    }
}
在事件处理程序目标的 AEG 订阅中使用混合连接,所有事件都将传递到控制台应用程序,如以下屏幕片段所示:
更新:
以下示例显示了订阅者的实现,其输出绑定(bind)到 signalR 服务。在这种情况下,我们需要构建两个 HttpTrigger 函数,一个用于订阅者,另一个用于 signalR 客户端,以获取特定 userId 的 url 和访问 token :
- HttpTriggerGetSignalRinfo函数:
run.csx:
#r "Microsoft.Azure.WebJobs.Extensions.SignalRService"
using System;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Primitives;
using Microsoft.Azure.WebJobs.Extensions.SignalRService;
/// <summary>
/// Returns the SignalR endpoint URL and access token supplied by the
/// <c>connectionInfo</c> input binding, so a client can connect to the hub.
/// </summary>
/// <param name="req">The incoming HTTP request (not read by this function).</param>
/// <param name="connectionInfo">Connection info provided by the SignalR input binding.</param>
/// <param name="log">Function logger.</param>
/// <returns>200 OK with a JSON body containing <c>url</c> and <c>accessToken</c>.</returns>
public static Task<IActionResult> Run(HttpRequest req, SignalRConnectionInfo connectionInfo, ILogger log)
{
    log.LogInformation($"Info.Url={connectionInfo.Url}");

    // Nothing is awaited here, so the method is not marked async (avoids warning CS1998);
    // the already-computed result is wrapped in a completed task instead.
    IActionResult result = new OkObjectResult(new
    {
        url = connectionInfo.Url,
        accessToken = connectionInfo.AccessToken
    });
    return Task.FromResult(result);
}
function.json:
{
"bindings": [
{
"authLevel": "function",
"name": "req",
"type": "httpTrigger",
"direction": "in",
"methods": [
"get"
]
},
{
"type": "signalRConnectionInfo",
"name": "connectionInfo",
"hubName": "%AzureSignalRHubName%",
"connectionStringSetting": "AzureSignalRConnectionString",
"userId": "{query.userid}",
"direction": "in"
},
{
"name": "$return",
"type": "http",
"direction": "out"
}
]
}
signalR 客户端 - 控制台应用程序:
using Microsoft.AspNetCore.SignalR.Client;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using System;
using System.Configuration;
using System.Net.Http;
using System.Threading.Tasks;

namespace ConsoleApp4
{
    /// <summary>
    /// SignalR client console application: fetches the hub URL and access token from the
    /// endpoint in the "signalRInfo" app setting, connects, and prints every
    /// "SendMessage" invocation it receives.
    /// </summary>
    class Program
    {
        // "mm" is minutes. The original format used "MM" (month) in the time part.
        private const string TimestampFormat = "yyyy-MM-ddTHH:mm:ss.fff";

        /// <summary>
        /// Connects to the hub for the user given by the "userId" app setting and prints
        /// received messages until Enter is pressed, then stops the connection.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static async Task Main(string[] args)
        {
            HubConnection connection = null;
            string userId = ConfigurationManager.AppSettings.Get("userId");
            string signalRInfo = ConfigurationManager.AppSettings.Get("signalRInfo");

            try
            {
                using (var client = new HttpClient())
                {
                    // Escape the user id so characters such as '&' or '#' cannot corrupt the query string.
                    var rsp = await client.GetAsync($"{signalRInfo}&userid={Uri.EscapeDataString(userId ?? string.Empty)}");

                    // Fail here with a clear HTTP error rather than while deserializing an error body.
                    rsp.EnsureSuccessStatusCode();

                    string jsontext = await rsp.Content.ReadAsStringAsync();
                    var info = JsonConvert.DeserializeAnonymousType(jsontext, new { url = "", accessToken = "" });

                    connection = new HubConnectionBuilder()
                        .WithUrl(info.url, option =>
                        {
                            option.AccessTokenProvider = () => Task.FromResult(info.accessToken);
                        })
                        .Build();

                    Console.ForegroundColor = ConsoleColor.Green;
                    Console.WriteLine($"[{Timestamp()}] SignalR Client on {info.url}/users/{userId}");
                    Console.ResetColor();
                }

                connection.On<string, string>("SendMessage", (string headers, string message) =>
                {
                    Console.ForegroundColor = ConsoleColor.DarkYellow;
                    Console.WriteLine($"[{Timestamp()}] {headers}");
                    Console.ForegroundColor = ConsoleColor.Yellow;
                    Console.WriteLine($"{JToken.Parse(message).ToString(Formatting.Indented)}");
                    Console.ResetColor();
                });

                await connection.StartAsync();
            }
            catch (Exception ex)
            {
                Console.ForegroundColor = ConsoleColor.Red;
                // The original message said "Open HybridConnection failed", copied from the
                // hybrid connection sample; this program opens a SignalR connection.
                Console.WriteLine($"[{Timestamp()}] Open SignalR connection failed - {ex.Message}");
                Console.ResetColor();
            }

            // Keep the process alive until the user presses Enter.
            Console.ReadLine();

            if (connection != null)
            {
                await connection.StopAsync();
            }
        }

        /// <summary>Returns the current local time formatted with <see cref="TimestampFormat"/>.</summary>
        private static string Timestamp() => DateTime.Now.ToString(TimestampFormat);
    }
}
HttpTriggerSendMsgToSignalR 函数 - 订阅者
run.csx:
#r "Microsoft.Azure.WebJobs.Extensions.SignalRService"
#r "Newtonsoft.Json"
using System.Net;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Primitives;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using Microsoft.Azure.WebJobs.Extensions.SignalRService;
/// <summary>
/// Event Grid webhook subscriber that forwards each notification to SignalR.
/// Handles three kinds of request: an OPTIONS validation request, a POST with
/// <c>aeg-event-type: SubscriptionValidation</c>, and a POST with
/// <c>aeg-event-type: Notification</c>.
/// </summary>
/// <param name="req">The incoming HTTP request.</param>
/// <param name="signalRMessages">SignalR output binding collector.</param>
/// <param name="log">Function logger.</param>
/// <returns>
/// 200 OK for a handled request (with the validation code echoed for a subscription
/// validation); 400 Bad Request for an unparsable body, an invalid validation event,
/// or an unknown event type.
/// </returns>
public static async Task<IActionResult> Run(HttpRequest req, IAsyncCollector<SignalRMessage> signalRMessages, ILogger log)
{
    // FirstOrDefault instead of First: a header with no values must not throw.
    string headers = string.Join(" | ", req.Headers
        .Where(h => h.Key.StartsWith("aeg-", StringComparison.OrdinalIgnoreCase)
                 || h.Key.StartsWith("Content-Type", StringComparison.OrdinalIgnoreCase))
        .Select(h => $"{h.Key}={h.Value.FirstOrDefault()}"));
    log.LogInformation($"Method: {req.Method} Headers: {headers}");

    if (string.Equals(req.Method, HttpMethod.Options.Method, StringComparison.OrdinalIgnoreCase))
    {
        // Echo the requesting origin back in the Webhook-Allowed-Origin header.
        log.LogInformation("CloudEventSchema validation");
        req.HttpContext.Response.Headers.Add("Webhook-Allowed-Origin", req.Headers["WebHook-Request-Origin"].FirstOrDefault()?.Trim());
        return new OkResult();
    }

    JToken jtoken;
    try
    {
        jtoken = JToken.Parse(await new StreamReader(req.Body).ReadToEndAsync());
    }
    catch (JsonReaderException ex)
    {
        // An empty or malformed body is a client error, not a server failure.
        return new BadRequestObjectResult($"Request body is not valid JSON: {ex.Message}");
    }

    string eventTypeHeader = req.Headers["aeg-event-type"].FirstOrDefault()?.Trim();

    if (string.Equals(eventTypeHeader, "SubscriptionValidation", StringComparison.OrdinalIgnoreCase))
    {
        // The validation event may arrive wrapped in a one-element array.
        // Anything else (empty array, several elements, non-object) is rejected
        // below instead of causing a null dereference.
        JToken candidate = jtoken is JArray events
            ? (events.Count == 1 ? events[0] : null)
            : jtoken;
        JObject validationEvent = candidate as JObject;

        string eventType = validationEvent?.Value<string>("eventType");
        JValue validationCode = validationEvent?.SelectToken("data.validationCode") as JValue;

        if (eventType == "Microsoft.EventGrid.SubscriptionValidationEvent" && validationCode?.Value != null)
        {
            log.LogInformation("EventGridSchema validation");
            return new OkObjectResult(new { validationResponse = validationCode.Value });
        }
        return new BadRequestObjectResult($"Not valid event schema");
    }

    if (string.Equals(eventTypeHeader, "Notification", StringComparison.OrdinalIgnoreCase))
    {
        await signalRMessages.AddAsync(
            new SignalRMessage
            {
                // No user id is set on the message itself; per the original author's note the
                // recipient then comes from the binding path in function.json
                // (".../users/{query.userid}") — NOTE(review): confirm against the binding docs.
                Target = "SendMessage",
                Arguments = new[] { headers, jtoken.ToString() }
            });
        return new OkResult();
    }

    return new BadRequestObjectResult($"{eventTypeHeader} is not a valid type");
}
function.json:
{
"bindings": [
{
"authLevel": "function",
"name": "req",
"type": "httpTrigger",
"direction": "in",
"methods": [
"options",
"post"
]
},
{
"type": "signalR",
"name": "signalRMessages",
"hubName": "%AzureSignalRHubName%/users/{query.userid}",
"connectionStringSetting": "AzureSignalRConnectionString",
"direction": "out"
},
{
"name": "$return",
"type": "http",
"direction": "out"
}
]
}
请注意,订阅者使用 Webhook 事件处理程序有两个原因:一是可以传递 CloudEvent 消息,二是可以通过 url 查询字符串参数配置 signalR 客户端的 userId。
- 在控制台应用上显示 userid=abcd 的事件:
请注意,signalR 客户端实例允许向同一用户 ID 的多个实例多播消息;这与混合连接不同,后者会在多个监听器实例之间对消息做负载均衡。
关于c# - Azure 事件网格订阅控制台应用程序,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64764117/