我正在运行一个带有 postgreSQL 数据库的 django 应用程序,我正在尝试向数据库发送一个非常大的字典(由时间序列数据组成)。
我的目标是尽快将我的数据写入数据库。我正在使用库请求通过 API 调用(使用 django REST 构建)发送数据:
我的 API View 很简单:
@api_view(["POST"])
def CreateDummy(request):
for elem, ts in request.data['time_series'] :
TimeSeries.objects.create(data_json=ts)
msg = {"detail": "Created successfully"}
return Response(msg, status=status.HTTP_201_CREATED)
request.data['time_series']
是一个巨大的字典,结构如下:{Building1: {1:123, 2: 345, 4:567 .... 31536000: 2345}, .... Building30: {..... }}
这意味着我有 30 个键有 30 个值,而每个值都是一个包含 31536000 个元素的字典。 我的 API 请求如下所示(其中 data 是我上面描述的字典):
payload = {
"time_series": data,
}
requests.request(
"post", url=endpoint, json=payload
)
该代码将时间序列数据保存到后端的 jsonb 字段中。现在,如果我只循环遍历字典的前 4 个元素,就可以了。我可以在大约 1 分钟内获得这些数据。但是当我遍历整个字典时,我的开发服务器关闭了。估计是内存不够。我得到一个 requests.exceptions.ConnectionError: ('Connection aborted.', RemoteDisconnected('Remote end closed connection without response'))
.整个 dict 在开始迭代之前是否保存到内存中?我对此表示怀疑,因为我在 python3 循环中读到了 .items()
返回一个迭代器,是执行此操作的首选方法。有没有更好的方法来处理 django/python 中的大量字典?我应该先循环一半然后再循环另一半吗?或者有没有更快的方法?也许使用
pandas
?或者可能以不同的方式发送数据?我想我正在寻找最高效的方法来做到这一点。如果需要,很乐意提供更多代码。
非常感谢任何帮助、提示或指南!提前致谢
EDIT2:我认为这不是我的 RAM 使用量或 dict 的大小。当服务器关闭时,我还有 5GiB 的 RAM。 ~~而且dict的大小是1176bytes~~ dict要大很多,看评论
EDIT3:我什至无法打印巨大的字典。然后它也会关闭
EDIT4:当拆分数据并一次性发送时,服务器可以处理它。但是当我尝试查询它时,服务器再次中断。它在我的生产服务器(nginx AWS RDS 设置)上中断,在我的本地开发服务器上中断。我很确定这是因为 django 无法处理我当前设置的大查询。但是我怎么能解决这个问题呢?
EDIT5:所以我正在寻找的是一个两部分的解决方案。一种用于创建数据,一种用于查询数据。我上面描述的数据的创建。但即使我将所有数据都放入数据库中,我仍然会遇到问题。
我尝试通过不是一起创建数据而是单独创建每个时间序列来实现这一点。所以让我们假设我的数据库中有这么大的数据,我尝试查询回来。所有时间序列对象都属于一个网络,所以我试过这样:
class TimeSeriesByTypeAndCreationMethod(ListAPIView):
"""Query time-series in specific network."""
serializer_class = TimeSeriesSerializer
def get_queryset(self):
"""Query time-series
Query by name of network, type of data, creation method and
source.
"""
network = self.kwargs["name_network"]
if TimeSeries.objects.filter(
network_element__network__name=network,
).exists():
time_series = TimeSeries.objects.filter(
network_element__network__name=network,
)
return time_series
else:
raise NotFound()
但是查询会像之前的数据创建一样破坏服务器。我认为这也是太多的数据负载。我想我可以使用原始 sql 避免破坏服务器......或者还有更好的方法吗?EDIT6:相关模型:
class TimeSeries(models.Model):
TYPE_DATA_CHOICES = [
....many choices...
]
CREATION_METHOD_CHOICES = [
....many choices...
]
description = models.CharField(
max_length=120,
null=True,
blank=True,
)
network_element = models.ForeignKey(
Building,
on_delete=models.CASCADE,
null=True,
blank=True,
)
type_data = models.CharField(
null=True,
blank=True,
max_length=30,
choices=TYPE_DATA_CHOICES,
)
creation_method = models.CharField(
null=True,
blank=True,
max_length=30,
choices=CREATION_METHOD_CHOICES,
)
source = models.CharField(
null=True,
blank=True,
max_length=300
)
data_json = JSONField(
help_text="Data for time series in JSON format. Valid JSON expected."
)
creation_date = models.DateTimeField(auto_now=True, null=True, blank=True)
def __str__(self):
return f"{self.creation_method}:{self.type_data}"
class Building(models.Model):
USAGE_CHOICES = [
...
]
name = models.CharField(
max_length=120,
null=True,
blank=True,
)
street = models.CharField(
max_length=120,
null=True,
blank=True,
)
house_number = models.CharField(
max_length=20,
null=True,
blank=True,
)
zip_code = models.CharField(
max_length=5,
null=True,
blank=True,
)
city = models.CharField(
max_length=120,
null=True,
blank=True,
)
usage = models.CharField(
max_length=120,
choices=USAGE_CHOICES,
null=True,
blank=True,
)
.....many more fields....
最佳答案
您可以使用两种技术解决您的问题。
数据创建
使用bulk_create插入大量记录,如果由于查询量过大等导致SQL错误,则提供batch_size
在 bulk_create
.
records = []
for elem, ts in request.data['time_series'] :
records.append(
TimeSeries(data_json=ts)
)
# setting batch size t 1000
TimeSeries.objects.bulk_create(records, batch_size=1000)
对bulk_create 有一些警告,比如它不会生成信号,其他人在Doc 中看到更多信息。数据检索
配置rest框架以使用分页默认配置
REST_FRAMEWORK = {
'DEFAULT_PAGINATION_CLASS': 'rest_framework.pagination.LimitOffsetPagination',
'PAGE_SIZE': 100
}
对于自定义配置使用class TimeSeriesResultsSetPagination(PageNumberPagination):
page_size = 50
page_size_query_param = 'page_size'
max_page_size = 10000
class BillingRecordsView(generics.ListAPIView):
serializer_class = TimeSeriesSerializer
pagination_class = TimeSeriesResultsSetPagination
def get_queryset(self):
"""Query time-series
Query by name of network, type of data, creation method and
source.
"""
network = self.kwargs["name_network"]
if TimeSeries.objects.filter(
network_element__network__name=network,
).exists():
time_series = TimeSeries.objects.filter(
network_element__network__name=network,
)
return time_series
else:
raise NotFound()
请参阅 https://www.django-rest-framework.org/api-guide/pagination/ 的其他分页技术
关于python - 通过 API 调用发送大字典会中断开发服务器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64327172/