django - 努力通过 Digital Ocean 上的 Celery Beat 将对象保存到 Django 数据库

标签 django redis celery digital-ocean supervisord

我正在努力通过 Celery Beat 将对象保存到我的 Django 应用程序(显示 OHLC 数据)。

此脚本在本地环境(保存 3M 对象)上运行良好,但在像 Digital Ocean 这样的 VPN 上运行不佳。它保存了一定数量的对象(大约 200K 个对象或 2GB),但随后它删除了其他对象以添加每个新对象,这完全令人困惑。

我的堆栈

  • Django
  • Redis
  • 主管
  • Ubuntu

我没有在本地使用 Supervisor,所以我认为这是导致问题但无法识别的原因。非常感谢任何反馈/帮助。

脚本

@periodic_task(
    # run_every=(crontab(minute='*/1')),
    run_every=(crontab(minute='*/60')),
    name="load_data",
    ignore_result=False
)
def load_data():
# Forex OHLC
TOKEN = MYTOKEN
con = fxcmpy.fxcmpy(access_token = TOKEN, log_level = 'error')
start = dt.datetime(2010, 1, 1)
stop = dt.datetime.today()
df = pd.DataFrame(list(DatasourceItem.objects.filter(datasource__sub_category__exact='Forex').values('symbol')))

for i in df['symbol']:

    datasource_item_obj = DatasourceItem.objects.get(symbol=i)

    Ohlc.objects.filter(datasource = datasource_item_obj).delete()

    if datasource_item_obj.base_symbol:
        base_symbol = datasource_item_obj.base_symbol
        tar_symbol = datasource_item_obj.tar_symbol
        mod_symbol = base_symbol + "/" + tar_symbol
        sys_symbol = base_symbol + tar_symbol
    else:
        sys_symbol = datasource_item_obj.symbol
        mod_symbol = datasource_item_obj.symbol

    data = con.get_candles(mod_symbol, period='D1', start=start, stop=stop)
    del data['askopen']
    del data['askclose']
    del data['askhigh']
    del data['asklow']
    del data['tickqty']
    data.columns = ['Open', 'Close', 'High', 'Low']
    data = data[['Open', 'High', 'Low',  'Close']]
    data.insert(loc=0, column='Symbol', value=sys_symbol)
    data.reset_index(level=0, inplace=True)
    data.dropna()
    # .values = return numpy array
    data_list = data.values.tolist()
    for row in data_list:
        new_price = Ohlc(time = row[0], symbol = row[1], open_price = row[2], high_price = row[3], low_price = row[4], close_price = row[5], datasource = datasource_item_obj)
        new_price.save()

# Stock OHLC
start = dt.datetime.now() - dt.timedelta(days=(365.25*5))
stop = dt.datetime.today()

df = pd.DataFrame(list(DatasourceItem.objects.filter(datasource__sub_category__exact='Stock').values('symbol')))
for i in df['symbol']:
    datasource_obj = DatasourceItem.objects.get(symbol=i)
    old_price = Ohlc.objects.filter(datasource = datasource_obj).delete()

    symbol = datasource_obj.symbol
    data = get_historical_data(symbol, start=start, stop=stop, output_format='pandas')
    del data['volume']
    data.columns = ['Open', 'High', 'Low', 'Close']
    data.insert(loc=0, column='Symbol', value=symbol)
    data.reset_index(level=0, inplace=True)
    data.dropna()
    data_list = data.values.tolist()
    for row in data_list:
        price = Ohlc(time = row[0], symbol = row[1], open_price = row[2], high_price = row[3], low_price = row[4], close_price = row[5], datasource = datasource_obj)
        price.save()

最佳答案

嘿,这是由于数据库中发生的事务数量而发生的,因此请尝试优化数据创建查询,例如,您可以使用批量创建而不是单独创建每个对象。

price_list
for row in data_list:
    price = Ohlc(time = row[0], symbol = row[1], open_price = row[2], high_price = row[3], low_price = row[4], close_price = row[5], datasource = datasource_obj)
    price_list.append(price)
Ohlc.objects.bulk(price_list)

可能不会一次性创建大的 set off 数据,然后将数据分成 1000 个 block 。

关于django - 努力通过 Digital Ocean 上的 Celery Beat 将对象保存到 Django 数据库,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52735572/

相关文章:

python - Django + Celery + Scrapy 扭曲 react 器(ReactorNotRestartable)和数据库(SSL 错误)错误

python - 如何使用模拟来测试 next_day_of_week 函数

django - Django Syncdb 可以处理压缩的 initial_data.json.tgz 装置吗?

django - 使用类似于 Rails JST 的 Django 管道分配 Backbone 模板?

amazon-web-services - 如何在不使用 stunnel 的情况下从加密的 Amazon ElastiCache Redis 服务器读取写入?

python - Celery .delay() 同步工作,不延迟

python - 使用注释和聚合而不重复

redis - Express-Gateway-基本身份验证/Redis性能问题

c# - 如何使用 StackExchange.Redis 进行基本 WATCH

python - celery 弦不等待子任务(一组链)