python - 调度程序关闭流警告

标签 python dask dask-distributed

我有一个周期性的批处理作业在我的笔记本电脑上运行。代码如下所示:

client = Client()
print(client.scheduler_info())
topic='raw_data'
start = datetime.datetime.now()
delta = datetime.timedelta(minutes=2)
while True:
    end = start + delta
    if end <= datetime.datetime.now():
        start = end
        print('It\'s time to run the analysis for the 2 mins')
        data = get_data_from_parquet('raw_data_fast_par.par', start=start, end=end)
        metrics = [Metric1(), Metric2(), Metric3()]
        print(data.npartitions)
        channels = data.groupby(['col1', 'col2', 'col3'])
        for metric in metrics:
            features = metric.map_job(channels, start, end)
            print(features.count().compute())

简而言之,我每两分钟对数据进行某种分析,这些数据是我从 Parquet 文件中读取的,预测日期过滤。这是一个测试,所以我知道现在没有多大意义。 我在终端上收到以下警告。有人可以解释为什么会发生这种情况,如果它很重要,以及我该如何避免这种情况?

distributed.comm.tcp - WARNING - Closing dangling stream in <TCP local=tcp://127.0.0.1:55448 remote=tcp://127.0.0.1:42197>

最佳答案

我不知道实际问题是什么,但您可以尝试在完成后彻底关闭本地集群,也许可以使用 Client 作为上下文管理器。

with Client() as client:
    ...

关于python - 调度程序关闭流警告,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53508212/

相关文章:

python - ValueError : Not all divisions are known, 无法对齐 dask 数据帧上的分区错误

python - 更改 Django 1.11 中的用户名 max_length (django.contrib.auth)

python - Cocoa 客户端/服务器应用程序

python - 使用 Dask 处理大型、压缩的 csv 文件

python - 不同大小数组的逐元素运算

dask - 为什么在运行 Pandas 操作时会收到 dask 警告?

python - 本地使用 dask : to Client() or not to Client()?

python - 我如何让 matplotlib 自动更改线标记?

python - 将项目配置存储在环境变量中是一种不好的做法吗?

dataframe - dask dataframe 70GB 数据连接操作的最佳分区大小