python - 如何合并两个 django 数据库?

标签 python django database-migration

我有同一个 Django 应用程序的两个实例。我需要将这些数据库中的数据合并到一个数据库中。

我考虑过使用带自然键(natural key)的 fixture,但我有许多对象的自然键依赖于关联模型中的字段,因此在使用自然键进行序列化时,这些自然键并不会被序列化出来。

例如

class Account(models.Model):
    """Account profile whose primary key is a one-to-one link to ``User``."""
    # Other fields are omitted from this excerpt.
    user = models.OneToOneField(User, primary_key=True, parent_link=True)
    image = models.CharField(max_length=512)
    def natural_key(self):
        """Return the natural key of the linked user as this account's natural key."""
        return self.user.natural_key()
    # Declares a serialization-order dependency on the user model.
    # NOTE(review): Django documents these entries as "app_label.model_name"
    # strings (e.g. "auth.user") -- confirm that the bare "User" resolves.
    natural_key.dependencies = ["User"]

该 Account 模型导出的 fixture:

 >./manage.py dumpdata --natural-primary --natural-foreign backend.Account
 $ {"fields": {"country": "Angola", "birth": "1991-02-18", "facebook_id": 213321}, "model": "backend.account", "pk": 547}

(注意输出中不包含它的自然键,因为自然键所依赖的字段不属于该模型本身)

合并两个相同 Django 应用程序数据的最佳方法是什么?

编辑:

为了澄清,我有我的数据,并且实例具有不同的值,但(在某些情况下)相同的主键 - 因为它们位于不同的数据库中。我希望将所有数据(实例及其相关对象)放在同一个数据库中。

最佳答案

我遇到了类似的问题,两个相同的网站共享多个模型,但对象不同。 不幸的是,通过natural_key 合并它们是不可能的。

我必须实现一个类似于 loaddata 的新 django 命令,以便将第二个网站中的所有模型附加到第一个网站中。

import os
import warnings
from collections import defaultdict

from django.core.management import CommandError
from django.core.management.utils import parse_apps_and_model_labels
from django.core.management.commands.loaddata import Command as LoadDataCommand, humanize
from django.core.management.color import no_style
from django.db import (
    DEFAULT_DB_ALIAS, DatabaseError, IntegrityError, connections, router
)

from django.core import serializers
from django.db import transaction
from django.db.models.fields.related import RelatedField, ManyToManyField


