配置关系连接方式


relationship() 通常会通过检查两个表之间的外键关系来确定应该比较哪些列,从而在两个表之间创建连接。在各种情况下,需要自定义此行为。


处理多个 Join 路径


最常见的情况之一是两个 table 之间有多个外键路径。


考虑一个 Customer 类,它包含一个 Address 的两个外键 类:

from sqlalchemy import Integer, ForeignKey, String, Column
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import relationship


class Base(DeclarativeBase):
    pass


class Customer(Base):
    __tablename__ = "customer"
    id = mapped_column(Integer, primary_key=True)
    name = mapped_column(String)

    billing_address_id = mapped_column(Integer, ForeignKey("address.id"))
    shipping_address_id = mapped_column(Integer, ForeignKey("address.id"))

    billing_address = relationship("Address")
    shipping_address = relationship("Address")


class Address(Base):
    __tablename__ = "address"
    id = mapped_column(Integer, primary_key=True)
    street = mapped_column(String)
    city = mapped_column(String)
    state = mapped_column(String)
    zip = mapped_column(String)


当我们尝试使用上面的映射时,将产生错误:

sqlalchemy.exc.AmbiguousForeignKeysError: Could not determine join
condition between parent/child tables on relationship
Customer.billing_address - there are multiple foreign key
paths linking the tables.  Specify the 'foreign_keys' argument,
providing a list of those columns which should be
counted as containing a foreign key reference to the parent table.


上面的信息很长。relationship() 可以返回许多潜在的消息,这些消息经过精心定制以检测各种常见的配置问题;大多数会建议解决歧义或其他缺失信息所需的额外配置。


在这种情况下,消息希望我们限定每个 relationship() 通过指示每个 API 应该考虑哪个外键列,以及 适当的表格如下:

class Customer(Base):
    __tablename__ = "customer"
    id = mapped_column(Integer, primary_key=True)
    name = mapped_column(String)

    billing_address_id = mapped_column(Integer, ForeignKey("address.id"))
    shipping_address_id = mapped_column(Integer, ForeignKey("address.id"))

    billing_address = relationship("Address", foreign_keys=[billing_address_id])
    shipping_address = relationship("Address", foreign_keys=[shipping_address_id])


上面,我们指定了 foreign_keys 参数,它是一个 ColumnColumn 对象列表,指示那些列被视为“外部”,或者换句话说,包含引用父表的值的列。从 Customer 加载 Customer.billing_address 关系 object 将使用 billing_address_id 中存在的值来标识 Address 中要加载的行;同样,shipping_address_id 用于 shipping_address 关系。在持久化期间,两列的链接也起着作用;在刷新期间,刚刚插入的 Address 对象新生成的主键将被复制到关联 Customer 对象的相应外键列中。


当使用 Declare 指定 foreign_keys 时,我们也可以使用字符串名称来指定,但是重要的是,如果使用列表,则列表 是字符串的一部分

billing_address = relationship("Address", foreign_keys="[Customer.billing_address_id]")


在这个特定示例中,该列表在任何情况下都不是必需的,因为我们只需要一个 Column

billing_address = relationship("Address", foreign_keys="Customer.billing_address_id")


警告


当作为 Python 可评估字符串传递时, relationship.foreign_keys 参数使用 Python 的 eval() 函数。不要将不受信任的输入传递给此字符串。看 关系参数的评估 有关 relationship() 参数的声明性评估的详细信息。


指定备用连接条件


构造联接时 relationship() 的默认行为 是它将 Primary Key 列的值相等 一侧与外键引用列的一侧。 我们可以将此标准更改为我们想要的任何条件,使用 relationship.primaryjoin 参数以及 relationship.SecondaryJoin 参数。


在下面的示例中,使用 User 类以及存储街道地址的 Address 类,我们创建了一个关系boston_addresses,该关系仅加载指定 “Boston” 城市的 Address 对象:

from sqlalchemy import Integer, ForeignKey, String, Column
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import relationship


class Base(DeclarativeBase):
    pass


class User(Base):
    __tablename__ = "user"
    id = mapped_column(Integer, primary_key=True)
    name = mapped_column(String)
    boston_addresses = relationship(
        "Address",
        primaryjoin="and_(User.id==Address.user_id, Address.city=='Boston')",
    )


class Address(Base):
    __tablename__ = "address"
    id = mapped_column(Integer, primary_key=True)
    user_id = mapped_column(Integer, ForeignKey("user.id"))

    street = mapped_column(String)
    city = mapped_column(String)
    state = mapped_column(String)
    zip = mapped_column(String)


在这个字符串 SQL 表达式中,我们使用 and_() 连接结构为连接条件建立两个不同的谓词 - 将 User.id 列和 Address.user_id 列相互连接,以及将 Address 中的行限制为仅 city='Boston'。当使用声明式时,像 and_() 这样的基本 SQL 函数会自动在字符串 relationship() 的 evaluated 命名空间中可用 论点。


警告


当作为 Python 可评估字符串传递时, relationship.primaryjoin 参数使用 Python 的 eval() 函数。不要将不受信任的输入传递给此字符串。看 关系参数的评估 有关 relationship() 参数的声明性评估的详细信息。


