dask - 异步计算 dask 数组 block (Dask + FastAPI)

标签 dask dask-distributed fastapi uvicorn

我正在构建一个 FastAPI 应用程序,它将为 Dask 数组的 block 提供服务。我想利用 FastAPI's asynchronous functionality旁边Dask-distributed's ability to operate asynchronously .下面是一个 mcve,演示了我在应用程序的服务器端和客户端上尝试做的事情:

服务器端:

import time

import dask.array as da
import numpy as np
import uvicorn
from dask.distributed import Client
from fastapi import FastAPI

app = FastAPI()
# create a dask array that we can serve
data = da.from_array(np.arange(0, 1e6, dtype=np.int), chunks=100)


async def _get_block(block_id):
    """return one block of the dask array as a list"""
    block_data = data.blocks[block_id].compute()
    return block_data.tolist()


@app.get("/")
async def get_root():
    time.sleep(1)
    return {"Hello": "World"}


@app.get("/{block_id}")
async def get_block(block_id: int):
    time.sleep(1)  # so we can test concurrency
    my_list = await _get_block(block_id)
    return {"block": my_list}


if __name__ == "__main__":
    client = Client(n_workers=2)
    print(client)
    print(client.cluster.dashboard_link)
    uvicorn.run(app, host="0.0.0.0", port=9000, log_level="debug")

客户端

import dask
import requests
from dask.distributed import Client

client = Client()

responses = [
    dask.delayed(requests.get, pure=False)(f"http://127.0.0.1:9000/{i}") for i in range(10)
]
dask.compute(responses)

在此设置中,_get_block 中的compute() 调用是“阻塞”的,一次只计算一个 block 。我尝试了 Client(asynchronous=True)client.compute(dask.compute(responses)) 的各种组合,但没有任何改进。是否可以等待 dask 数组的计算?

最佳答案

这一行

block_data = data.blocks[block_id].compute()

是阻塞调用。如果你改为执行 client.compute(data.blocks[block_id]),你将得到一个可以与你的 IOLoop 结合使用的可等待的 future ,只要 Dask 使用相同的循环。

请注意,Intake 服务器非常希望以这种方式工作(它也渴望按 block 流式传输数组和其他数据类型的数据)。

关于dask - 异步计算 dask 数组 block (Dask + FastAPI),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60492963/

相关文章:

python - Dask.dataframe 或替代方案 : Scalable way of dropping rows of low frequency items

python - 使用 dask 为一列数据帧应用 json.loads

dask - 属性错误 : module 'dask' has no attribute 'delayed'

python-3.x - 如果延迟评估,如何检查 dask 数据框是否为空?

deployment - 保存经过训练的 Detectron2 模型并对单个图像进行预测

dask - 如何使用 dask/fastparquet 从多个目录读取多个 Parquet 文件(具有相同的架构)

dask - 覆盖 dask 调度程序以在多个工作人员上同时加载数据

fastapi - CryptContext 散列如何知道使用什么 secret ?

python - 如何在 FastAPI 中渲染 CSS/JS/图像以及 HTML 文件?