class Command(LoadDataCommand):
    """Append fixture objects to a database, giving each one a new primary key.

    The command reuses the fixture discovery / file-format handling of
    ``loaddata`` and only replaces ``load_label``: every object of a fixture is
    first deserialized into memory, then saved with its primary key cleared so
    that the database assigns a fresh one.  Foreign keys and many-to-many
    relations that point at other objects *of the same fixture* are rewritten
    to the newly assigned keys.

    Limitations visible in this implementation:

    * Related objects are saved recursively before the object that references
      them, so circular (or self-referencing) foreign keys between fixture
      objects recurse without end.
    * A foreign key whose target is not part of the fixture is left untouched
      and keeps the original primary-key value.
    """
    help = 'Installs the named fixture(s) in the database.'
    missing_args_message = (
        "No database fixture specified. Please provide the path of at least "
        "one fixture in the command line."
    )

    def add_arguments(self, parser):
        """Register the positional fixture labels and the supported options."""
        parser.add_argument('args', metavar='fixture', nargs='+', help='Fixture labels.')
        parser.add_argument(
            '--database', default=DEFAULT_DB_ALIAS,
            help='Nominates a specific database to load fixtures into. Defaults to the "default" database.',
        )
        parser.add_argument(
            '--app', dest='app_label',
            help='Only look for fixtures in the specified app.',
        )
        parser.add_argument(
            '-e', '--exclude', action='append', default=[],
            help='An app_label or app_label.ModelName to exclude. Can be used multiple times.',
        )
        parser.add_argument(
            '--format',
            help='Format of serialized data when reading from stdin.',
        )

    # TODO: delete this override if it turns out to be equal to the overridden method.
    def handle(self, *fixture_labels, **options):
        """Store the parsed options on the command and append the fixtures atomically."""
        self.using = options['database']
        self.app_label = options['app_label']
        self.verbosity = options['verbosity']
        self.excluded_models, self.excluded_apps = parse_apps_and_model_labels(options['exclude'])
        self.format = options['format']

        # Everything is appended inside one transaction on the target database.
        with transaction.atomic(using=self.using):
            self.appenddata(fixture_labels)

        # Close the DB connection -- unless we're still in a transaction. This
        # is required as a workaround for an edge case in MySQL: if the same
        # connection is used to create tables, load data, and query, the query
        # can return incorrect results. See Django #7572, MySQL #37735.
        if transaction.get_autocommit(self.using):
            connections[self.using].close()

    def appenddata(self, fixture_labels):
        """Run the inherited ``loaddata`` driver, which calls our ``load_label``."""
        # Most of the inherited code only manages the transaction and the
        # fixture file formats, so it is reused as-is; the append behaviour
        # lives in the overridden ``load_label``.
        self.loaddata(fixture_labels)

    def load_label(self, fixture_label):
        """Load fixtures files for a given label.

        Works in three phases: (1) deserialize every object of every matching
        fixture file into memory and index it by model and original pk,
        (2) save each object with a new pk, (3) add the many-to-many relations
        that were deferred until all new pks were known.
        """
        self.objs_idx = ObjectDict()
        self.objects = []
        self.show_progress = self.verbosity >= 3
        self.deferred_m2m = []

        for fixture_file, fixture_dir, fixture_name in self.find_fixtures(fixture_label):
            _, ser_fmt, cmp_fmt = self.parse_name(os.path.basename(fixture_file))
            open_method, mode = self.compression_formats[cmp_fmt]
            fixture = open_method(fixture_file, mode)
            try:
                self.fixture_count += 1
                objects_in_fixture = 0
                loaded_objects_in_fixture = 0
                if self.verbosity >= 2:
                    self.stdout.write(
                        "Installing %s fixture '%s' from %s."
                        % (ser_fmt, fixture_name, humanize(fixture_dir))
                    )

                objects = serializers.deserialize(
                    ser_fmt, fixture, using=self.using, ignorenonexistent=True,
                    handle_forward_references=False,
                )

                for obj in objects:
                    objects_in_fixture += 1
                    if (obj.object._meta.app_config in self.excluded_apps or
                            type(obj.object) in self.excluded_models):
                        continue
                    if router.allow_migrate_model(self.using, obj.object.__class__):
                        loaded_objects_in_fixture += 1
                        self.models.add(obj.object.__class__)
                        # Keep the whole fixture in memory: nothing is saved
                        # until every object has been indexed.
                        self.objs_idx.append_deserialized_object(obj)
                        self.objects.append(obj)
                    if obj.deferred_fields:
                        self.objs_with_deferred_fields.append(obj)
                if objects and self.show_progress:
                    self.stdout.write('')  # add a newline after progress indicator
                self.loaded_object_count += loaded_objects_in_fixture
                self.fixture_object_count += objects_in_fixture
            except Exception as e:
                if not isinstance(e, CommandError):
                    e.args = ("Problem installing fixture '%s': %s" % (fixture_file, e),)
                raise
            finally:
                fixture.close()

            # Warn if the fixture we loaded contains 0 objects.
            if objects_in_fixture == 0:
                warnings.warn(
                    "No fixture data found for '%s'. (File format may be "
                    "invalid.)" % fixture_name,
                    RuntimeWarning
                )
        # Once all objects are in memory they can be saved.
        for obj in self.objects:
            self.process_object(obj.object)
        if self.verbosity >= 1:
            self.stdout.write('... All objects saved ...')
        # Once all objects have been saved (append mode) and their new pks have
        # been assigned, add the many-to-many relations.
        for obj, field_attname, related_pk in self.deferred_m2m:
            attr = getattr(obj, field_attname)
            attr.add(related_pk)
            if self.verbosity >= 3:
                self.stdout.write('Adding relation for field {0}: {1} -> {2}'.format(field_attname, obj.pk, related_pk))
        # Disabled for security reason
        # raise ValueError('Disabled')

    def process_object(self, obj):
        """Save ``obj`` (a model instance from the fixture) under a new pk.

        Related fixture objects are saved first so that the foreign keys of
        ``obj`` can be rewritten before it is saved itself.

        Returns the new primary key.  Calling this again for an object that
        was already saved returns the previously assigned key without saving.

        Raises ``ValueError`` when ``obj`` is ``None``; database errors raised
        by ``save`` are re-raised with a message naming the offending object.
        """
        if obj is None:
            raise ValueError('None object in process object')

        old_pk = obj.pk
        new_pk = self.objs_idx[obj]['new_pk']
        # The object has already been saved: nothing left to do.  Compare
        # against None so that a legitimate pk of 0 is not mistaken for unsaved.
        if new_pk is not None:
            return new_pk

        self.manage_related_field(obj)

        if self.verbosity >= 2:
            self.stdout.write('Saving object: (%s, %s)' % (obj.__class__, obj))
        # Clearing the pk makes save() insert a new row instead of updating
        # the existing row that happens to have the same pk.
        obj.pk = None
        try:
            obj.save(using=self.using)
            if self.show_progress:
                self.stdout.write(
                    '\rSaving object: (%s, %s)' % (obj.__class__, obj),
                    ending=''
                )
        # psycopg2 raises ValueError if data contains NUL chars.
        except (DatabaseError, IntegrityError, ValueError) as e:
            # ``obj`` is the model instance itself (not the deserialized
            # wrapper), and its pk was reset above, so report the original pk.
            e.args = ("Could not load %(app_label)s.%(object_name)s(pk=%(pk)s): %(error_msg)s" % {
                'app_label': obj._meta.app_label,
                'object_name': obj._meta.object_name,
                'pk': old_pk,
                'error_msg': e,
            },)
            raise
        self.objs_idx.data[obj._meta.model][old_pk]['new_pk'] = obj.pk
        return obj.pk

    def manage_related_field(self, obj):
        """Save the fixture objects ``obj`` points to and remap its relations.

        Foreign keys are rewritten in place to the new pk of the related
        object.  Many-to-many targets are recorded in ``self.deferred_m2m`` and
        attached later, because ``obj`` has no new pk yet at this point.
        """
        related_fields = [field for field in obj._meta.get_fields() if isinstance(field, RelatedField)]
        for field in related_fields:
            if field.related_model in self.excluded_models:
                continue

            if isinstance(field, ManyToManyField):
                # NOTE: the related manager must not be touched here.  ``obj``
                # still carries its original pk, so e.g. clear() would wipe the
                # relations of an unrelated existing row with that same pk.
                m2m_data = self.objs_idx[obj]['deserialized_object'].m2m_data or {}
                for m2m_pk in m2m_data.get(field.name, ()):
                    related_obj = self.objs_idx.data[field.related_model][m2m_pk]['object']
                    new_related_pk = self.process_object(related_obj)
                    self.deferred_m2m.append((obj, field.attname, new_related_pk))
            else:
                old_related_pk = getattr(obj, field.attname)
                if old_related_pk is None:
                    # Null relation: nothing to remap.
                    continue
                related_obj = self.objs_idx.data[field.related_model][old_related_pk]['object']
                # A target that is not part of the fixture is left unchanged.
                if related_obj is not None:
                    new_related_pk = self.process_object(related_obj)
                    setattr(obj, field.attname, new_related_pk)