我们在 relationship.primaryjoin 中使用的自定义条件 通常仅在 SQLAlchemy 在 order 加载或表示此关系。也就是说,它用于 为了执行 per-attribute 而发出的 SQL 语句 延迟加载,或者在查询时构造联接时,例如 via Select.join(),或通过预先加载的 “joined” 或 “subquery” 样式。当 in-memory 对象作时,我们可以将任何我们想要的 Address 对象放入 boston_addresses 集合,而不管 .city 属性的值是什么 是。 这些对象将保留在集合中,直到 属性过期并从数据库中重新加载,其中 准则。 当 flush 发生时, boston_addresses 将被无条件刷新,将主键 user.id 列的值分配给外键持有 每行address.user_id列。city 条件在这里不起作用,因为 flush 进程只关心将主键值同步到引用外键值。


创建自定义外部条件


主联接条件的另一个元素是如何确定那些被视为 “foreign” 的列。通常,Column 对象的某些子集将指定 ForeignKey,或者成为与连接条件相关的 ForeignKeyConstraint 的一部分。 relationship() 在决定时查看此外键状态 它应该如何加载和保留此关系的数据。 但是, relationship.primaryjoin 参数可用于创建不涉及任何 “schema” 级别外键的联接条件。我们可以将 relationship.primaryjoin 以及 relationship.foreign_keysrelationship.remote_side 来建立这样的联接。


在下图中,一个类 HostEntry 联接到自身,使字符串内容相等 column 设置为 ip_address 列,该列是名为 IINT 的 PostgreSQL 类型。我们需要使用 cast() 以便将 join 的一侧转换为另一侧的类型:

from sqlalchemy import cast, String, Column, Integer
from sqlalchemy.orm import relationship
from sqlalchemy.dialects.postgresql import INET

from sqlalchemy.orm import DeclarativeBase


class Base(DeclarativeBase):
    pass


class HostEntry(Base):
    __tablename__ = "host_entry"

    id = mapped_column(Integer, primary_key=True)
    ip_address = mapped_column(INET)
    content = mapped_column(String(50))

    # relationship() using explicit foreign_keys, remote_side
    parent_host = relationship(
        "HostEntry",
        primaryjoin=ip_address == cast(content, INET),
        foreign_keys=content,
        remote_side=ip_address,
    )


上述关系将产生如下连接:

SELECT host_entry.id, host_entry.ip_address, host_entry.content
FROM host_entry JOIN host_entry AS host_entry_1
ON host_entry_1.ip_address = CAST(host_entry.content AS INET)


上述语法的另一种语法是使用 foreign()remote()注解,内联在 relationship.primaryjoin 表达式中。此语法表示 relationship() 通常单独应用于给定 relationship.foreign_keysrelationship.remote_side 参数。当存在显式连接条件时,这些函数可能更简洁,并且还用于准确标记“外部”或“远程”列,而与该列是多次声明还是在复杂的 SQL 表达式中声明无关:

from sqlalchemy.orm import foreign, remote


class HostEntry(Base):
    __tablename__ = "host_entry"

    id = mapped_column(Integer, primary_key=True)
    ip_address = mapped_column(INET)
    content = mapped_column(String(50))

    # relationship() using explicit foreign() and remote() annotations
    # in lieu of separate arguments
    parent_host = relationship(
        "HostEntry",
        primaryjoin=remote(ip_address) == cast(foreign(content), INET),
    )


在 join 条件中使用自定义运算符


关系的另一个用例是使用自定义运算符,例如 PostgreSQL 的“包含在”<< 运算符,当与 INETCIDR 等类型联接时。对于自定义布尔运算符,我们使用 Operators.bool_op() 函数:

inet_column.bool_op("<<")(cidr_column)


像上面的比较可以直接与 relationship.primaryJoin 时:

class IPA(Base):
    __tablename__ = "ip_address"

    id = mapped_column(Integer, primary_key=True)
    v4address = mapped_column(INET)

    network = relationship(
        "Network",
        primaryjoin="IPA.v4address.bool_op('<<')(foreign(Network.v4representation))",
        viewonly=True,
    )


class Network(Base):
    __tablename__ = "network"

    id = mapped_column(Integer, primary_key=True)
    v4representation = mapped_column(CIDR)


上面,查询如下:

select(IPA).join(IPA.network)


将渲染为:

SELECT ip_address.id AS ip_address_id, ip_address.v4address AS ip_address_v4address
FROM ip_address JOIN network ON ip_address.v4address << network.v4representation


基于 SQL 函数的自定义运算符


Operators.op.is_comparison用例的一个变体是当我们不使用运算符,而是使用 SQL 函数时。此用例的典型示例是 PostgreSQL PostGIS 函数,但是,任何数据库上解析为二进制条件的任何 SQL 函数都可能适用。为了适应此用例,该方法 FunctionElement.as_comparison() 可以修改任何 SQL 函数,例如从 func 调用的函数 namespace 中,向 ORM 指示该函数生成 两个表达式。 下面的示例使用 Geoalchemy2 库:

from geoalchemy2 import Geometry
from sqlalchemy import Column, Integer, func
from sqlalchemy.orm import relationship, foreign


class Polygon(Base):
    __tablename__ = "polygon"
    id = mapped_column(Integer, primary_key=True)
    geom = mapped_column(Geometry("POLYGON", srid=4326))
    points = relationship(
        "Point",
        primaryjoin="func.ST_Contains(foreign(Polygon.geom), Point.geom).as_comparison(1, 2)",
        viewonly=True,
    )


