python - 为重复发生的事件安排提醒

标签 python scheduled-tasks celery recurring-events

我正在使用一个网络应用程序,该应用程序允许用户在日历上创建事件(一次性或重复发生),并且在事件开始前不久,系统将通知其参与者。我在设计此类通知的流程时遇到了问题,尤其是针对重复发生的事件。

需要考虑的事项:

  1. Web 应用程序的架构使得有许多相同结构的数据库,每个数据库都保留自己的一组用户和事件。因此,对一个数据库的任何查询都需要对数千个其他数据库进行查询。
  2. 周期性事件可能已排除日期(类似于 RRULE 和 EXDATE 组合)。

  3. 用户可以更新事件的时间/循环规则。

  4. 该应用程序是用 Python 编写的,并且已经将 Celery 3.1 与 Redis 代理一起使用。使用此设置的解决方案会很好,尽管任何事情都可以。据我了解,目前很难用Celery动态添加周期性任务。

我正在尝试的解决方案:

  • 周期性任务每天运行一次,扫描每个数据库并添加任务以在适当的时间为当天重复发生的每个事件发出通知。

  • 如上生成的每个任务都有其id临时保存在Redis中。如果用户在安排通知任务后更改当天的事件时间,则该任务将被撤销并替换为新任务。

上述解决方案的示例代码:

  • tasks.py中,所有要运行的任务:

    from celery.task import task as celery_task
    from celery.result import AsyncResult
    from datetime import datetime
    
    # ...
    
    @celery_task
    def create_notify_task():
        for account in system.query(Account):
            db_session = account.get_session()    # get sql alchemy session
            for event in db_session.query(Event):
                schedule_notify_event(account, partial_event)
    
    
    @celery_task(name='notify_event_users')
    def notify_event_users(account_id, event_id):
        # do notification for every event participant
        pass
    
    def schedule_notify_event(account, event):
        partial_event = event.get_partial_on(datetime.today())
        if partial_event:
            result = notify_event_users.apply_async(
                    args = (account.id, event.id),
                    eta = partial_event.start)
            replace_task_id(account.id, event.id, result.id)
        else:
            replace_task_id(account.id, event.id, None)
    
    def replace_task_id(account_id, event_id, result_id):
        key = '{}:event'.format(account_id)
        client = redis.get_client()
        old_result_id = client.hget(key, event_id)
        if old_result_id:
            AsyncResult(old_result_id).revoke()
        client.hset(key, event_id, result_id)
    
  • event.py 中:

    # when a user change event's time
    def update_event(event, data):
        # ...
        # update event
        # ...
        schedule_notify_event(account, event)
    
  • Celery 安装文件:

    from celery.schedules import crontab
    
    CELERYBEAT_SCHEDULE = {
        'create-notify-every-day': {
            'task': 'tasks.create_notify_task',
            'schedule': crontab(minute=0, hour=0),
            'args': (,)
        },
    }
    

上述的一些缺点是:

  • 日常任务可能需要很长时间才能运行。最后处理的数据库中的事件必须等待并且可能会被遗漏。提前安排该任务(例如,第二天前 2 小时)可能会缓解这种情况,但首次运行设置(或在服务器重启后)有点尴尬。

  • 必须注意不要为同一事件安排两次通知任务(例如,因为 create_notify_task 每天运行不止一次...)。

对此有更明智的方法吗?

相关问题:

最佳答案

好久没有人回答了,忘记这个问题了。无论如何,当时我采用了以下解决方案。我在这里概述一下,以防有人感兴趣。

  • 当一个事件被创建时,一个任务被安排在它下一次发生之前不久运行(即下一个通知时间)。计划时间是根据应用的所有重复规则和异常(exception)规则计算的,因此它只是一个简单的 celery 计划一次性任务。
  • 当任务运行时,它会执行通知作业,并在下一个通知时间安排一个新任务(同样,考虑所有重复规则和异常(exception)规则)。如果没有下一个事件发生,则不会安排新任务。
  • 任务的 ID 与事件一起保存在数据库中。如果事件的时间发生变化,任务将被取消,并在新的下一个通知时间安排一个新任务。当任务运行并调度新任务时,新任务的 ID 将保存在数据库中。

我能想到的一些优缺点:

  • 优点:
    • 不需要在 celery 中使用复杂的重复规则,因为任务只会安排一次运行。
    • 每个任务都相当小且速度很快,因为它只需要关心一个事件通知。
  • 缺点:
    • 任何时候都有大量的celery定时任务等待执行,大概有几十万个量级。我不确定这会如何影响 celery 的性能,所以它可能是也可能不是真正的骗局。到目前为止,系统似乎运行良好。

关于python - 为重复发生的事件安排提醒,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36274417/

相关文章:

python - 导入错误 : No module named statsmodels

c++ - 同时处理非成员函数指针和成员函数指针

python - celery ,用倒计时调用延迟

python - 即使在 pip install airflow[celery] 之后也未定义“CeleryExecutor”

python - 在 Python 中为道琼斯指数抓取雅虎财经

python - 我可以信任哪个工具?

python - 如何创建 Bokeh DataTable DateTime 格式化程序?

python - 如何使用 python 在 Django 应用程序中安排任务

linux - 是否可以安排一个自动化任务每小时在特定终端运行一次?

docker - 使用非root用户时,Docker Alpine,Celery(worker和beat)失败并出现PermissionError