class ObjectDict(object):
    """
    Dictionary to easily retrieve fixture object based on class and their original primary key

    ``data`` maps ``model -> original pk -> entry``.  Every entry starts as
    ``{'new_pk': None, 'object': None}``; looking up an unknown model or pk
    creates such an empty entry instead of raising ``KeyError``.
    """

    def __init__(self):
        self.data = defaultdict(lambda: defaultdict(lambda: {'new_pk': None, 'object': None}))

    @classmethod
    def from_deserialized_objects(cls, deserialized_objects):
        """Build an index from an iterable of deserialized fixture objects.

        Implemented as a classmethod so that subclasses get an instance of
        their own type.
        """
        instance = cls()
        for deserialized_object in deserialized_objects:
            instance.append_deserialized_object(deserialized_object)
        return instance

    def __getitem__(self, item):
        """Return the entry for model instance ``item``, keyed by its ``old_pk``."""
        return self.data[item._meta.model][item.old_pk]

    def append_deserialized_object(self, deserialized_object):
        """Index ``deserialized_object`` under its model and current pk.

        The current pk is remembered on the instance as ``old_pk`` so the entry
        can still be found after the instance's pk changes.
        """
        obj = deserialized_object.object
        obj.old_pk = obj.pk
        entry = self[obj]
        entry['object'] = obj
        entry['deserialized_object'] = deserialized_object

