python - 在 Luigi Orchestrator 任务的运行函数中使用 luigi.DateParameter

标签 python orchestration luigi

我使用 Luigi Orchestrator 编写了以下 Python 代码。

class AggregateArtists(luigi.Task):
    date = luigi.DateParameter(default=date.today() - timedelta(days=1))

    def requires(self):
        return []

    def run(self):
        ...

我想在我的 run() 函数中使用日期参数。问题是我不知道它是什么类型。在文档中,这个参数似乎是一个datetime.date,所以我应该能够使用self.date.strftime()方法。但此方法不适用于 DateParameters

我的问题是:

  • 如何在运行函数中使用代码的可变日期?它是什么类型?字符串、datetime.date 还是其他?

  • 有时,我需要将此日期转换为 YYYYMMDD 形式的字符串,我该怎么办?

最佳答案

您的代码不完整,但我猜其余部分如下所示。您一定在某个地方有错误,因为它有效:DateParameter 返回一个 python 日期值。请参阅luigi source code for details .

我的tasks/foo.py:

from datetime import date, timedelta
import luigi


class AggregateArtists(luigi.Task):
    date = luigi.DateParameter(default=date.today() - timedelta(days=1))

    def output(self):
        return luigi.LocalTarget("/tmp/foobar.txt")

    def run(self):
        with self.output().open('w') as out_file:
            out_file.write(self.date.strftime("%Y%m%d") + "\n")


if __name__ == "__main__":
    luigi.run()

运行任务:

$ python tasks/foo.py AggregateArtists --local-scheduler

DEBUG: Checking if AggregateArtists(date=2015-12-03) is complete
INFO: Scheduled AggregateArtists(date=2015-12-03) (PENDING)
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 21831] Worker Worker(salt=179482616, workers=1, host=matagus-laptop, username=matagus, pid=21831) running   AggregateArtists(date=2015-12-03)
INFO: [pid 21831] Worker Worker(salt=179482616, workers=1, host=matagus-laptop, username=matagus, pid=21831) done      AggregateArtists(date=2015-12-03)
DEBUG: 1 running tasks, waiting for next task to finish
DEBUG: Asking scheduler for work...
INFO: Done
INFO: There are no more tasks to run at this time
INFO: Worker Worker(salt=179482616, workers=1, host=matagus-laptop, username=matagus, pid=21831) was stopped. Shutting down Keep-Alive thread

打印输出文件的内容:

$ cat /tmp/foobar.txt 
20151203

关于python - 在 Luigi Orchestrator 任务的运行函数中使用 luigi.DateParameter,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34072872/

相关文章:

python - lxml的E-factory是否支持动态生成数据?

python - 为数据框创建 Parquet 创建函数

python - 使用外部文件在 Airflow 中动态创建任务

windows - 在 Kubernetes 上编排 Windows VM

python - 当任务依赖项过期时,luigi 可以重新运行任务吗?

python - 访问 AWS 上的 Luigi 可视化工具

python - 基于其他两个列表的比较返回列表元素?

Python 和工厂

docker - 如何在本地环境中协调多个微服务?

python - 我怎样才能让我的 Luigi 调度程序利用带有并行调度标志的多个内核?