azure - Upgrading Azure.Messaging.EventHubs from 5.5.0 to 5.9.0: using 'eventData.Body.Array, eventData.Body.Offset' returns an error

Tags: azure upgrade azure-eventhub

I am upgrading Azure.Messaging.EventHubs from 5.5.0 to 5.9.0.

As part of this, I need to change the old Microsoft.Azure.EventHubs.EventData to Azure.Messaging.EventHubs.EventData.

But when I use:

var decompressed = Decompress(eventData.Body.Array, eventData.Body.Offset, eventData.Body.Count);

The Decompress function is:

public static string Decompress(byte[] input, int offset, int count)
{
    // Read the GZip-compressed slice of the buffer and decode it as UTF-8 text
    using var memoryStream = new MemoryStream(input, offset, count);
    using var decompressedStream = new GZipStream(memoryStream, CompressionMode.Decompress);
    using var streamReader = new StreamReader(decompressedStream, Encoding.UTF8);

    return streamReader.ReadToEnd();
}

the compiler returns: 'ReadOnlyMemory<byte>' does not contain a definition for 'Array' and no accessible extension method 'Array' accepting a first argument of type 'ReadOnlyMemory<byte>' could be found

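I think EventData.Body has changed in the new version, but I don't know what replaces the old members. What is the replacement for "eventData.Body.Array, eventData.Body.Offset, eventData.Body.Count", and how can I find it?

Best answer

You can add the event data objects to a batch and then send them with the EventHubProducerClient, as shown below: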

using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Producer;
using System;
using System.Text;
using System.Threading.Tasks;

namespace ConsoleApp
{
    class Program
    {
        static async Task Main(string[] args)
        {
            // Replace the <EVENT_HUBS_CONNECTION_STRING> and <EVENT_HUB_NAME> placeholder values
            var connectionString = "Endpoint=sb://eventhub1.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xxxxxxx+AEhLwhfnY=";
            var eventHubName = "siliconeventhub";

            // Create a new EventHubProducerClient object
            await using (var producerClient = new EventHubProducerClient(connectionString, eventHubName))
            {
                // Create a batch of events
                using EventDataBatch eventBatch = await producerClient.CreateBatchAsync();

                // Add events to the batch
                for (int i = 1; i <= 3; i++)
                {
                    var eventData = new EventData(Encoding.UTF8.GetBytes($"Event {i}"));
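                    // TryAdd returns false when the event does not fit in the batch
                    if (!eventBatch.TryAdd(eventData))
                    {
                        throw new InvalidOperationException($"Event {i} is too large for the batch.");
                    }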
                }

                // Send the batch of events to the event hub
                await producerClient.SendAsync(eventBatch);
            }
        }
    }
}
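
Note that the producer above sends plain UTF-8 text, while the question is about GZip-compressed bodies. A minimal sketch of compressing a payload before wrapping it in an EventData might look like the following; the GZipPayload class and its Compress method are hypothetical helpers for illustration, not part of the Azure SDK, and the example runs without an Event Hubs connection:

```csharp
using System;
using System.IO;
using System.IO.Compression;
using System.Text;

public static class GZipPayload
{
    // Hypothetical helper (not an SDK API): GZip-compresses a UTF-8 string.
    public static byte[] Compress(string text)
    {
        byte[] raw = Encoding.UTF8.GetBytes(text);
        using var output = new MemoryStream();

        // Dispose the GZipStream before reading the buffer so the GZip footer is written.
        using (var gzip = new GZipStream(output, CompressionMode.Compress, leaveOpen: true))
        {
            gzip.Write(raw, 0, raw.Length);
        }

        return output.ToArray();
    }
}

public static class Program
{
    public static void Main()
    {
        byte[] payload = GZipPayload.Compress("Event 1");

        // GZip data starts with the magic bytes 0x1F 0x8B (31, 139).
        Console.WriteLine(payload[0] == 31 && payload[1] == 139);

        // With the SDK, the compressed payload would be passed as: new EventData(payload)
    }
}
```

In the producer loop, the compressed bytes would take the place of Encoding.UTF8.GetBytes($"Event {i}").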

Now receive the events and decompress them with the code below:

using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Consumer;
using System;
using System.IO;
using System.IO.Compression;
using System.Text;
using System.Threading.Tasks;

namespace ConsoleApp
{
    class Program
    {
        static async Task Main(string[] args)
        {
            // Replace the <EVENT_HUBS_CONNECTION_STRING> and <EVENT_HUB_NAME> placeholder values
            var connectionString = "Endpoint=sb://eventhub1.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xxxxxxxxx+AEhLwhfnY=";
            var eventHubName = "siliconeventhub";

            // Create a new EventHubConsumerClient object
            await using (var consumerClient = new EventHubConsumerClient(EventHubConsumerClient.DefaultConsumerGroupName, connectionString, eventHubName))
            {
                // Receive events from the event hub
                await foreach (PartitionEvent partitionEvent in consumerClient.ReadEventsAsync())
                {
                    // Decompress the body of the event
                    var decompressed = Decompress(BinaryData.FromBytes(partitionEvent.Data.Body));

                    // Print the decompressed body of the event
                    Console.WriteLine($"Received event: {decompressed}");
                }
            }
        }

        public static string Decompress(BinaryData input)
        {
            // Copy the payload once instead of calling ToArray() repeatedly
            byte[] bytes = input.ToArray();

            // Check if the data is GZIP-compressed
            if (IsGZipCompressed(bytes))
            {
                using var memoryStream = new MemoryStream(bytes);
                using var decompressedStream = new GZipStream(memoryStream, CompressionMode.Decompress);
                using var streamReader = new StreamReader(decompressedStream, Encoding.UTF8);

                return streamReader.ReadToEnd();
            }

            // Treat the data as plain text if it's not GZIP-compressed
            return Encoding.UTF8.GetString(bytes);
        }

        private static bool IsGZipCompressed(byte[] data)
        {
            if (data.Length < 2)
                return false;

            return data[0] == 31 && data[1] == 139;
        }
    }
}
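
To answer the original question more directly: the compiler error shows that Body is now a ReadOnlyMemory<byte>, which has no Array, Offset, or Count members. One possible way to keep the old array/offset/count style is MemoryMarshal.TryGetArray from System.Runtime.InteropServices, which exposes the backing array segment without copying when one exists. The sketch below is an illustration only; the BodyDecompression class, its Compress helper, and the padded test buffer are assumptions made for the example, and it runs without the Azure SDK:

```csharp
using System;
using System.IO;
using System.IO.Compression;
using System.Runtime.InteropServices;
using System.Text;

public static class BodyDecompression
{
    // Accepts the same type that EventData.Body exposes in the newer package.
    public static string Decompress(ReadOnlyMemory<byte> body)
    {
        // Try to reach the backing array without copying; fall back to a copy otherwise.
        if (!MemoryMarshal.TryGetArray(body, out ArraySegment<byte> segment))
        {
            segment = new ArraySegment<byte>(body.ToArray());
        }

        byte[] array = segment.Array ?? Array.Empty<byte>();

        using var memoryStream = new MemoryStream(array, segment.Offset, segment.Count);
        using var decompressedStream = new GZipStream(memoryStream, CompressionMode.Decompress);
        using var streamReader = new StreamReader(decompressedStream, Encoding.UTF8);

        return streamReader.ReadToEnd();
    }

    // Hypothetical helper used only to build test input for this example.
    public static byte[] Compress(string text)
    {
        byte[] raw = Encoding.UTF8.GetBytes(text);
        using var output = new MemoryStream();
        using (var gzip = new GZipStream(output, CompressionMode.Compress, leaveOpen: true))
        {
            gzip.Write(raw, 0, raw.Length);
        }
        return output.ToArray();
    }
}

public static class Program
{
    public static void Main()
    {
        // Place the compressed bytes at a non-zero offset to exercise Offset and Count.
        byte[] compressed = BodyDecompression.Compress("Event 1");
        byte[] padded = new byte[compressed.Length + 4];
        Buffer.BlockCopy(compressed, 0, padded, 2, compressed.Length);

        var body = new ReadOnlyMemory<byte>(padded, 2, compressed.Length);
        Console.WriteLine(BodyDecompression.Decompress(body));
    }
}
```

With the SDK, the call would then be Decompress(eventData.Body). If an extra copy is acceptable, the simpler route is eventData.Body.ToArray(), which yields a byte[] starting at offset 0 that can be passed to the original Decompress(byte[], int, int).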

Regarding "azure - Upgrading Azure.Messaging.EventHubs from 5.5.0 to 5.9.0: using 'eventData.Body.Array, eventData.Body.Offset' returns an error", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/77065310/
