我正在使用 azure 事件中心 python SDK 通过此链接向事件中心发送和接收消息。 https://github.com/Azure/azure-event-hubs-python/tree/develop 。我可以成功发送和接收消息。但是我如何解析消息并从事件数据对象中检索数据。请找到下面的代码。
import os
import sys
#import logging
from azure.eventhub import EventHubClient, Receiver, Offset
ADDRESS = 'sb://####.servicebus.windows.net/#####'
USER = '##########'
KEY = '##################################'
CONSUMER_GROUP = "$default"
OFFSET = Offset("-1")
PARTITION = "1"
total = 0
last_sn = -1
last_offset = "-1"
try:
if not ADDRESS:
raise ValueError("No EventHubs URL supplied.")
client = EventHubClient(ADDRESS, debug=False, username=USER, password=KEY)
receiver = client.add_receiver(CONSUMER_GROUP, PARTITION, prefetch=5000,
offset=OFFSET)
client.run()
try:
batched_events = receiver.receive(timeout=20)
except:
raise
finally:
client.stop()
for event_data in batched_events:
last_offset = event_data.offset.value
last_sn = event_data.sequence_number
total += 1
print("Partition {}, Received {}, sn={} offset={}".format(
PARTITION,
total,
last_sn,
last_offset))
except KeyboardInterrupt:
pass
如果我尝试查看收到的 event_data,我可以看到以下消息。
事件数据
<azure.eventhub.common.EventData at 0xd4f1358>
event_data.消息
<uamqp.message.Message at 0xd4f1240>
有关上述如何解析此消息以提取数据的任何帮助
最佳答案
截至1.1.0
,有新的实用方法可以提取消息的实际数据:
原来是这样的
import json
event_obj = json.loads(next(event_data.body).decode('UTF-8'))
现在是:
event_obj = event_data.body_as_json()
关于python - 使用 python 处理事件中心数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52198132/