python - Why is the Eventhub async receiver only fetching 30-35 messages per minute?

Tags: python azure azure-eventhub

I have an async_receive method for an Eventhub, developed in Python, together with a checkpoint store. The code was taken from the official Eventhub samples github repo.

Problem - With the code above, if I keep the receiver open for the whole day, I only receive 20-35 messages per minute, while my Eventhub ingests a large amount of streaming data (about 200 messages per minute). Because of the poor throughput on the receiving side, the messages queued on the Eventhub are now 90 minutes behind, meaning data queued in the Eventhub at minute X is only pulled out at minute X+90.

Investigation - I tried looking at the receive subclass in the Eventhub python SDK and came across a prefetch parameter (line 318), which is set to 300 by default. If it is already set to 300, then by default I should be able to pull more than 30-35 messages.

Any ideas on how to improve my pull rate? I am stuck and have no way forward, so any help is greatly appreciated.

Edit 1 - I am attaching my Python code below -

import asyncio
import json
import logging
import os
import sys
import time
from datetime import date

import requests
from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore
import log_handler
import threading
import traceback

try:
    ## Set env variables
    CONNECTION_STR = os.environ["ECS"].strip()
    EVENTHUB_NAME = os.environ['EN'].strip()
    EVENTHUB_CONSUMER = os.environ["EC"].strip()
    API = os.environ['API_variable'].strip()
    AZURE_BLOB_CONNECTION_STR = os.environ["ACS"].strip()
    BLOB_CONTAINER_NAME = os.environ["BCN"].strip()
    BLOB_ACCOUNT_URL = os.environ["BAU"].strip()

    PREFETCH_COUNT = int(os.environ["PREFETCH_COUNT"])
    MAX_WAIT_TIME = float(os.environ["MAX_WAIT_TIME"])


except Exception as exception:
    logging.debug(traceback.format_exc())
    logging.warning(
        "*** Please check the environment variables for {}".format(str(exception)))
    sys.exit()

def API_CALL(event_data):
    """
    Sends the request to the API
    """
    
    try:
        url = event_data['image_url']
        payload = {"url": url}

        ## API call to the server
        service_response = requests.post(API, json=payload)
        logging.info(f"*** service_response.status_code : {service_response.status_code}")
        cloud_response = json.loads(
            service_response.text) if service_response.status_code == 200 else None
        today = date.today()
        response_date = today.strftime("%B %d, %Y")
        response_time = time.strftime("%H:%M:%S", time.gmtime())
        response_data = {
            "type": "response_data",
            "consumer_group": EVENTHUB_CONSUMER,
            'current_date': response_date,
            'current_time': response_time,
            'image_url': url,
            'status_code': service_response.status_code,
            'response': cloud_response,
            'api_response_time': int(service_response.elapsed.total_seconds()*1000),
            "eventhub_data": event_data
        }
        logging.info(f"*** response_data {json.dumps(response_data)}")
        logging.debug(f"*** response_data {json.dumps(response_data)}")
    except Exception as exception:
        logging.debug(traceback.format_exc())
        logging.error(
            "**** RaiseError: Failed request url %s, Root Cause of error: %s", url, exception)


async def event_operations(partition_context, event):

    start_time = time.time()
    data_ = event.body_as_str(encoding='UTF-8')
    json_data = json.loads(data_)
    
    ## forming data payload
    additional_data = {
        "type": "event_data",
        "consumer_group": EVENTHUB_CONSUMER,
        "image_name": json_data["image_url"].split("/")[-1]
    }

    json_data.update(additional_data)

    logging.info(f"*** Data fetched from EH : {json_data}")
    logging.debug(f"*** Data fetched from EH : {json_data}")

    API_CALL(json_data)

    logging.info(f"*** time taken to process an event(ms): {(time.time()-start_time)*1000}")


def between_callback(partition_context, event):
    """
    Loop to create threads
    """
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(event_operations(partition_context, event))
    loop.close()


async def on_event(partition_context, event):
    """
    Put your code here.
    Do some sync or async operations.
    If the operation is i/o intensive, async will have better performance.
    """

    t1 = time.time()
    _thread = threading.Thread(target=between_callback, args=(partition_context, event))
    _thread.start()
    logging.info(f"*** time taken to start a thread(ms): {(time.time()-t1)*1000}")
    logging.info("*** Fetching the next event")
    
    ## Update checkpoint per event
    t2 = time.time()
    await partition_context.update_checkpoint(event)
    logging.info(f"*** time taken to update checkpoint(ms): {(time.time()-t2)*1000}")

async def main(client):
    """
    Run the on_event method for each event received
    Args:
        client ([type]): Azure Eventhub listener client
    """
    
    try:
        async with client:
            # Call the receive method. Only read current data (@latest)
            logging.info("*** Listening to event")
            await client.receive(on_event=on_event,
                                 prefetch=PREFETCH_COUNT,
                                 max_wait_time=MAX_WAIT_TIME)

    except KeyboardInterrupt:
        print("*** Stopped receiving due to keyboard interrupt")
    except Exception as err:
        logging.debug(traceback.format_exc())
        print("*** some error occured :", err)

if __name__ == '__main__':
    
    ## Checkpoint initialization
    checkpoint_store = BlobCheckpointStore(
        blob_account_url=BLOB_ACCOUNT_URL,
        container_name=BLOB_CONTAINER_NAME,
        credential=AZURE_BLOB_CONNECTION_STR
    )
    
    ## Client initialization
    client = EventHubConsumerClient.from_connection_string(
        CONNECTION_STR,
        consumer_group=EVENTHUB_CONSUMER,
        eventhub_name=EVENTHUB_NAME,
        checkpoint_store=checkpoint_store, #COMMENT TO RUN WITHOUT CHECKPOINT
        logging_enable=True,
        on_partition_initialize=on_partition_initialize,
        on_partition_close=on_partition_close,
        idle_timeout=10,
        on_error=on_error,
        retry_total=3
    )
    
    logging.info("Connecting to eventhub {} consumer {}".format(
        EVENTHUB_NAME, EVENTHUB_CONSUMER))
    logging.info("Registering receive callback.")
    loop = asyncio.get_event_loop()

    try:
        loop.run_until_complete(main(client))
    except KeyboardInterrupt as exception:
        pass
    finally:
        loop.stop()

Execution flow: main() --> on_event() --> Thread(between_callback --> API_CALL) --> update_checkpoint
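
For reference, the blocking requests.post call in this flow could also be handed off to asyncio's default thread-pool executor, instead of starting a new thread and a new event loop for every event. The following is only a rough sketch of that idea, reusing the API_CALL function and imports from the code above; it is not part of the original post:

async def on_event(partition_context, event):
    """Sketch: offload the blocking API call to a worker thread."""
    loop = asyncio.get_running_loop()
    json_data = json.loads(event.body_as_str(encoding='UTF-8'))
    # requests.post inside API_CALL blocks; running it in the default
    # executor keeps the asyncio receive loop free to pull the next event.
    await loop.run_in_executor(None, API_CALL, json_data)
    await partition_context.update_checkpoint(event)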

Best Answer

Change the receiver function to the format below, so that events are fetched and processed until they are completed.

import asyncio
from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore


async def on_event(partition_context, event):
    # Print the event data.
    print("Received the event: \"{}\" from the partition with ID: \"{}\"".format(
        event.body_as_str(encoding='UTF-8'), partition_context.partition_id))
    # Update the checkpoint so that the program doesn't read the events
    # that it has already read when you run it next time.
    await partition_context.update_checkpoint(event)


async def main():
    # Create an Azure blob checkpoint store to store the checkpoints.
    checkpoint_store = BlobCheckpointStore.from_connection_string(
        "AZURE STORAGE CONNECTION STRING", "BLOB CONTAINER NAME")
    # Create a consumer client for the event hub.
    client = EventHubConsumerClient.from_connection_string(
        "EVENT HUBS NAMESPACE CONNECTION STRING",
        consumer_group="$Default",
        eventhub_name="EVENT HUB NAME",
        checkpoint_store=checkpoint_store)
    async with client:
        # Call the receive method. Read from the beginning of the partition (starting_position: "-1")
        await client.receive(on_event=on_event, starting_position="-1")


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    # Run the main method.
    loop.run_until_complete(main())

Also, as suggested in the comments, call the API at intervals.
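
One way to act on that suggestion is to receive events in batches and make one API call per batch. The sketch below uses the SDK's receive_batch / on_event_batch API (available in azure-eventhub 5.2 and later); the batch size, the wait time and the post_batch_to_api helper are assumptions for illustration, not part of the original answer:

import asyncio
from azure.eventhub.aio import EventHubConsumerClient


async def on_event_batch(partition_context, events):
    # Collect the bodies of everything received in this interval and send
    # them to the API in one request instead of one request per event.
    bodies = [event.body_as_str(encoding='UTF-8') for event in events]
    if bodies:
        await post_batch_to_api(bodies)  # hypothetical helper wrapping the API call
        # Checkpoint once per batch rather than once per event.
        await partition_context.update_checkpoint(events[-1])


async def main():
    client = EventHubConsumerClient.from_connection_string(
        "EVENT HUBS NAMESPACE CONNECTION STRING",
        consumer_group="$Default",
        eventhub_name="EVENT HUB NAME")
    async with client:
        # Deliver up to 100 events at a time, or whatever has arrived after 5 seconds.
        await client.receive_batch(on_event_batch=on_event_batch,
                                   max_batch_size=100,
                                   max_wait_time=5)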

Regarding python - Why is the Eventhub async receiver only fetching 30-35 messages per minute?, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/69394014/
