django - 使用 Django 启动和停止定期后台任务

标签 django python-3.x task bots django-channels

我想用 Django 发送一个比特币通知。如果设法有一个工作的 Telegram 机器人在我要求他这样做时发送比特币统计数据。现在我希望他在比特币达到特定值时给我发消息。有一些教程在服务器上运行 python 脚本,但没有使用 Django。我阅读了一些关于 Django channel 的答案和描述,但无法将它们适应我的项目。

我想通过电报发送关于金额和持续时间的命令。然后 Django 将使用这些值和我在后台发送的 channel 的值启动一个进程。如果现在,在持续时间内达到了金额,Django 会向我的 channel 发回一条消息。这对于一个以上的人来说也应该是可能的。

这些是否可能与开箱即用的 Django 一起使用,也许与装饰器一起使用,或者我是否需要 django-channels 或其他东西?

编辑 2018-08-10: 也许我的代码能更好地解释我想做什么。

import requests
import json
from datetime import datetime

from django.shortcuts import render
from django.http import HttpResponse
from django.conf import settings

from django.views.generic import TemplateView
from django.views.decorators.csrf 
import csrf_exempt


class AboutView(TemplateView):
    template_name = 'telapi/about.html'


bot_token = settings.BOT_TOKEN


def get_url(method):
    return 'https://api.telegram.org/bot{}/{}'.format(bot_token, method)


def process_message(update):
    data = {}
    data['chat_id'] = update['message']['from']['id']
    data['text'] = "I can hear you!"
    r = requests.post(get_url('sendMessage'), data=data)


@csrf_exempt
def process_update(request, r_bot_token):
    ''' Method that is called from telegram-bot'''
    if request.method == 'POST' and r_bot_token == bot_token:
        update = json.loads(request.body.decode('utf-8'))
        if 'message' in update:
            if update['message']['text'] == 'give me news':
                new_bitcoin_price(update)
            else:
                process_message(update)
            return HttpResponse(status=200)


bitconin_api_uri = 'https://api.coinmarketcap.com/v2/ticker/1/?convert=EUR'
# response = requests.get(bitconin_api_uri)


def get_latest_bitcoin_price():
    response = requests.get(bitconin_api_uri)
    response_json = response.json()
    euro_price = float(response_json['data']['quotes']['EUR']['price'])
    timestamp = int(response_json['metadata']['timestamp'])
    date = datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S')
    return euro_price, date


def new_bitcoin_price(update):
    data = {}
    data['chat_id'] = update['message']['from']['id']
    euro_price, date = get_latest_bitcoin_price()
    data['text'] = "Aktuel ({}) beträgt der Preis {:.2f}€".format(
        date, euro_price)
    r = requests.post(get_url('sendMessage'), data=data)

编辑 2018-08-13: 我认为解决方案是 celery-beat 和 channels。有谁知道好的教程吗?

最佳答案

我的一个队友使用 django-celery-beat,可在 https://github.com/celery/django-celery-beat 获得为此,他给了我一些很好的反馈。您可以使用 crontab 语法安排 celery 任务。

关于django - 使用 Django 启动和停止定期后台任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51767332/

相关文章:

django - 没有关系时计数并返回零的注释

python - 获取仅以空格开头的数据正则表达式 - Python

python-3.x - 复制 vim 中的行以在 ter 命令打开的上述窗口中执行?

c# - 在 C# 中同步繁重任务和轻任务

windows-7 - 如何说服 powershell(通过任务计划程序运行)找到我的网络驱动器?

Django 管理 ListView 在自定义列中显示外部 url 数据

mysql - 在 django ORM 中按 id 数组排序

Django 表单 : most DRY way to organize create/update forms for inherited models

python - 如何从深度差异中完全排除 "type_changes",因为我只关心值的变化?

Laravel 任务调度设置为每分钟运行一次,但只运行一次