class Point(Base):
    __tablename__ = "point"
    id = mapped_column(Integer, primary_key=True)
    geom = mapped_column(Geometry("POINT", srid=4326))


在上面,表示 FunctionElement.as_comparison() func 的ST_Contains() SQL 函数正在比较 Polygon.geomPoint.geom 表达式。foreign() 注解还指出了在此特定关系中哪列承担 “foreign key” 角色。


1.3 版本中的新功能: 已添加 FunctionElement.as_comparison() .


重叠的外键


使用复合外键时,可能会出现一种罕见的情况,例如单个列可能是通过外键约束引用的多个列的主题。


考虑一个(诚然很复杂)映射,例如 Magazine 对象,它由 Writer 对象和 Article 对象使用包括 magazine_id 的复合主键方案引用 两者都有;然后使 Article 也引用 WriterArticle.magazine_id 卷入了两种不同的关系; Article.magazineArticle.writer

class Magazine(Base):
    __tablename__ = "magazine"

    id = mapped_column(Integer, primary_key=True)


class Article(Base):
    __tablename__ = "article"

    article_id = mapped_column(Integer)
    magazine_id = mapped_column(ForeignKey("magazine.id"))
    writer_id = mapped_column()

    magazine = relationship("Magazine")
    writer = relationship("Writer")

    __table_args__ = (
        PrimaryKeyConstraint("article_id", "magazine_id"),
        ForeignKeyConstraint(
            ["writer_id", "magazine_id"], ["writer.id", "writer.magazine_id"]
        ),
    )


class Writer(Base):
    __tablename__ = "writer"

    id = mapped_column(Integer, primary_key=True)
    magazine_id = mapped_column(ForeignKey("magazine.id"), primary_key=True)
    magazine = relationship("Magazine")


配置上述映射后,我们将看到发出以下警告:

SAWarning: relationship 'Article.writer' will copy column
writer.magazine_id to column article.magazine_id,
which conflicts with relationship(s): 'Article.magazine'
(copies magazine.id to article.magazine_id). Consider applying
viewonly=True to read-only relationships, or provide a primaryjoin
condition marking writable columns with the foreign() annotation.


这指的是 Article.magazine_id 两个不同外键约束的主题;它指的是 Magazine.id 直接作为源列,但也引用 Writer.magazine_id作为组合键上下文中的源列分配给 Writer。如果我们将一篇文章与特定杂志关联,但随后将该文章Writer 的 Writer Article.magazine_id 更改我们引用的杂志;它可能 如果我们取消关联 Writer from an Article.该警告让我们知道情况确实如此。


为了解决这个问题,我们需要分解 Article 的行为,以包括以下所有三个功能:


  1. 文章首先写给 Article.magazine_id基于 Article.magazine 中保留的数据 relationship,即从 Magazine.id 复制的值。


  2. Article 可以代表 Article.writer 关系中保留的数据写入 Article.writer_id,但只能写入 Writer.id 列;不应将 Writer.magazine_id 列写入 Article.magazine_id,因为它最终来源于 Magazine.id


  3. 文章在加载时会考虑 Article.magazine_id Article.writer 的 Article.writer 中写入的 Article.writer 中写入的 Article.writer 中写入的 Article.writer 的 Story.writer 中写入的 Article.writer 的 Story.writer


要只得到 #1 和 #2,我们可以只指定 Article.writer_id 作为 Article.writer 的“外键”:

class Article(Base):
    # ...

    writer = relationship("Writer", foreign_keys="Article.writer_id")


但是,这具有 Article.writer 不采用 Article.magazine_id Writer 进行查询时要考虑的事项:

SELECT article.article_id AS article_article_id,
    article.magazine_id AS article_magazine_id,
    article.writer_id AS article_writer_id
FROM article
JOIN writer ON writer.id = article.writer_id


因此,为了获得 #1、#2 和 #3 的所有结果,我们表示连接条件 以及通过组合来写入哪些列 relationship.primaryjoin 以及 relationship.foreign_keys 个参数,或者更简洁地使用 foreign() 进行注释:

class Article(Base):
    # ...

    writer = relationship(
        "Writer",
        primaryjoin="and_(Writer.id == foreign(Article.writer_id), "
        "Writer.magazine_id == Article.magazine_id)",
    )


非关系比较 / 物化路径


警告


本节详细介绍了一项实验性功能。


使用自定义表达式意味着我们可以生成不遵循通常的主键/外键模型的非正统连接条件。一个这样的例子是物化路径模式,我们比较字符串中的重叠路径标记,以生成树结构。


通过谨慎使用 foreign()remote(),我们可以构建一种关系,有效地产生一个基本的物化路径系统。本质上,当 foreign()remote() 位于比较表达式的同一侧时,这种关系被认为是 “一对多”的;当他们站在不同的一方时,这种关系被认为是 “多对一”。为了在这里进行比较,我们将处理集合,因此我们将将事情配置为 “一对多”:

class Element(Base):
    __tablename__ = "element"

    path = mapped_column(String, primary_key=True)

    descendants = relationship(
        "Element",
        primaryjoin=remote(foreign(path)).like(path.concat("/%")),
        viewonly=True,
        order_by=path,
    )


