Python - 将数据从文件流式传输到启用异步的 Kinesis Producer

标签 python python-3.x python-asyncio

我正在使用 Python 3.6 中支持异步的 Kinesis Producer 模块来部署到 AWS Lambda(因此我需要它与 3.6 兼容)。

我的用例是从磁盘延迟读取文件(大约 100MB 压缩 - 1GB 未压缩)并将数据(一次 500 行)流式传输到 Kinesis Producer。我希望 Kinesis Producer 在我读取下一批 500 行时开始将 500 条记录推送到 Kinesis。

我注意到它一次读取整个文件 500 行,然后开始将数据推送到 Kinesis Producer。看来原因是因为我没有打电话 await asyncio.sleep(1) ,但我也不知道我是否以正确的方式这样做。

def lambda_handler(event, context):
    event_loop = asyncio.get_event_loop()
    # Extract filename from event and download file from S3
    event_loop.run_until_complete(process(filename))
    pending = asyncio.Task.all_tasks()
    event_loop.run_until_complete(asyncio.gather(*pending))


async def process(filename):
    for chunk in read_lines(filename, MAX_RECORDS_IN_BATCH):
    asyncio.ensure_future(write_kinesis(chunk)).add_done_callback(callback)


def callback(result):
    print(str(result))


async def write_kinesis(records):
    future = asyncio.ensure_future(producer.put_records(records=records))

如果我添加await asyncio.sleep(.1)process(filename)结束函数,比它似乎完全按照我想要的方式执行,当然,它实际上阻塞了主线程 0.1 秒。

问-这就是技巧吗,用 asyncio.sleep 阻塞足够长的时间,以便 Kinesis Producer 推出数据?它休眠的时间越少,我在内存中保存的数据就越多,因为 kinesis 客户端没有那么多时间将数据推出,但它会运行得更快(在一定程度上)?

问- 我这样做的方式正确吗?再次,我尝试读取 500 行,推送到 kinesis(异步),在 Kinesis 客户端工作时再读取 500 行,冲洗并重复。

此外,当查看回调函数中的打印语句时,我注意到如果 write_kinesis 函数没有返回任何内容,则回调的打印语句有 result=None ,而如果 write_kinesis 函数返回 Future,则回调的打印语句具有 result=<Task pending...11b35f18>()

问-我假设没有 return 语句,就没有结果,但是为什么当状态仍然是“Pending”时它会调用回调函数?

编辑1:我忘了提及,Kinesis 客户端已经启用了异步功能。

最佳答案

你只需使用await调用异步函数

async def process(filename):
    for chunk in read_lines(filename, MAX_RECORDS_IN_BATCH):
        await producer.put_records(records=chunk)

async def run():
    # Extract filename from event and download file from S3
    await process(filename)

loop = asyncio.get_event_loop()
loop.run_until_complete( run() )
loop.close()

关于Python - 将数据从文件流式传输到启用异步的 Kinesis Producer,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55150171/

相关文章:

python - Keras - 通过测试所有可能的超参数来调整顺序模型

python - 有效地将自定义函数应用于 Pandas 中的组

python-3.x - 在 pywin32 中创建一个新的 Excel 文件

python - asyncio.run() 不能从正在运行的事件循环中调用

python - RuntimeError : There is no current event loop in thread 'Thread-1' , 多线程和异步错误

Python:Jinja模板中if语句的多个条件

python - 使用 Restful Flask 渲染 HTML 网页 - 网页无法正确显示

python - 在 Cygwin 上安装 Pip-3.2

python-3.x - 如何使用 python 3.8 获取 AWS S3 对象位置/URL?

python - 从异步子进程读取流输出