python - Eventhub 在 python 中以列表形式读取数据

标签 python azure-eventhub

我想使用 Azure Eventhub 作为中间件消息队列。我现在基本上以列表格式发送模拟数据并以字符串格式接收它。

如您所见Here ,只有几种数据可转换的格式。我希望数据的格式是其中包含 float 据的列表。

这是我现在正在处理的代码。我正在尝试操作下面的行来处理列表中累积的浮点形式的每个事件数据。

LIST.append(event_data.message._body)

这是我的代码主体。

CONSUMER_GROUP = "$default"
OFFSET = Offset("-1")
PARTITION = "0"


total = 0
last_sn = -1
last_offset = "-1"
client = EventHubClient(ADDRESS, debug=False, username=USER, password=KEY)
i=1
LIST=[]
try:
    receiver = client.add_receiver(CONSUMER_GROUP, PARTITION, prefetch=5000, offset=OFFSET)
    client.run()
    start_time = time.time()
    batch = receiver.receive(timeout=None)
    while batch:
        for event_data in batch[-100:]:

            last_offset = event_data.offset
            last_sn = event_data.sequence_number
            print("Received: {}, {}".format(i, last_sn))
            LIST.append(event_data.message._body)

            i += 1
            total += 1
        batch = receiver.receive(timeout=5000)

    end_time = time.time()
    client.stop()
    run_time = end_time - start_time
    print("Received {} messages in {} seconds".format(total, run_time))

except KeyboardInterrupt:
    pass
finally:
    client.stop()

您可以在 Here 中找到 eventData 类

====================================更新============ =====================

结果显示“Message [a b c ....]”,我认为该 Message 已被设置为写入,因此我想删除结果格式中的“Message”一词。

“sender.py”如下:

from azure.eventhub import EventHubClient, Sender, EventData
import time
import logging
import numpy as np

logger = logging.getLogger("azure")

ADDRESS = ""
USER = "RootManageSharedAccessKey"
KEY = ""

try:
    if not ADDRESS:
        raise ValueError("No EventHubs URL supplied.")

    # Create Event Hubs client
    client = EventHubClient(ADDRESS, debug=False, username=USER, password=KEY)
    sender = client.add_sender(partition="0")
    client.run()
    forging2 = lambda x: (np.exp(-(0.1*x-6)**2+3) + np.exp(-(0.1*x-4)**2+4))*1.4
    x_value = np.arange(100)
    try:
        start_time = time.time()
        for i in range(100):

            y_value1 = forging2(x_value) + np.random.normal(0,1,len(x_value))*3
            y_value1 = np.asarray(y_value1)
            print("Sending message: {}, {}".format(i, y_value1))
            message = y_value1
            sender.send(EventData(message))
            time.sleep(0.35)
    except:
        raise
    finally:
        end_time = time.time()
        client.stop()
        run_time = end_time - start_time
        logger.info("Runtime: {} seconds".format(run_time))

except KeyboardInterrupt:
    pass

最佳答案

以下固定代码有效:

logger = logging.getLogger("azure")

ADDRESS = ""
USER = "RootManageSharedAccessKey"
KEY = ""

try:
    if not ADDRESS:
        raise ValueError("No EventHubs URL supplied.")
    # Create Event Hubs client
    client = EventHubClient(ADDRESS, debug=False, username=USER, password=KEY)
    sender = client.add_sender(partition="0")
    client.run()
    forging2 = lambda x: (np.exp(-(0.1*x-6)**2+3) + np.exp(-(0.1*x-4)**2+4))*1.4
    x_value = np.arange(100)
    try:
        start_time = time.time()
        for i in range(100000):

            y_value1 = forging1(x_value) + np.random.normal(0,1,len(x_value))*3
            y_value1 = np.asarray(y_value1)
            print("Sending message: {}, {}".format(i, y_value1))
            message = "{}".format(y_value1)
            sender.send(EventData(message))
            time.sleep(0.35)
    except:
        raise
    finally:
        end_time = time.time()
        client.stop()
        run_time = end_time - start_time
        logger.info("Runtime: {} seconds".format(run_time))

except KeyboardInterrupt:
    pass

这样,我就能够接收到没有“消息”的消息了。

关于python - Eventhub 在 python 中以列表形式读取数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58406060/

相关文章:

python - 在 Python 中,如何确定对象是否仍然存在?

python - 如何将包含重复值的列表转换为以列表为值的字典?

python - 是否可以检测函数中声明的局部变量的数量?

python - 使用rest api时选择Azure事件中心的分区

azure - 如何实现从Azure Event Hub的高速处理接收?

python - 为什么 sum 函数的参数需要放在括号中?

python - 如何在 Python 中检查神秘的反序列化对象

azure - 如何将数据从多个事件中心发送到 ADX?

Azure 事件中心脱机

azure - 如何通过事件中心在 ARM 模板中使用 listkeys 函数