在上面,如果给定一个 path 属性为 “/foo/bar2”Element 对象,我们会寻找 Element.descendants 的加载,如下所示:

SELECT element.path AS element_path
FROM element
WHERE element.path LIKE ('/foo/bar2' || '/%') ORDER BY element.path


自引用多对多关系


另请参阅


本节介绍了“邻接列表”模式的双表变体,该模式记录在邻接列表关系中。 请务必查看 小节中的自引用查询模式 自引用查询策略配置自引用 Eager 加载 这同样适用于此处讨论的映射模式。


多对多关系可以由 relationship.primaryjoin 中的一个或两个进行自定义 和 relationship.secondaryjoin - 后者对于使用 relationship.secondary 参数指定多对多引用的关系非常重要。一种常见的情况,涉及使用 relationship.primaryjoinrelationship.secondaryjoin 是建立从 class 到自身的多对多关系时,如下所示:

from typing import List

from sqlalchemy import Integer, ForeignKey, Column, Table
from sqlalchemy.orm import DeclarativeBase, Mapped
from sqlalchemy.orm import mapped_column, relationship


class Base(DeclarativeBase):
    pass


node_to_node = Table(
    "node_to_node",
    Base.metadata,
    Column("left_node_id", Integer, ForeignKey("node.id"), primary_key=True),
    Column("right_node_id", Integer, ForeignKey("node.id"), primary_key=True),
)


class Node(Base):
    __tablename__ = "node"
    id: Mapped[int] = mapped_column(primary_key=True)
    label: Mapped[str]
    right_nodes: Mapped[List["Node"]] = relationship(
        "Node",
        secondary=node_to_node,
        primaryjoin=id == node_to_node.c.left_node_id,
        secondaryjoin=id == node_to_node.c.right_node_id,
        back_populates="left_nodes",
    )
    left_nodes: Mapped[List["Node"]] = relationship(
        "Node",
        secondary=node_to_node,
        primaryjoin=id == node_to_node.c.right_node_id,
        secondaryjoin=id == node_to_node.c.left_node_id,
        back_populates="right_nodes",
    )


在上面,SQLAlchemy 无法自动知道哪些列应该连接到 right_nodesleft_nodes 关系的哪些列。relationship.primaryjoin 和 relationship.SecondaryJoin 参数确定我们希望如何联接到 association 表。在上面的声明式形式中,由于我们在对应于 Node 类的 Python 块中声明这些条件,因此 id 变量可以直接作为我们希望联接的 Column 对象使用。


或者,我们可以定义 relationship.primaryjoinrelationship.secondaryjoin 参数,这适用于我们的配置尚没有可用的 Node.id 列对象,或者 node_to_node 表可能尚不可用的情况。当在声明性字符串中引用普通 Table 对象时,我们使用 MetaData 中存在的表的字符串名称:

class Node(Base):
    __tablename__ = "node"
    id = mapped_column(Integer, primary_key=True)
    label = mapped_column(String)
    right_nodes = relationship(
        "Node",
        secondary="node_to_node",
        primaryjoin="Node.id==node_to_node.c.left_node_id",
        secondaryjoin="Node.id==node_to_node.c.right_node_id",
        backref="left_nodes",
    )


警告


当作为 Python 可评估字符串传递时, relationship.primaryjoinrelationship.secondaryjoin 参数使用 Python 的 eval() 函数进行解释。不要将不受信任的输入传递给这些 字符串。有关 relationship() 参数的声明性评估的详细信息,请参阅关系参数的评估


这里的经典映射情况类似,其中 node_to_node 可以联接到 node.c.id

from sqlalchemy import Integer, ForeignKey, String, Column, Table, MetaData
from sqlalchemy.orm import relationship, registry

metadata_obj = MetaData()
mapper_registry = registry()

node_to_node = Table(
    "node_to_node",
    metadata_obj,
    Column("left_node_id", Integer, ForeignKey("node.id"), primary_key=True),
    Column("right_node_id", Integer, ForeignKey("node.id"), primary_key=True),
)

node = Table(
    "node",
    metadata_obj,
    Column("id", Integer, primary_key=True),
    Column("label", String),
)


class Node:
    pass


mapper_registry.map_imperatively(
    Node,
    node,
    properties={
        "right_nodes": relationship(
            Node,
            secondary=node_to_node,
            primaryjoin=node.c.id == node_to_node.c.left_node_id,
            secondaryjoin=node.c.id == node_to_node.c.right_node_id,
            backref="left_nodes",
        )
    },
)


请注意,在这两个示例中,relationship.backref keyword 指定 backref left_nodes - 当 relationship() 以相反的方式创建第二个关系 方向,它足够聪明,可以反转 relationship.primaryjoinrelationship.secondaryjoin 参数。


另请参阅


复合 “Secondary” 连接


注意


本节介绍了 SQLAlchemy 在一定程度上支持的远端情况,但是建议尽可能使用合理的关系布局和/或 Python 内属性以更简单的方式解决此类问题。


有时,当一个人试图在两个表之间建立 relationship() 时,需要涉及的不仅仅是两个或三个表才能将它们连接起来。这是 relationship() 的一个领域,人们寻求突破可能的界限,通常许多这些奇特用例的最终解决方案需要在 SQLAlchemy 邮件列表中敲定。


