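Airflow timetable that combines multiple cron expressions?

Tags: airflow, airflow-scheduler

I have several cron expressions that I need to apply to a single DAG. There is no way to express them with a single cron expression.

Airflow 2.2 introduced Timetables. Is there an implementation that takes a list of cron expressions?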

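Best answer

I was looking for the same thing and did not find anything. It would be nice if a standard implementation shipped with Airflow.

Here is a 0.1 version that I wrote for Airflow 2.2.5.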

# This file is <airflow plugins directory>/timetable.py

from typing import Any, Dict, List, Optional
import pendulum
from croniter import croniter
from pendulum import DateTime, Duration, timezone, instance as pendulum_instance
from airflow.plugins_manager import AirflowPlugin
from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable
from airflow.exceptions import AirflowTimetableInvalid


class MultiCronTimetable(Timetable):
    valid_units = ['minutes', 'hours', 'days']

    def __init__(self,
                 cron_defs: List[str],
                 timezone: str = 'Europe/Berlin',
                 period_length: int = 0,
                 period_unit: str = 'hours'):

        self.cron_defs = cron_defs
        self.timezone = timezone
        self.period_length = period_length
        self.period_unit = period_unit

    def infer_manual_data_interval(self, run_after: DateTime) -> DataInterval:
        """
        Determines date interval for manually triggered runs.
        This is simply (now - period) to now.
        """
        end = run_after
        if self.period_length == 0:
            start = end
        else:
            start = self.data_period_start(end)
        return DataInterval(start=start, end=end)

    def next_dagrun_info(
        self,
        *,
        last_automated_data_interval: Optional[DataInterval],
        restriction: TimeRestriction) -> Optional[DagRunInfo]:
        """Determines when the DAG should be scheduled."""

        if restriction.earliest is None:
            # No start_date. Don't schedule.
            return None

        is_first_run = last_automated_data_interval is None

        if is_first_run:
            if restriction.catchup:
                scheduled_time = self.next_scheduled_run_time(restriction.earliest)

            else:
                scheduled_time = self.previous_scheduled_run_time()
                if scheduled_time is None:
                    # No previous cron time matched. Find one in the future.
                    scheduled_time = self.next_scheduled_run_time()
        else:
            last_scheduled_time = last_automated_data_interval.end

            if restriction.catchup:
                scheduled_time = self.next_scheduled_run_time(last_scheduled_time)

            else:
                scheduled_time = self.previous_scheduled_run_time()

                if scheduled_time is None or scheduled_time == last_scheduled_time:
                    # No previous cron time matched,
                    # or the matched cron time was the last execution time,
                    scheduled_time = self.next_scheduled_run_time()

                elif scheduled_time > last_scheduled_time:
                    # Matched cron time was after last execution time, but before now.
                    # Use this cron time
                    pass

                else:
                    # The last execution time is after the most recent matching cron time.
                    # Next scheduled run will be in the future
                    scheduled_time = self.next_scheduled_run_time()

        if scheduled_time is None:
            return None

        if restriction.latest is not None and scheduled_time > restriction.latest:
            # Over the DAG's scheduled end; don't schedule.
            return None

        start = self.data_period_start(scheduled_time)
        return DagRunInfo(run_after=scheduled_time, data_interval=DataInterval(start=start, end=scheduled_time))

    def data_period_start(self, period_end: DateTime):
        return period_end - Duration(**{self.period_unit: self.period_length})

    def croniter_values(self, base_datetime=None):
        if not base_datetime:
            tz = timezone(self.timezone)
            base_datetime = pendulum.now(tz)

        return [croniter(expr, base_datetime) for expr in self.cron_defs]

    def next_scheduled_run_time(self, base_datetime: DateTime = None):
        min_date = None
        tz = timezone(self.timezone)
        if base_datetime:
            base_datetime_localized = base_datetime.in_timezone(tz)
        else:
            base_datetime_localized = pendulum.now(tz)

        for cron in self.croniter_values(base_datetime_localized):
            next_date = cron.get_next(DateTime)
            if not min_date:
                min_date = next_date
            else:
                min_date = min(min_date, next_date)
        if min_date is None:
            return None
        return pendulum_instance(min_date)

    def previous_scheduled_run_time(self, base_datetime: DateTime = None):
        """
        Get the most recent time in the past that matches one of the cron schedules
        """
        max_date = None
        tz = timezone(self.timezone)
        if base_datetime:
            base_datetime_localized = base_datetime.in_timezone(tz)
        else:
            base_datetime_localized = pendulum.now(tz)

        for cron in self.croniter_values(base_datetime_localized):
            prev_date = cron.get_prev(DateTime)
            if not max_date:
                max_date = prev_date
            else:
                max_date = max(max_date, prev_date)
        if max_date is None:
            return None
        return pendulum_instance(max_date)


    def validate(self) -> None:
        if not self.cron_defs:
            raise AirflowTimetableInvalid("At least one cron definition must be present")

        if self.period_unit not in self.valid_units:
            raise AirflowTimetableInvalid(f'period_unit must be one of {self.valid_units}')

        if self.period_length < 0:
            raise AirflowTimetableInvalid('period_length must not be less than zero')

        try:
            self.croniter_values()
        except Exception as e:
            raise AirflowTimetableInvalid(str(e))

    @property
    def summary(self) -> str:
        """A short summary for the timetable.

        This is used to display the timetable in the web UI. A cron expression
        timetable, for example, can use this to display the expression.
        """
        return ' || '.join(self.cron_defs) + f' [TZ: {self.timezone}]'

    def serialize(self) -> Dict[str, Any]:
        """Serialize the timetable for JSON encoding.

        This is called during DAG serialization to store timetable information
        in the database. This should return a JSON-serializable dict that will
        be fed into ``deserialize`` when the DAG is deserialized.
        """
        return dict(cron_defs=self.cron_defs,
                    timezone=self.timezone,
                    period_length=self.period_length,
                    period_unit=self.period_unit)

    @classmethod
    def deserialize(cls, data: Dict[str, Any]) -> "MultiCronTimetable":
        """Deserialize a timetable from data.

        This is called when a serialized DAG is deserialized. ``data`` will be
        whatever was returned by ``serialize`` during DAG serialization.
        """
        return cls(**data)


class CustomTimetablePlugin(AirflowPlugin):
    name = "custom_timetable_plugin"
    timetables = [MultiCronTimetable]

To use it, you provide a list of cron expressions, optionally a timezone string, and optionally a period length and period unit.

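For my use case I did not actually need the period length and unit, which are used to determine the DAG's data_interval. If your DAG does not care about the data_interval, you can leave them at their defaults (period_length=0, in which case the data interval starts and ends at the scheduled time).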
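As a rough illustration of how period_length and period_unit shape the data interval, here is a minimal sketch that mirrors data_period_start from the timetable above. It uses the standard library's timedelta as a stand-in for pendulum's Duration (an assumption, made so the snippet runs without Airflow installed), and the helper name data_interval is hypothetical.

```python
from datetime import datetime, timedelta, timezone

VALID_UNITS = ("minutes", "hours", "days")


def data_interval(end: datetime, period_length: int = 0, period_unit: str = "hours"):
    """Return (start, end), where start lies period_length units before end."""
    if period_unit not in VALID_UNITS:
        raise ValueError(f"period_unit must be one of {VALID_UNITS}")
    # timedelta accepts the same keyword names (minutes, hours, days) as the
    # units allowed by the timetable, so the unit can be passed as a keyword.
    start = end - timedelta(**{period_unit: period_length})
    return start, end


run_time = datetime(2022, 6, 3, 12, 0, tzinfo=timezone.utc)

# Defaults: a zero-length interval, so start == end.
default_start, default_end = data_interval(run_time)

# period_length=10, period_unit='minutes': the interval covers the
# 10 minutes leading up to the scheduled run time.
ten_start, ten_end = data_interval(run_time, 10, "minutes")
```

With the values from the example DAG (period_length=10, period_unit='minutes'), a run scheduled for 12:00 would therefore get a data interval of 11:50 to 12:00.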

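I tried to imitate the standard schedule_interval behaviour. For example, if catchup = False and the DAG could have been triggered several times since the last run (for whatever reason: the DAG ran longer than expected, the scheduler was not running, or this is the first time the DAG is being scheduled), then the DAG is scheduled to run once, for the most recent matching time.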
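The catchup = False case for a DAG that has already run can be restated as a small pure function. This is a simplified sketch of the branch in next_dagrun_info, not a replacement for it: choose_run_time is a hypothetical name, and the previous and next matching cron times are passed in as arguments instead of being computed with croniter.

```python
from datetime import datetime
from typing import Optional


def choose_run_time(prev_match: Optional[datetime],
                    next_match: datetime,
                    last_run: datetime) -> datetime:
    """Pick the next run time when catchup is False.

    prev_match: most recent cron match before now (across all expressions)
    next_match: earliest cron match after now (across all expressions)
    last_run:   end of the last automated data interval
    """
    if prev_match is not None and prev_match > last_run:
        # One or more matches were missed since the last run:
        # run once, for the most recent of them.
        return prev_match
    # Nothing was missed, so wait for the next match in the future.
    return next_match


last_run = datetime(2022, 6, 3, 12, 0)

# Suppose the schedule is */5 * * * * and it is now 12:17. The matches at
# 12:05, 12:10 and 12:15 were missed, and only 12:15 is scheduled.
missed = choose_run_time(prev_match=datetime(2022, 6, 3, 12, 15),
                         next_match=datetime(2022, 6, 3, 12, 20),
                         last_run=last_run)

# Suppose it is now 12:03. The most recent match (12:00) is the run that
# already happened, so the next run is the upcoming 12:05 match.
up_to_date = choose_run_time(prev_match=datetime(2022, 6, 3, 12, 0),
                             next_match=datetime(2022, 6, 3, 12, 5),
                             last_run=last_run)
```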

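I have not really tested it with catchup = True, but in theory it would run for every matching cron time since the DAG's start_date (but only once per distinct time: for example, with */30 * * * * and 0 * * * * the DAG would run twice per hour, not three times).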
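To see why overlapping expressions should not produce duplicate runs, here is a minimal sketch of the "take the earliest next match" idea used by next_scheduled_run_time. It does not use croniter: each schedule is modelled as a set of minutes of the hour, which is an assumption made only to keep the example dependency-free, and next_match and merged_runs are hypothetical helper names.

```python
from datetime import datetime, timedelta
from typing import List, Set


def next_match(base: datetime, minutes: Set[int]) -> datetime:
    """Next time strictly after base whose minute-of-hour is in minutes."""
    candidate = base.replace(second=0, microsecond=0) + timedelta(minutes=1)
    while candidate.minute not in minutes:
        candidate += timedelta(minutes=1)
    return candidate


def merged_runs(start: datetime, end: datetime,
                schedules: List[Set[int]]) -> List[datetime]:
    """Walk forward from start, always taking the earliest next match."""
    runs = []
    current = start
    while True:
        # A time matched by several schedules is still a single minimum,
        # so it is emitted only once.
        current = min(next_match(current, s) for s in schedules)
        if current > end:
            return runs
        runs.append(current)


every_30 = {0, 30}  # stand-in for */30 * * * *
hourly = {0}        # stand-in for 0 * * * *

# All runs after 00:00 up to and including 01:00.
runs = merged_runs(datetime(2022, 6, 2, 0, 0),
                   datetime(2022, 6, 2, 1, 0),
                   [every_30, hourly])
```

Both schedules match 01:00, but because each step advances from the previously chosen time, that instant appears only once in runs.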

Example DAG file:

from time import sleep
import airflow
from airflow.operators.python import PythonOperator
import pendulum
from timetable import MultiCronTimetable

def sleepy_op():
    sleep(660)


with airflow.DAG(
        dag_id='timetable_test',
        start_date=pendulum.datetime(2022, 6, 2, tz=pendulum.timezone('America/New_York')),
        timetable=MultiCronTimetable(['*/5 * * * *', '*/3 * * * fri,sat', '1 12 3 * *'], timezone='America/New_York', period_length=10, period_unit='minutes'),
        catchup=False,
        max_active_runs=1) as dag:

    sleepy = PythonOperator(
        task_id='sleepy',
        python_callable=sleepy_op
    )

Regarding an Airflow timetable that combines multiple cron expressions, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/72478492/
