c# - 在 C# 中读取事件中心存档文件

标签 c# azure avro azure-eventhub

是否有任何 C# 示例代码用于读取 Azure 事件中心存档文件(Avro 格式)?

我正在尝试使用 Microsoft.Hadoop.Avro 库。我使用 java avro 工具转储了架构,该工具生成以下内容:

{

                ""type"":""record"",
                ""name"":""EventData"",
                ""namespace"":""Microsoft.ServiceBus.Messaging"",
                ""fields"":[
                             {""name"":""SequenceNumber"",""type"":""long""},
                             {""name"":""Offset"",""type"":""string""},
                             {""name"":""EnqueuedTimeUtc"",""type"":""string""},
                             {""name"":""SystemProperties"",""type"":{ ""type"":""map"",""values"":[""long"",""double"",""string"",""bytes""]}},
                             {""name"":""Properties"",""type"":{ ""type"":""map"",""values"":[""long"",""double"",""string"",""bytes"", ""null""]}},
                             {""name"":""Body"",""type"":[""null"",""bytes""]}
                         ]
                }

但是,当尝试反序列化文件以读回数据时,如下所示:

using (var reader = AvroContainer.CreateReader<EventData>(stream))
            {
                using (var streamReader = new SequentialReader<EventData>(reader))
                {
                    foreach (EventData dta in streamReader.Objects)
                    {
                        //stuff here
                    }

                }
            }

传递生产者端使用的实际 EventData 类型时不起作用,因此我尝试创建一个用 DataContract 属性标记的特殊类,如下所示:

[DataContract(Namespace = "Microsoft.ServiceBus.Messaging")]
public class EventData
{
    [DataMember(Name = "SequenceNumber")]
    public long SequenceNumber { get; set; }

    [DataMember(Name = "Offset")]
    public string Offset { get; set; }

    [DataMember(Name = "EnqueuedTimeUtc")]
    public string EnqueuedTimeUtc { get; set; }

    [DataMember(Name = "Body")]
    public ArraySegment<byte> Body { get; set; }

    //[DataMember(Name = "SystemProperties")]
    //public SystemPropertiesCollection SystemProperties { get; set; }

    //[DataMember(Name = "Properties")]
    //public IDictionary<string, object> Properties { get; set; }
}

出现以下错误:

System.Runtime.Serialization.SerializationException occurred
Message=Cannot match the union schema.

对于使用 C# 读取 Avro 存档文件的用例,MS 中没有示例代码吗?

最佳答案

如果您尝试使用 Microsoft.Hadoop.Avro 库读取 Avro 文件,则可以使用以下类:

[DataContract(Name = "EventData", Namespace = "Microsoft.ServiceBus.Messaging")]
class EventData
{
    [DataMember(Name = "SequenceNumber")]
    public long SequenceNumber { get; set; }

    [DataMember(Name = "Offset")]
    public string Offset { get; set; }

    [DataMember(Name = "EnqueuedTimeUtc")]
    public DateTime EnqueuedTimeUtc { get; set; }

    [DataMember(Name = "SystemProperties")]
    public Dictionary<string, object> SystemProperties { get; set; }

    [DataMember(Name = "Properties")]
    public Dictionary<string, object> Properties { get; set; } 

    [DataMember(Name = "Body")]
    public byte[] Body { get; set; }

    public EventData(dynamic record)
    {
        SequenceNumber = (long)record.SequenceNumber;
        Offset = (string)record.Offset;
        DateTime.TryParse((string)record.EnqueuedTimeUtc, out var enqueuedTimeUtc);
        EnqueuedTimeUtc = enqueuedTimeUtc;
        SystemProperties = (Dictionary<string, object>)record.SystemProperties;
        Properties = (Dictionary<string, object>)record.Properties;
        Body = (byte[])record.Body;
    }

}

当您读取 avro 文件时,您可以将其作为动态对象读取,然后将其序列化。这是一个例子:

var reader = AvroContainer.CreateGenericReader(stream);
while (reader.MoveNext()) 
{
   foreach (dynamic record in reader.Current.Objects)
   {
       var eventData = new EventData(record);
       var sequenceNumber = eventData.SequenceNumber;
       var bodyText = Encoding.UTF8.GetString(eventData.Body);
       var properties = eventData.Properties;
       var sysProperties = eventData.SystemProperties;
   }
}

您可以引用this answer更多细节。

关于c# - 在 C# 中读取事件中心存档文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43993644/

相关文章:

python - databricks 之外是否有另一种/类似的 Spark.read.format.load 方法?

C# - 如何对对象本身进行 xml 反序列化?

c# - 对 C# Controller 的 HTTP POST 请求

python - 使用 Python 将 XSD(XML Schema)转换为 AVSC(Avro Schema)

sql - 如何在 Visual Studio 中更改 Azure 数据库表的列顺序

azure - 如何使用 PowerShell 添加应用程序注册 key 而不破坏 Azure 门户?

avro - 如何使用 Spring-Kafka 通过 Confluence Schema 注册表读取 AVRO 消息?

c# - Windows-Phone-7:检查是否正在播放 soundEffectInstance 实例

c# - 右键单击部署时如何运行 --publish-local-settings?

azure - TFS\Team Services,在生成定义中使用变量组中的 Azure KeyVault secret