在较新版本的 SQLAlchemy 中,relationship.secondary parameter 可用于提供复合 目标。 下面是这样一个 join 条件(至少需要 0.9.2 版本才能按原样运行):

class A(Base):
    __tablename__ = "a"

    id = mapped_column(Integer, primary_key=True)
    b_id = mapped_column(ForeignKey("b.id"))

    d = relationship(
        "D",
        secondary="join(B, D, B.d_id == D.id).join(C, C.d_id == D.id)",
        primaryjoin="and_(A.b_id == B.id, A.id == C.a_id)",
        secondaryjoin="D.id == B.d_id",
        uselist=False,
        viewonly=True,
    )


class B(Base):
    __tablename__ = "b"

    id = mapped_column(Integer, primary_key=True)
    d_id = mapped_column(ForeignKey("d.id"))


class C(Base):
    __tablename__ = "c"

    id = mapped_column(Integer, primary_key=True)
    a_id = mapped_column(ForeignKey("a.id"))
    d_id = mapped_column(ForeignKey("d.id"))


class D(Base):
    __tablename__ = "d"

    id = mapped_column(Integer, primary_key=True)


在上面的例子中,我们提供了 relationship.secondary 的所有三个 relationship.primaryjoinrelationship.secondaryjoin,以声明式样式引用命名表 ABCD 径直。 从 AD 的查询如下所示:

sess.scalars(select(A).join(A.d)).all()

SELECT a.id AS a_id, a.b_id AS a_b_id FROM a JOIN ( b AS b_1 JOIN d AS d_1 ON b_1.d_id = d_1.id JOIN c AS c_1 ON c_1.d_id = d_1.id) ON a.b_id = b_1.id AND a.id = c_1.a_id JOIN d ON d.id = b_1.d_id


在上面的例子中,我们利用了能够将多个表塞入一个 “secondary” 容器的优势,这样我们就可以跨多个表进行连接,同时仍然保持 relationship() 的 “简单” 状态,因为 “left” 和 “right” 两侧都只有 “one” table;复杂性保持在中间。


警告


上述关系通常标记为 viewonly=True,并且应该被视为只读。虽然有时有一些方法可以使上述关系可写,但这通常很复杂且容易出错。


与 Aliased Class 的关系


在上一节中,我们说明了一种使用 relationship.secondary 中,以便将其他表置于 Join 条件中。在一种复杂的连接情况下,即使这种技术也是不够的。当我们寻求从 A 加入时 到 B,在两者之间使用任意数量的 CD 等,但是 AB 之间也直接存在连接条件。在这种情况下,从 AB 的连接可能是 仅用复杂难以表达 relationship.primaryjoin 条件,因为中间表可能需要特殊处理,并且也不能用 relationship.secondary 对象来表示,因为 A->secondary->B 模式不支持 AB 直接。当出现这种极其高级的情况时,我们可以求助于创建第二个映射作为关系的目标。这就是我们使用 AliasedClass 的地方,以便映射到一个包含此联接所需的所有附加表的类。为了生成此映射器作为类的“替代”映射,我们使用 aliased() 函数生成新的构造,然后对对象使用 relationship(),就像它是一个普通映射类一样。


下面说明了一个 relationship() 和一个从 AB,但是 primaryjoin 条件增加了两个额外的实体 CD,这两个实体也必须同时具有与 AB 中的行对齐的行:

class A(Base):
    __tablename__ = "a"

    id = mapped_column(Integer, primary_key=True)
    b_id = mapped_column(ForeignKey("b.id"))


class B(Base):
    __tablename__ = "b"

    id = mapped_column(Integer, primary_key=True)


class C(Base):
    __tablename__ = "c"

    id = mapped_column(Integer, primary_key=True)
    a_id = mapped_column(ForeignKey("a.id"))

    some_c_value = mapped_column(String)


class D(Base):
    __tablename__ = "d"

    id = mapped_column(Integer, primary_key=True)
    c_id = mapped_column(ForeignKey("c.id"))
    b_id = mapped_column(ForeignKey("b.id"))

    some_d_value = mapped_column(String)


# 1. set up the join() as a variable, so we can refer
# to it in the mapping multiple times.
j = join(B, D, D.b_id == B.id).join(C, C.id == D.c_id)

# 2. Create an AliasedClass to B
B_viacd = aliased(B, j, flat=True)

A.b = relationship(B_viacd, primaryjoin=A.b_id == j.c.b_id)


使用上面的映射,一个简单的连接如下所示:

sess.scalars(select(A).join(A.b)).all()

SELECT a.id AS a_id, a.b_id AS a_b_id FROM a JOIN (b JOIN d ON d.b_id = b.id JOIN c ON c.id = d.c_id) ON a.b_id = b.id


将 AliasedClass 映射与类型化集成并避免早期 Mapper 配置


针对映射类创建 aliased() 构造会强制 configure_mappers() 步骤继续进行,这将解析所有当前类及其关系。如果尚未声明当前映射所需的 unrelated 映射类,或者关系本身的配置需要访问尚未声明的类,则可能会出现问题。此外,当预先声明关系时,SQLAlchemy 的 Declarative 模式与 Python 类型一起工作最有效。


