Airflow 2.2 timetable always fails with an error: timetable not registered

Tags: airflow scheduler timetable

I am following this example:

  1. Create the example timetable .py file and put it in $Home/airflow/plugins
  2. Create the example DAG file and put it in $Home/airflow/dags

After restarting the scheduler and the webserver, I get a DAG import error. In the web UI, the last line of the detailed error message is:

airflow.exceptions.SerializationError: Failed to serialize DAG 'example_timetable_dag2': Timetable class 'AfterWorkdayTimetable' is not registered

But if I run airflow plugins, I can see the timetable plugin in the name and source list.

How can I fix this error?

Details of plugins/AfterWorkdayTimetable.py:

from datetime import timedelta
from typing import Optional

from pendulum import Date, DateTime, Time, timezone

from airflow.plugins_manager import AirflowPlugin
from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable

UTC = timezone("UTC")


class AfterWorkdayTimetable(Timetable):
    def infer_data_interval(self, run_after: DateTime) -> DataInterval:
        weekday = run_after.weekday()
        if weekday in (0, 6):  # Monday and Sunday -- interval is last Friday.
            days_since_friday = (run_after.weekday() - 4) % 7
            delta = timedelta(days=days_since_friday)
        else:  # Otherwise the interval is yesterday.
            delta = timedelta(days=1)
        start = DateTime.combine((run_after - delta).date(), Time.min).replace(tzinfo=UTC)
        return DataInterval(start=start, end=(start + timedelta(days=1)))

    def next_dagrun_info(
        self,
        *,
        last_automated_data_interval: Optional[DataInterval],
        restriction: TimeRestriction,
    ) -> Optional[DagRunInfo]:
        if last_automated_data_interval is not None:  # There was a previous run on the regular schedule.
            last_start = last_automated_data_interval.start
            last_start_weekday = last_start.weekday()
            if 0 <= last_start_weekday < 4:  # Last run on Monday through Thursday -- next is tomorrow.
                delta = timedelta(days=1)
            else:  # Last run on Friday -- skip to next Monday.
                delta = timedelta(days=(7 - last_start_weekday))
            next_start = DateTime.combine((last_start + delta).date(), Time.min).replace(tzinfo=UTC)
        else:  # This is the first ever run on the regular schedule.
            next_start = restriction.earliest
            if next_start is None:  # No start_date. Don't schedule.
                return None
            if not restriction.catchup:
                # If the DAG has catchup=False, today is the earliest to consider.
                next_start = max(next_start, DateTime.combine(Date.today(), Time.min).replace(tzinfo=UTC))
            elif next_start.time() != Time.min:
                # If earliest does not fall on midnight, skip to the next day.
                next_day = next_start.date() + timedelta(days=1)
                next_start = DateTime.combine(next_day, Time.min).replace(tzinfo=UTC)
            next_start_weekday = next_start.weekday()
            if next_start_weekday in (5, 6):  # If next start is in the weekend, go to next Monday.
                delta = timedelta(days=(7 - next_start_weekday))
                next_start = next_start + delta
        if restriction.latest is not None and next_start > restriction.latest:
            return None  # Over the DAG's scheduled end; don't schedule.
        return DagRunInfo.interval(start=next_start, end=(next_start + timedelta(days=1)))


class WorkdayTimetablePlugin(AirflowPlugin):
    name = "workday_timetable_plugin"
    timetables = [AfterWorkdayTimetable]

Details of dags/test_afterwork_timetable.py:

import datetime

from airflow import DAG
from AfterWorkdayTimetable import AfterWorkdayTimetable
from airflow.operators.dummy import DummyOperator


with DAG(
    dag_id="example_workday_timetable",
    start_date=datetime.datetime(2021, 1, 1),
    timetable=AfterWorkdayTimetable(),
    tags=["example", "timetable"],
) as dag:
    DummyOperator(task_id="run_this")

If I run airflow plugins:

name                              | source                                   
==================================+==========================================
workday_timetable_plugin          | $PLUGINS_FOLDER/AfterWorkdayTimetable.py       

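Best answer

I had a similar problem.

Either you need to add an __init__.py file, or you should try this to debug your problem:

List all timetable classes that the plugin manager has registered: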

    from airflow import plugins_manager
    plugins_manager.initialize_timetables_plugins()
    plugins_manager.timetable_classes

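I got this result: {'quarterly.QuarterlyTimetable': <class 'quarterly.QuarterlyTimetable'>}

Compare your result with the exception message. If the keys of the timetable_classes dictionary use a different name for the plugin class, you should change the plugin file path.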
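
To make that comparison concrete, here is a minimal sketch in plain Python. It does not import Airflow: `registered` is sample data standing in for `plugins_manager.timetable_classes` (copied from my result above), and `find_candidates` is a hypothetical helper, not part of Airflow.

```python
from typing import Dict, List


def find_candidates(registered: Dict[str, type], class_name: str) -> List[str]:
    """Return the registered keys whose last dotted part equals class_name."""
    return sorted(key for key in registered if key.rsplit(".", 1)[-1] == class_name)


# Sample data only; in a real session this would be plugins_manager.timetable_classes.
registered = {"quarterly.QuarterlyTimetable": object}

# Class name taken from the "is not registered" exception message.
missing = find_candidates(registered, "AfterWorkdayTimetable")
present = find_candidates(registered, "QuarterlyTimetable")

print(missing)  # no key ends with this class name
print(present)  # the key shows which module prefix was registered
```

An empty list suggests the plugin was not loaded at all in that process. A non-empty list whose module prefix differs from the name used in the DAG points to a naming mismatch rather than a missing plugin.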

You can also try this in the DAG Python file:

from AfterWorkdayTimetable import AfterWorkdayTimetable
from airflow import plugins_manager
print(plugins_manager.as_importable_string(AfterWorkdayTimetable))

This will help you find the name that Airflow tries to use when it looks the class up in the timetable_classes dictionary.
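
If the two names differ, one possible cause (an assumption on my side, not something confirmed for your setup) is that the same file is importable under two different module paths, for example once as a top-level module and once as part of a package. The sketch below reproduces that with the standard library only. The folder and module names `plugins_demo` and `workday_demo` are made up, and `importable_name` is only a stand-in for a "module.ClassName" naming scheme, not Airflow's own function.

```python
import importlib
import sys
import tempfile
from pathlib import Path

# Hypothetical layout: <root>/plugins_demo/__init__.py and workday_demo.py.
root = Path(tempfile.mkdtemp())
pkg = root / "plugins_demo"
pkg.mkdir()
(pkg / "__init__.py").write_text("")
(pkg / "workday_demo.py").write_text("class AfterWorkdayTimetable:\n    pass\n")

# Put both the parent folder and the package folder on sys.path, so the
# same file can be imported under two different module names.
sys.path[:0] = [str(root), str(pkg)]
importlib.invalidate_caches()

top_level = importlib.import_module("workday_demo")
as_package = importlib.import_module("plugins_demo.workday_demo")


def importable_name(cls: type) -> str:
    """Stand-in for a 'module.ClassName' key; not Airflow's function."""
    return f"{cls.__module__}.{cls.__qualname__}"


name_a = importable_name(top_level.AfterWorkdayTimetable)
name_b = importable_name(as_package.AfterWorkdayTimetable)

print(name_a)
print(name_b)
# Same source file, but two distinct class objects with different names.
print(top_level.AfterWorkdayTimetable is as_package.AfterWorkdayTimetable)
```

A registry keyed by the first name will not find a class that reports the second name, even though both come from the same source file.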

Source: the Stack Overflow question "airflow 2.2 timetable always fails with an error: timetable not registered", https://stackoverflow.com/questions/69732193/
