dask - Streamz 与 Dask 分布式

标签 dask dask-distributed

基于streamz documentation ,可以通过以下方式利用 dask 分布式集群:

from distributed import Client
client = Client('tcp://localhost:8786')  # Connect to scheduler that has distributed workers

from streamz import Stream
source = Stream()
(source.scatter()       # scatter local elements to cluster, creating a DaskStream
       .map(increment)  # map a function remotely
       .buffer(5)       # allow five futures to stay on the cluster at any time
       .gather()        # bring results back to local process
       .sink(write))    # call write locally

for x in range(10):
    source.emit(x)

从概念上讲,尚不清楚为什么我们不必将 dask 分布式 client 作为参数传递来实例化 Stream()。更具体地说,Stream() 如何知道要附加到哪个调度程序?

如果您有两个调度程序,它们的工作人员位于不相关的节点上,例如:

from distributed import Client
client_1 = Client('tcp://1.2.3.4:8786')
client_2 = Client('tcp://10.20.30.40:8786')

如何分别为 client_1client_2 创建两个流?

最佳答案

Dask 中的基本规则是,如果定义了分布式客户端,则将其用于任何 Dask 计算。如果有多个分布式客户端,请使用最近创建的仍处于事件状态的客户端。

Streamz 不会明确让您在 .scatter() 时选择要使用的客户端,它使用 dask.distributed.default_client() 来选择一个。您可能希望向他们提出问题以允许使用 client= 关键字。该工作流程甚至不适合基于上下文的方法。目前,如果您想让多个 Streamz 同时处理不同 Dask 集群上的数据,您可能必须操作 dask.distributed.client._global_clients 的状态。

关于dask - Streamz 与 Dask 分布式,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52614762/

相关文章:

python - Dask Dataframe 分布式进程 ID 访问被拒绝

parquet - 在单个多核机器上索引大型 dask 数据帧时的内存使用情况

dask - 覆盖 dask 调度程序以在多个工作人员上同时加载数据

dask - distributed.worker 内存使用率很高,但 worker 没有数据可以存储到磁盘

python - dask 分布式数据帧上的慢 len 函数

python - 没有找到模块错误 : No module named 'distributed'

pandas - 如何在 Dask Dataframe 上估算列值?

python - 使用 GUI 从 Python 代码运行 dask 调度程序

python - 使用 Python xarray 屏蔽 'where' 不再起作用

python - Dask 中间结果