为了组织关系的构造以处理这些问题,像 MapperEvents.before_mapper_configured() ,它只会在所有 mappings 已准备好进行配置:

from sqlalchemy import event


class A(Base):
    __tablename__ = "a"

    id = mapped_column(Integer, primary_key=True)
    b_id = mapped_column(ForeignKey("b.id"))


@event.listens_for(A, "before_mapper_configured")
def _configure_ab_relationship(mapper, cls):
    # do the above configuration in a configuration hook

    j = join(B, D, D.b_id == B.id).join(C, C.id == D.c_id)
    B_viacd = aliased(B, j, flat=True)
    A.b = relationship(B_viacd, primaryjoin=A.b_id == j.c.b_id)


在上面,函数 _configure_ab_relationship() 仅在请求 A 的完全配置版本时调用,此时类 BDC 将可用。


对于与内联类型集成的方法,可以使用类似的技术来有效地为别名类生成 “singleton” 创建模式,其中它被后期初始化为全局变量,然后可以在内联关系中使用:

from typing import Any

B_viacd: Any = None
b_viacd_join: Any = None


class A(Base):
    __tablename__ = "a"

    id: Mapped[int] = mapped_column(primary_key=True)
    b_id: Mapped[int] = mapped_column(ForeignKey("b.id"))

    # 1. the relationship can be declared using lambdas, allowing it to resolve
    #    to targets that are late-configured
    b: Mapped[B] = relationship(
        lambda: B_viacd, primaryjoin=lambda: A.b_id == b_viacd_join.c.b_id
    )


# 2. configure the targets of the relationship using a before_mapper_configured
#    hook.
@event.listens_for(A, "before_mapper_configured")
def _configure_ab_relationship(mapper, cls):
    # 3. set up the join() and AliasedClass as globals from within
    #    the configuration hook.

    global B_viacd, b_viacd_join

    b_viacd_join = join(B, D, D.b_id == B.id).join(C, C.id == D.c_id)
    B_viacd = aliased(B, b_viacd_join, flat=True)


在 Queries 中使用 AliasedClass 目标


在前面的示例中,A.b 关系是指 B_viacd 实体作为目标,而不是直接使用 B 类。要添加涉及 A.b 关系的其他条件,通常需要直接引用 B_viacd 而不是使用 B,尤其是在 A.b 的目标实体要转换为别名或子查询的情况下。下面说明了使用子查询而不是联接的相同关系:

subq = select(B).join(D, D.b_id == B.id).join(C, C.id == D.c_id).subquery()

B_viacd_subquery = aliased(B, subq)

A.b = relationship(B_viacd_subquery, primaryjoin=A.b_id == subq.c.id)


使用上述 A.b 关系的查询将呈现一个子查询:

sess.scalars(select(A).join(A.b)).all()

SELECT a.id AS a_id, a.b_id AS a_b_id FROM a JOIN (SELECT b.id AS id, b.some_b_column AS some_b_column FROM b JOIN d ON d.b_id = b.id JOIN c ON c.id = d.c_id) AS anon_1 ON a.b_id = anon_1.id


如果我们想基于 A.b 连接添加额外的条件,我们必须根据 B_viacd_subquery 而不是直接 B 来做:

sess.scalars(
    select(A)
    .join(A.b)
    .where(B_viacd_subquery.some_b_column == "some b")
    .order_by(B_viacd_subquery.id)
).all()

SELECT a.id AS a_id, a.b_id AS a_b_id FROM a JOIN (SELECT b.id AS id, b.some_b_column AS some_b_column FROM b JOIN d ON d.b_id = b.id JOIN c ON c.id = d.c_id) AS anon_1 ON a.b_id = anon_1.id WHERE anon_1.some_b_column = ? ORDER BY anon_1.id


与窗口函数的行限制关系


AliasedClass 的关系的另一个有趣用例 对象是 该关系需要加入任何形式的专用 SELECT 。 一 scenario 是当需要使用窗口函数时,例如限制 应为关系返回多少行。 下面的示例 说明了将加载第一个 每个产品系列 10 件商品:

class A(Base):
    __tablename__ = "a"

    id = mapped_column(Integer, primary_key=True)


class B(Base):
    __tablename__ = "b"
    id = mapped_column(Integer, primary_key=True)
    a_id = mapped_column(ForeignKey("a.id"))


partition = select(
    B, func.row_number().over(order_by=B.id, partition_by=B.a_id).label("index")
).alias()

partitioned_b = aliased(B, partition)

A.partitioned_bs = relationship(
    partitioned_b, primaryjoin=and_(partitioned_b.a_id == A.id, partition.c.index < 10)
)


我们可以将上述partitioned_bs关系用于大多数 loader 策略,例如 selectinload():

for a1 in session.scalars(select(A).options(selectinload(A.partitioned_bs))):
    print(a1.partitioned_bs)  # <-- will be no more than ten objects


在上面,“selectinload”查询如下所示:

SELECT
    a_1.id AS a_1_id, anon_1.id AS anon_1_id, anon_1.a_id AS anon_1_a_id,
    anon_1.data AS anon_1_data, anon_1.index AS anon_1_index
