我有一个用 Python 开发的 Event Hub async_receive 方法,并且配置了检查点(checkpoint)。代码取自 Event Hub 官方示例的 GitHub 仓库。
问题:使用上述代码时,即使我让接收器全天保持运行,每分钟也只能接收 20-35 条消息,而我的 Event Hub 摄取的流数据量很大(每分钟约 200 条消息)。由于接收端吞吐量太低,消息的消费进度现在已经落后了约 90 分钟,也就是说,在第 X 分钟进入 Event Hub 队列的数据,要到第 X+90 分钟才会被拉取出来。
调查:我查看了 Event Hub Python SDK 中的 receive 子类,发现其中有一个预取(prefetch)参数(第 318 行),其默认值为 300。既然默认值已经是 300,那么默认情况下我应该能够拉取超过 30-35 条消息才对。
关于如何提高拉取吞吐量,大家有什么想法吗?我现在毫无头绪,不知道该如何推进,非常感谢任何帮助。
编辑1- 我现在附上我的 Python 代码,如下所示 -
import asyncio
import json
import logging
import os
import sys
import time
from datetime import date
import requests
from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore
import log_handler
import threading
import traceback
try:
    ## Set env variables
    # Every setting is read at import time so that a misconfigured
    # deployment fails before any connection is attempted.
    CONNECTION_STR = os.environ["ECS"].strip()
    EVENTHUB_NAME = os.environ['EN'].strip()
    EVENTHUB_CONSUMER = os.environ["EC"].strip()
    API = os.environ['API_variable'].strip()
    AZURE_BLOB_CONNECTION_STR = os.environ["ACS"].strip()
    BLOB_CONTAINER_NAME = os.environ["BCN"].strip()
    BLOB_ACCOUNT_URL = os.environ["BAU"].strip()
    PREFETCH_COUNT = int(os.environ["PREFETCH_COUNT"])
    MAX_WAIT_TIME = float(os.environ["MAX_WAIT_TIME"])
except (KeyError, ValueError) as exception:
    # KeyError: a variable is missing.
    # ValueError: PREFETCH_COUNT / MAX_WAIT_TIME is not numeric.
    logging.debug(traceback.format_exc())
    logging.warning(
        "*** Please check the environment variables for {}".format(str(exception)))
    # Exit with a non-zero status: a bare sys.exit() reports success (0),
    # which hides the configuration failure from whatever launched the script.
    sys.exit(1)
def API_CALL(event_data, timeout=None):
    """
    Sends the request to the API and logs the outcome.

    Args:
        event_data (dict): Event payload; its ``image_url`` entry is posted
            to the API as ``{"url": ...}``.
        timeout: Forwarded to ``requests.post``. The default ``None`` keeps
            the previous behaviour (no timeout); pass a number of seconds to
            stop a stalled API call from blocking the caller indefinitely.

    Returns:
        None. The response is only logged. Any exception is caught and
        logged as an error instead of being propagated.
    """
    # Bound before the try block so the error handler can always log it,
    # even when 'image_url' is missing from event_data.
    url = None
    try:
        url = event_data['image_url']
        payload = {"url": url}
        ## API call to the server
        service_response = requests.post(API, json=payload, timeout=timeout)
        logging.info(f"*** service_response.status_code : {service_response.status_code}")
        # Only a 200 response is parsed; any other status is recorded as None.
        cloud_response = json.loads(
            service_response.text) if service_response.status_code == 200 else None
        # Take one UTC snapshot for both fields so the logged date and time
        # cannot disagree (previously the date was local and the time UTC).
        now_utc = time.gmtime()
        response_date = time.strftime("%B %d, %Y", now_utc)
        response_time = time.strftime("%H:%M:%S", now_utc)
        response_data = {
            "type": "response_data",
            "consumer_group": EVENTHUB_CONSUMER,
            'current_date': response_date,
            'current_time': response_time,
            'image_url': url,
            'status_code': service_response.status_code,
            'response': cloud_response,
            'api_response_time': int(service_response.elapsed.total_seconds()*1000),
            "eventhub_data": event_data
        }
        # Serialize once and reuse the string for both log levels.
        serialized = json.dumps(response_data)
        logging.info(f"*** response_data {serialized}")
        logging.debug(f"*** response_data {serialized}")
    except Exception as exception:
        logging.debug(traceback.format_exc())
        logging.error(
            "**** RaiseError: Failed request url %s, Root Cause of error: %s", url, exception)
