我正在尝试创建一些 SQLAlchemy 模型,并且正在努力解决如何将 timedelta
正确应用于特定列的问题。 timedelta
(以天为单位指定)作为整数存储在单独的表 (Shifts
) 中,并且对于我的 Exam
中的每条记录可能不同表。
如果我使用 hybrid_property
,我可以计算具有偏移日期的属性
from sqlalchemy import Column, DateTime, Integer, ForeignKey
from sqlalchemy.orm import relationship
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.ext.associationproxy import association_proxy
class Exam(Base):
__table_args__ = {'schema': 'database.dbo'}
PT_ID = Column(Integer, primary_key=True)
# Unshifted date
START_DTTM = Column(DateTime)
shiftlink = relationship('Shifts', uselist=False)
# I want the DATE_ADD attribute to be easily accessible here
DATE_ADD = association_proxy('shiftlink', 'DATE_ADD')
@hybrid_property
def START_SHIFTED(self):
return self.START_DTTM + timedelta(days=self.DATE_ADD)
class Shifts(Base):
__table_args__ = {'schema': 'otherdb.dbo'}
PT_ID = Column(Integer, ForeignKey(Exam.PT_ID))
# Date shift in days (stored as an integer)
DATE_ADD = Column(Integer)
但是,我希望能够根据这个偏移日期过滤我的查询。
如果我按照上面写的那样尝试这样做,我会得到以下错误
TypeError: bad operand type for unary -: 'AssociationProxy'
为了解决这个问题,我尝试为混合属性定义以下 expression
@START_SHIFTED.expression
def START_SHIFTED(cls):
return func.dateadd(text('day'), cls.DATE_ADD, cls.START_DTTM)
当我尝试使用它进行过滤时,除了没有 Shifts
表 JOIN
之外,实际查询看起来有点正确/p>
from datetime import datetime
query = session.query(Exam).filter(Exam.START_SHIFTED < datetime.now())
print query
# SELECT ...
# FROM [database].[dbo].[exam]
# WHERE dateadd(day, %(dateadd_1)s, [database].[dbo].[exam].[BEGIN_DTTM]) < $(dateadd_2)s
但是当我尝试运行查询时,出现以下错误:
query.count()
TypeError: Boolean value of this clause is not defined
我觉得我一定错过了一些简单的东西。我基本上只是希望移动日期的行为就像对 datetime
对象的内置支持一样。我是否应该使用 AssociationProxy
以外的东西的 column_property
来定义 Exam.DATE_ADD
?
就其值(value)而言,我的数据库引擎是 SQL Server 2012。
最佳答案
为了解决这个问题,我最终创建了一个重载了 bind_expression
的 types.DateTime
类型版本和 column_expression
应用日期偏移的方法。此外,我必须创建一个自定义构造函数,它接受包含日期偏移的列作为输入,然后将其存储以供 column_expression
和 bind_expression
方法使用。
此自定义类型会将移位应用到从数据库接收的“解码”值,并在执行任何类型的过滤时减去移位,以确保始终如一地应用移位。
from sqlalchemy import types, func, text
class ShiftedDateTime(types.TypeDecorator):
impl = types.DateTime
def __init__(self, column, *args, **kwargs):
self.offset = column
super(ShiftedDateTime, self).__init__(*args, **kwargs)
def bind_expression(self, value):
if value is None:
return None
return func.dateadd(text('DAY'), self.offset, value)
def column_expression(self, col):
return func.dateadd(text('DAY'), -self.offset, col)
class Shifts(Base):
PT_ID = Column(Integer, primary_key=True)
# Date shift in days (stored as an integer)
DATE_ADD = Column(Integer)
class Exam(Base):
PT_ID = Column(Integer, ForeignKey(Shifts.PT_ID))
# Shifted start time
START_DTTM = Column(ShiftedDateTime(Shifts.DATE_ADD))
shiftlink = relationship(Shifts)
这种方法的唯一缺点是必须为任何查询执行显式 JOIN
,因为新类型需要 DATE_ADD
列。这显然是必要的,但如果可以执行隐式连接就更好了。
session.query(Exam).join(Shifts).filter(Exam.START_DTTM < datetime.now())
关于python - 表示 SQLAlchemy 中偏移日期的混合属性,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43212410/