FROM a AS a_1
JOIN (
    SELECT b.id AS id, b.a_id AS a_id, b.data AS data,
    row_number() OVER (PARTITION BY b.a_id ORDER BY b.id) AS index
    FROM b) AS anon_1
ON anon_1.a_id = a_1.id AND anon_1.index < %(index_1)s
WHERE a_1.id IN ( ... primary key collection ...)
ORDER BY a_1.id


在上面,对于 “a” 中的每个匹配主键,我们将得到按 “b.id” 排序的前 10 个 “bs”。通过在 “a_id” 上进行分区,我们确保每个 “row number” 都是父 “a_id” 的本地。


这样的映射通常还包括从 “A” 到 “B” 的 “普通” 关系,用于持久性作以及需要每个 “A” 的完整 “B” 对象集时。


构建启用查询的属性


非常雄心勃勃的自定义联接条件可能无法直接持久化,在某些情况下甚至可能无法正确加载。要删除等式的持久性部分,请在 relationship(),这会将其建立为只读属性(写入集合的数据将在 flush() 上被忽略)。但是,在极端情况下,请考虑将常规 Python 属性与 Query 结合使用,如下所示:

class User(Base):
    __tablename__ = "user"
    id = mapped_column(Integer, primary_key=True)

    @property
    def addresses(self):
        return object_session(self).query(Address).with_parent(self).filter(...).all()


在其他情况下,可以构建 Descriptors 以利用现有的 Python 内数据。有关特殊 Python 属性的更多一般讨论,请参阅使用 Descriptors 和 Hybrids 部分。


使用 viewonly 关系参数的注意事项


relationship.viewonly 参数在应用于 relationship() 结构表示此 relationship() 将不参与任何 ORM 工作单元作,此外,该属性不希望参与其表示的集合的 Python 内更改。这意味着,虽然 viewonly 关系可能引用可变的 Python 集合(如列表或集),但对该列表或集进行更改将不会影响 ORM 刷新过程。


要探索此方案,请考虑以下映射:

from __future__ import annotations

import datetime

from sqlalchemy import and_
from sqlalchemy import ForeignKey
from sqlalchemy import func
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import Mapped
from sqlalchemy.orm import mapped_column
from sqlalchemy.orm import relationship


class Base(DeclarativeBase):
    pass


class User(Base):
    __tablename__ = "user_account"

    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str | None]

    all_tasks: Mapped[list[Task]] = relationship()

    current_week_tasks: Mapped[list[Task]] = relationship(
        primaryjoin=lambda: and_(
            User.id == Task.user_account_id,
            # this expression works on PostgreSQL but may not be supported
            # by other database engines
            Task.task_date >= func.now() - datetime.timedelta(days=7),
        ),
        viewonly=True,
    )


class Task(Base):
    __tablename__ = "task"

    id: Mapped[int] = mapped_column(primary_key=True)
    user_account_id: Mapped[int] = mapped_column(ForeignKey("user_account.id"))
    description: Mapped[str | None]
    task_date: Mapped[datetime.datetime] = mapped_column(server_default=func.now())

    user: Mapped[User] = relationship(back_populates="current_week_tasks")


以下部分将说明此配置的不同方面。


包含 backrefs 的 Python 内部更改不适用于 viewonly=True


上述映射将 User.current_week_tasks viewonly 关系作为 Task.user 属性的 backref 目标。SQLAlchemy 的 ORM 配置过程目前没有标记这,但是这是一个配置错误。更改 Task.user 属性不会影响 .current_week_tasks 属性:

>>> u1 = User()
>>> t1 = Task(task_date=datetime.datetime.now())
>>> t1.user = u1
>>> u1.current_week_tasks
[]


还有另一个参数称为 relationship.sync_backrefs 在这种情况下,可以在此处打开以允许更改 .current_week_tasks,但是这不被认为是具有 viewonly 关系的最佳实践,相反,不应依赖 Python 内部的更改。


在这个映射中,可以在 User.all_tasksTask.user,因为它们都不是 viewonly 的,并且会正常同步。


除了为 viewonly 关系禁用 backref 更改的问题之外,在将更改刷新到数据库之前,Python 中对 User.all_tasks 集合的普通更改也不会反映在 User.current_week_tasks 集合中。


总体而言,对于自定义集合应立即响应 Python 内部更改的用例,viewonly 关系通常不合适。更好的方法是使用 SQLAlchemy 的 Hybrid Attributes 功能,或者对于仅实例的情况,使用 Python @property,其中用户定义的 集合可以是 实现。 为了将示例更改为以这种方式工作,我们修复了 relationship.back_populates Task.user 上的参数来引用User.all_tasks,然后演示一个简单的@property,该将根据 immediate User.all_tasks 集合提供结果:

class User(Base):
    __tablename__ = "user_account"

    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str | None]

    all_tasks: Mapped[list[Task]] = relationship(back_populates="user")

    @property
    def current_week_tasks(self) -> list[Task]:
        past_seven_days = datetime.datetime.now() - datetime.timedelta(days=7)
        return [t for t in self.all_tasks if t.task_date >= past_seven_days]


class Task(Base):
    __tablename__ = "task"

    id: Mapped[int] = mapped_column(primary_key=True)
    user_account_id: Mapped[int] = mapped_column(ForeignKey("user_account.id"))
    description: Mapped[str | None]
    task_date: Mapped[datetime.datetime] = mapped_column(server_default=func.now())

    user: Mapped[User] = relationship(back_populates="all_tasks")


