python - 在 Django REST ListAPI View 中对原始 SQL 查询进行分页的最佳方法?

标签 python sql django pagination django-rest-framework

我有一个原始 SQL 查询,用于为 Django REST ListAPI View 构建查询集。大致如下(请原谅无意义的名字):

class MyView(ListAPIView):
    serializer_class = MySerializer
    paginate_by = 10
    def get_queryset(self):
        params = {
            "uid": str(self.request.user.id),
            "param": str(self.kwargs['param'])
        }
        query = 'SELECT f.id ' \
            'FROM myapp_foo f, myapp_bar b ' \
            'WHERE b.foo_id = f.id AND ' \
            'b.param >= %(param)s AND ' \
            'f.dt_tm >= NOW() AND ' \
            '(SELECT COUNT(*) FROM myapp_baz z ' \
            'WHERE z.user_id = %(uid)s AND ' \
            'z.qux_id = f.qux_id) = 0 ' \
            'ORDER BY f.dt_tm;'
        return Foo.objects.raw(query, params)

这给出了错误:

object of type 'RawQuerySet' has no len()

我想用类似的 SQL 查询计算计数,然后使用 LIMIT 和 OFFSET 参数进行分页。我已经阅读了一些建议,其中对列表项进行计数以获得 len 但这似乎并不令人满意,因为除非查询中有一个小的 LIMIT(这在任何情况下都会破坏分页的目的),否则效率很低。

更新: 我刚刚注意到 paginate_by 正在等待弃用。

首先,我将如何向返回的对象添加计数方法?

最佳答案

比其他替代方案更有效的解决方案是编写您自己的 RawQuerySet 替代品。我在下面显示代码,但您也可以 access it as a gist here .绝对不能保证没有错误;尽管如此,我还是在 Python 3 上的 Django 1.11 中使用它(使用 PostgreSQL 作为数据库;也应该使用 MySQL)。简而言之,此类将相应的 LIMITOFFSET 子句添加到您的原始 SQL 查询中。没有什么特别的,只是一些简单的字符串连接,所以请确保不要在原始 SQL 查询中包含这些子句。

from django.db import models
from django.db.models import sql
from django.db.models.query import RawQuerySet


class PaginatedRawQuerySet(RawQuerySet):
    def __init__(self, raw_query, **kwargs):
        super(PaginatedRawQuerySet, self).__init__(raw_query, **kwargs)
        self.original_raw_query = raw_query
        self._result_cache = None

    def __getitem__(self, k):
        """
        Retrieves an item or slice from the set of results.
        """
        if not isinstance(k, (slice, int,)):
            raise TypeError
        assert ((not isinstance(k, slice) and (k >= 0)) or
                (isinstance(k, slice) and (k.start is None or k.start >= 0) and
                 (k.stop is None or k.stop >= 0))), \
            "Negative indexing is not supported."

        if self._result_cache is not None:
            return self._result_cache[k]

        if isinstance(k, slice):
            qs = self._clone()
            if k.start is not None:
                start = int(k.start)
            else:
                start = None
            if k.stop is not None:
                stop = int(k.stop)
            else:
                stop = None
            qs.set_limits(start, stop)
            return qs

        qs = self._clone()
        qs.set_limits(k, k + 1)
        return list(qs)[0]

    def __iter__(self):
        self._fetch_all()
        return iter(self._result_cache)

    def count(self):
        if self._result_cache is not None:
            return len(self._result_cache)

        return self.model.objects.count()

    def set_limits(self, start, stop):
        limit_offset = ''

        new_params = tuple()
        if start is None:
            start = 0
        elif start > 0:
            new_params += (start,)
            limit_offset = ' OFFSET %s'
        if stop is not None:
            new_params = (stop - start,) + new_params
            limit_offset = 'LIMIT %s' + limit_offset

        self.params = self.params + new_params
        self.raw_query = self.original_raw_query + limit_offset
        self.query = sql.RawQuery(sql=self.raw_query, using=self.db, params=self.params)

    def _fetch_all(self):
        if self._result_cache is None:
            self._result_cache = list(super().__iter__())

    def __repr__(self):
        return '<%s: %s>' % (self.__class__.__name__, self.model.__name__)

    def __len__(self):
        self._fetch_all()
        return len(self._result_cache)

    def _clone(self):
        clone = self.__class__(raw_query=self.raw_query, model=self.model, using=self._db, hints=self._hints,
                               query=self.query, params=self.params, translations=self.translations)
        return clone

如何使用

自定义管理器

我正在通过自定义管理器使用上面设置的查询:

class MyModelRawManager(models.Manager):
    def raw(self, raw_query, params=None, translations=None, using=None):
        if using is None:
            using = self.db
        return PaginatedRawQuerySet(raw_query, model=self.model, params=params, translations=translations, using=using)

    def my_raw_sql_method(some_arg):
        # set up your query and params
        query = 'your query'
        params = ('your', 'params', 'tuple')
        return self.raw(raw_query=query, params=params)

自定义分页类

为了完成,我还包括了一个分页类:

from rest_framework.pagination import PageNumberPagination


class MyModelResultsPagination(PageNumberPagination):
    """Fixed page-size pagination with 10 items."""
    page_size = 10
    max_page_size = 10

你的 ListAPIView

class MyModelView(generics.ListAPIView):

    serializer_class = MyModelSerializer
    pagination_class = MyModelResultsPagination

    def get_queryset(self):
        return MyModel.raw_manager.my_raw_sql_method(some_arg)

忠告

PaginatedRawQuerySet 类虽然对我有用,但尚未经过广泛测试,但我相信它确实提供了一个比选择所有项目更有效的解决方案的想法在每次调用的查询集中。

您可能会注意到有一个自定义的 count 方法实现(最初在 RawQuerySet 中缺失),它是通过调用 self.model.objects.count 来计算的()。如果没有此方法,分页器将评估 len(your_raw_queryset),这对性能的影响与其他答案相同。

此类不是 RawQuerySet 的通用替代品,这意味着您应该添加自己的自定义项以使其满足您的需求。

例如,如果您需要更复杂的东西,您可以向 PaginatedRawQuerySet 类添加另一个属性,称为 raw_count_query,然后在 count 中调用它() 而不是像现在这样计算所有对象(这将在您需要过滤的情况下使用;raw_count_query 将提供 SQL 以根据您的条件计算子集)。

关于python - 在 Django REST ListAPI View 中对原始 SQL 查询进行分页的最佳方法?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32191853/

相关文章:

python - 如何在python中编写复杂的排序?

sql - 如何在 PostgreSQL 中仅 self 加入一部分行?

SQL 选择 id 在哪里 `column`

python - 自动更新字段的最佳做法是什么?

css - 如何将 css 类添加到 Django 管理表单

android - 需要帮助使用 Google Oauth 和 Android 配置 Django Rest API

python - 打破 python 多处理管理器列表

Python 基于磁盘的字典

python - 然后在给定范围内替换numpy数组元素

sql - 克服 SQL 查询中的不明确字段错误