async def event_operations(partition_context, event):
    """
    Decode one event, tag it with consumer metadata and pass it to API_CALL.

    Args:
        partition_context: Not used by this function.
        event: Received event; its body is read as UTF-8 text and parsed
            as JSON. The parsed object must contain ``image_url``.
    """
    started_at = time.time()
    payload = json.loads(event.body_as_str(encoding='UTF-8'))
    ## forming data payload
    # The image name is the last path segment of the image URL.
    image_name = payload["image_url"].split("/")[-1]
    payload.update({
        "type": "event_data",
        "consumer_group": EVENTHUB_CONSUMER,
        "image_name": image_name,
    })
    logging.info(f"*** Data fetched from EH : {payload}")
    logging.debug(f"*** Data fetched from EH : {payload}")
    API_CALL(payload)
    elapsed_ms = (time.time() - started_at) * 1000
    logging.info(f"*** time taken to process an event(ms): {elapsed_ms}")
def between_callback(partition_context, event):
    """
    Thread entry point: run event_operations for one event on its own loop.

    A new event loop is created and installed as the current loop of the
    calling thread, used to run the coroutine to completion, then closed.

    Args:
        partition_context: Forwarded unchanged to event_operations.
        event: Forwarded unchanged to event_operations.

    Raises:
        Whatever event_operations raises; the loop is closed first.
    """
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(event_operations(partition_context, event))
    finally:
        # Close the loop even when processing fails; otherwise every failed
        # event leaves an unclosed event loop behind.
        loop.close()
async def on_event(partition_context, event):
    """
    Receive callback: hand one event to a worker thread, then checkpoint it.

    Args:
        partition_context: Partition the event came from; used to update
            the checkpoint.
        event: The received event, or None when the receiver's
            max_wait_time elapsed without any event arriving.
    """
    if event is None:
        # receive() is called with max_wait_time, so the SDK invokes this
        # callback with None on an idle interval. There is nothing to
        # process or checkpoint, and a worker thread would only crash.
        return
    t1 = time.time()
    # Processing runs in its own thread so this callback does not wait for
    # the API call to finish.
    _thread = threading.Thread(target=between_callback, args=(partition_context, event))
    _thread.start()
    logging.info(f"*** time taken to start a thread(ms): {(time.time()-t1)*1000}")
    logging.info("*** Fetching the next event")
    ## Update checkpoint per event
    # NOTE(review): the checkpoint is written right after the thread starts,
    # i.e. before processing has finished, so an event whose processing fails
    # is still marked as consumed. Checkpointing on every event is presumably
    # also a throughput cost (one checkpoint-store write per event) -
    # consider checkpointing every N events; confirm with measurements.
    t2 = time.time()
    await partition_context.update_checkpoint(event)
    logging.info(f"*** time taken to update checkpoint(ms): {(time.time()-t2)*1000}")
async def main(client):
    """
    Open the client and run on_event for each event received.

    Errors are reported on stdout rather than raised to the caller.

    Args:
        client ([type]): Azure Eventhub listener client
    """
    try:
        async with client:
            # Call the receive method. Only read current data (@latest)
            logging.info("*** Listening to event")
            receive_options = {
                "on_event": on_event,
                "prefetch": PREFETCH_COUNT,
                "max_wait_time": MAX_WAIT_TIME,
            }
            await client.receive(**receive_options)
    except KeyboardInterrupt:
        print("*** Stopped receiving due to keyboard interrupt")
    except Exception as error:
        # The full traceback goes to the debug log; the console gets a summary.
        logging.debug(traceback.format_exc())
        print("*** some error occured :", error)