使用每次动态计算的 Python 内部集合,我们可以保证始终获得正确答案,根本不需要使用数据库:

>>> u1 = User()
>>> t1 = Task(task_date=datetime.datetime.now())
>>> t1.user = u1
>>> u1.current_week_tasks
[<__main__.Task object at 0x7f3d699523c0>]


viewonly=True 集合 / 属性在过期之前不会被重新查询


继续原始的 viewonly 属性,如果我们确实对持久化对象上的 User.all_tasks 集合进行了更改,则 viewonly 集合只能在两次后显示此更改的最终结果 事情发生了。 首先,对 User.all_tasks 的更改是 flushed,以便新数据在数据库中可用,至少在本地事务的范围内。第二个是 User.current_week_tasks 属性过期并通过对数据库的新 SQL 查询重新加载。


为了支持此要求,最简单的流程是 viewonly 关系仅在主要读取的作中使用 只是开始。如下所示,如果我们从数据库中检索 User fresh,则集合将是 current:

>>> with Session(e) as sess:
...     u1 = sess.scalar(select(User).where(User.id == 1))
...     print(u1.current_week_tasks)
[<__main__.Task object at 0x7f8711b906b0>]


当我们对 u1.all_tasks 进行修改时,如果我们想看到这些更改反映在 u1.current_week_tasks viewonly 关系中,则需要刷新这些更改,并且 u1.current_week_tasks 属性需要过期,以便下次访问时延迟加载。最简单的方法是使用 Session.commit(),保持 Session.expire_on_commit 参数设置为默认值 True

>>> with Session(e) as sess:
...     u1 = sess.scalar(select(User).where(User.id == 1))
...     u1.all_tasks.append(Task(task_date=datetime.datetime.now()))
...     sess.commit()
...     print(u1.current_week_tasks)
[<__main__.Task object at 0x7f8711b90ec0>, <__main__.Task object at 0x7f8711b90a10>]


在上面,对 Session.commit() 的调用将更改刷新为 u1.all_tasks 到数据库,然后使所有对象过期,因此当我们访问u1.current_week_tasks时,会发生一个 :term:' 延迟加载',它从数据库中新鲜地获取了这个属性的内容。


要在不实际提交事务的情况下拦截作,该属性需要显式过期 第一。 一种简单的方法是直接调用它。 在 在下面的例子中,Session.flush() 将待处理的更改发送到数据库,然后使用 Session.expire() 使 u1.current_week_tasks过期 集合,以便在下次访问时重新获取:

>>> with Session(e) as sess:
...     u1 = sess.scalar(select(User).where(User.id == 1))
...     u1.all_tasks.append(Task(task_date=datetime.datetime.now()))
...     sess.flush()
...     sess.expire(u1, ["current_week_tasks"])
...     print(u1.current_week_tasks)
[<__main__.Task object at 0x7fd95a4c8c50>, <__main__.Task object at 0x7fd95a4c8c80>]


实际上,我们可以跳过对 Session.flush() 的调用,假设有一个 将 Session.autoflush 保持为默认值 True会话,因为过期的 current_week_tasks 属性在过期后访问时将触发 autoflush:

>>> with Session(e) as sess:
...     u1 = sess.scalar(select(User).where(User.id == 1))
...     u1.all_tasks.append(Task(task_date=datetime.datetime.now()))
...     sess.expire(u1, ["current_week_tasks"])
...     print(u1.current_week_tasks)  # triggers autoflush before querying
[<__main__.Task object at 0x7fd95a4c8c50>, <__main__.Task object at 0x7fd95a4c8c80>]


继续上述方法进行更复杂的作,当相关的 User.all_tasks 集合发生变化时,我们可以使用事件钩子以编程方式应用过期。这是一个高级的 技术,其中应首先检查更简单的架构,如 @property 或坚持只读用例。在我们的简单示例中,这将配置为:

from sqlalchemy import event, inspect


@event.listens_for(User.all_tasks, "append")
@event.listens_for(User.all_tasks, "remove")
@event.listens_for(User.all_tasks, "bulk_replace")
def _expire_User_current_week_tasks(target, value, initiator):
    inspect(target).session.expire(target, ["current_week_tasks"])


使用上述钩子,更改作会被拦截,并导致 User.current_week_tasks 集合自动过期:

>>> with Session(e) as sess:
...     u1 = sess.scalar(select(User).where(User.id == 1))
...     u1.all_tasks.append(Task(task_date=datetime.datetime.now()))
...     print(u1.current_week_tasks)
[<__main__.Task object at 0x7f66d093ccb0>, <__main__.Task object at 0x7f66d093cce0>]


上面使用的 AttributeEvents 事件钩子也是由 backref 突变触发的,因此使用上面的钩子,对 Task.user 的更改也会被拦截:

>>> with Session(e) as sess:
...     u1 = sess.scalar(select(User).where(User.id == 1))
...     t1 = Task(task_date=datetime.datetime.now())
...     t1.user = u1
...     sess.add(t1)
...     print(u1.current_week_tasks)
[<__main__.Task object at 0x7f3b0c070d10>, <__main__.Task object at 0x7f3b0c057d10>]