我建议您先用下面这个 TestCase 在 Django 默认的测试数据库中完整测试一遍。它先用第一个 Django 应用预先导出的 fixture 文件(通过 python manage.py dumpdata 生成)填充测试数据库,然后执行自定义命令,把另一个数据库导出的 fixture 中的所有对象追加进来。

from collections import defaultdict

from django.core import serializers
from django.core.management.utils import parse_apps_and_model_labels
from django.test import Client, TestCase
from utils.tests import reverse
from django.core.management import call_command
from django.test import TestCase
from django.apps import apps

from apps.commons.accounts.models import User
from apps.commons.accounts.tests import MultiUserTestCase

class TestCustomCommands(TestCase):
    """Check that ``appenddata`` adds every non-excluded fixture object."""
    # Fixture created from the primary database, used to test everything.
    fixtures = ['tmp/dump/test_append_data_fixtures_pre.json']

    def test_appenddata(self):
        """Per model, the row count must grow by the number of appended objects."""
        fixture_to_import = 'tmp/dump/fixtures_to_import.json'
        excludes = ['sites.Site']
        # Counts objects before appenddata per model
        count_pre = {model: model.objects.count() for model in apps.get_models()}

        self.excluded_models, self.excluded_apps = parse_apps_and_model_labels(excludes)

        # Counts objects to append per model
        count_new = defaultdict(int)
        with open(fixture_to_import, 'r') as f:
            for obj in serializers.deserialize('json', f, ignorenonexistent=True):
                meta = obj.object._meta
                # ``excludes`` holds label strings, so compare against the
                # parsed model classes / app configs, as the command does.
                if meta.model in self.excluded_models or meta.app_config in self.excluded_apps:
                    continue
                count_new[meta.model] += 1

        command = ['appenddata', fixture_to_import]
        for exclude in excludes:
            command += ['-e', exclude]
        command += ['-v', '0']
        call_command(*command)

        # Verify with count that all objects have been imported
        for model in apps.get_models():
            self.assertEqual(count_pre[model] + count_new[model], model.objects.count(), msg='Count mismatch for model %s' % model)

测试示例(将 Website2 中的数据附加到 Website1 中):

# Website 1
python manage.py dumpdata app1 app2 ... > test_append_data_fixtures_pre.json

# Website 2
python manage.py dumpdata app1 app2 ... > fixture_to_import.json

# Website 1, run the provided test
python manage.py test TestCustomCommands.test_appenddata

使用示例(将网站2中的数据附加到网站1中):

# Website 1
python manage.py appenddata fixture_to_import.json

关于python - 如何合并两个 django 数据库?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37908224/

相关文章:

python - 使用队列以扭曲方式下载文件

python - 用于选择数据的 Pandas 源

django - django独特领域

python - 根据其他字段重新计算pandas数据框字段的更好方法

jquery - Django Ajax/Jquery 在计时器上刷新 DIV 元素

python - 更改 Django 表单中的字段,覆盖 clean()

django - 比萨pdf转换器在使用大表时非常慢

linux - pg_upgrade oldbindir 检查失败

php - Laravel : Migrations & Seeding for production data

php - 如何在laravel 5.4 的迁移文件中使用原始mysql 语句?