if __name__ == '__main__':
    ## Checkpoint initialization
    # NOTE(review): `credential` receives a value named as a connection
    # string; if it really is one, BlobCheckpointStore.from_connection_string
    # is presumably the intended constructor - confirm.
    checkpoint_store = BlobCheckpointStore(
        blob_account_url=BLOB_ACCOUNT_URL,
        container_name=BLOB_CONTAINER_NAME,
        credential=AZURE_BLOB_CONNECTION_STR
    )
    ## Client initialization
    # NOTE(review): on_partition_initialize, on_partition_close and on_error
    # are not defined in the visible part of this file, and the SDK documents
    # them as arguments of receive(), not of the client constructor - confirm
    # where they are defined and whether they should be passed in main().
    client = EventHubConsumerClient.from_connection_string(
        CONNECTION_STR,
        consumer_group=EVENTHUB_CONSUMER,
        eventhub_name=EVENTHUB_NAME,
        checkpoint_store=checkpoint_store, #COMMENT TO RUN WITHOUT CHECKPOINT
        logging_enable=True,
        on_partition_initialize=on_partition_initialize,
        on_partition_close=on_partition_close,
        idle_timeout=10,
        on_error=on_error,
        retry_total=3
    )
    logging.info("Connecting to eventhub {} consumer {}".format(
        EVENTHUB_NAME, EVENTHUB_CONSUMER))
    logging.info("Registering receive callback.")
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main(client))
    except KeyboardInterrupt:
        # Ctrl+C is the expected way to stop the receiver; exit quietly.
        pass
    finally:
        # The loop is no longer running here, so stop() would release
        # nothing; close() is what frees the loop's resources.
        loop.close()
Execution-flow main()-->on_event()-->Thread(between_callback-->API_CALL)-->update_checkpoint
最佳答案
请按以下格式修改接收器的函数,使其在获取到事件后一直运行到事件处理完成。
import asyncio
from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore
async def on_event(partition_context, event):
    """
    Print one received event and record it as the partition's checkpoint.

    Args:
        partition_context: Supplies the partition ID and the checkpoint API.
        event: Received event; its body is decoded as UTF-8 text.
    """
    body = event.body_as_str(encoding='UTF-8')
    partition_id = partition_context.partition_id
    # Print the event data.
    print("Received the event: \"{}\" from the partition with ID: \"{}\"".format(body, partition_id))
    # Update the checkpoint so that the program doesn't read the events
    # that it has already read when you run it next time.
    await partition_context.update_checkpoint(event)
async def main():
    """
    Build the checkpoint store and consumer client, then receive events.

    The connection strings and names below are placeholders to be replaced
    with real values.
    """
    # Create an Azure blob checkpoint store to store the checkpoints.
    store = BlobCheckpointStore.from_connection_string(
        "AZURE STORAGE CONNECTION STRING",
        "BLOB CONTAINER NAME",
    )
    # Create a consumer client for the event hub.
    consumer = EventHubConsumerClient.from_connection_string(
        "EVENT HUBS NAMESPACE CONNECTION STRING",
        consumer_group="$Default",
        eventhub_name="EVENT HUB NAME",
        checkpoint_store=store,
    )
    async with consumer:
        # Call the receive method. Read from the beginning of the partition (starting_position: "-1")
        await consumer.receive(on_event=on_event, starting_position="-1")
if __name__ == '__main__':
    # Run the main method.
    # asyncio.run() creates a fresh event loop, runs main() to completion and
    # closes the loop afterwards. asyncio.get_event_loop() without a current
    # loop is deprecated and never closed the loop it created.
    asyncio.run(main())
另外,按照评论中的建议,按时间间隔调用 API。
关于python - 为什么 Eventhub 异步接收器每分钟仅获取 30-35 条消息?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/69394014/