连接 / 引擎


如何配置日志记录?


请参阅配置日志记录


如何实现池化数据库连接?我的连接是否共用?


SQLAlchemy 在大多数情况下自动执行应用程序级连接池。对于所有包含的方言(使用“内存”数据库时,SQLite 除外),Engine 对象引用 QueuePool 作为连接源。


有关更多详细信息,请参阅引擎配置连接池


如何将自定义 connect 参数传递给我的数据库 API?


create_engine() 调用直接通过 connect_args 关键字参数接受其他参数:

e = create_engine(
    "mysql+mysqldb://scott:tiger@localhost/test", connect_args={"encoding": "utf8"}
)


或者对于基本的字符串和整数参数,它们通常可以在 URL 的查询字符串中指定:

e = create_engine("mysql+mysqldb://scott:tiger@localhost/test?encoding=utf8")


“MySQL Server 已消失”


此错误的主要原因是 MySQL 连接已超时 ,并且已被服务器关闭。 MySQL 服务器关闭连接 这些 API 的空闲时间默认为 8 小时。 为了适应这种情况,直接设置是启用 create_engine.pool_recycle 设置,这将确保早于设定秒数的连接在下次签出时被丢弃并替换为新连接。


对于由于网络问题而导致数据库重启和其他临时连接丢失的更一般情况,池中的连接可能会被回收,以响应更通用的断开连接检测技术。处理断开连接部分提供了 “悲观” (例如预 ping) 和 “乐观” (例如正常恢复) 技术的背景。现代 SQLAlchemy 倾向于 “悲观” 方法。


另请参阅


处理断开连接


“命令不同步;你现在无法运行此命令“/”此结果对象不返回行。它已自动关闭“


MySQL 驱动程序具有相当广泛的故障模式,因此与服务器的连接状态处于无效状态。通常,当再次使用连接时,将出现以下两条错误消息之一。原因是服务器的状态已更改为 Client 端库不期望的状态,因此当 Client 端库在连接上发出新语句时,服务器不会按预期响应。


在 SQLAlchemy 中,由于数据库连接是池化的,因此连接上的消息传递不同步的问题变得更加重要,因为当作失败时,如果连接本身处于不可用状态,如果它返回到连接池中,它将在再次签出时出现故障。此问题的缓解措施是,当出现此类故障模式时,连接将失效,以便丢弃与 MySQL 的底层数据库连接。对于许多已知的失败模式,这种失效会自动发生,也可以通过 Connection.invalidate() 方法显式调用。


在此类别中还有第二类故障模式,其中上下文管理器(例如使用 session.begin_nested():希望在发生错误时“回滚”事务;但是,在连接的某些故障模式下,回滚本身(也可以是 RELEASE SAVEPOINT作)也会失败,从而导致误导性的堆栈跟踪。


最初,此错误的原因曾经相当简单,这意味着多线程程序从多个线程在单个连接上调用命令。这适用于最初的 “MySQLdb” native-C 驱动程序,它几乎是唯一使用的驱动程序。然而,随着 PyMySQL 和 MySQL-connector-Python 等纯 Python 驱动程序的引入,以及 gevent/eventlet、multiprocessing(通常与 Celery 一起)等工具的使用增加,已知有一系列因素会导致此问题,其中一些因素在 SQLAlchemy 版本中得到了改进,但另一些因素是不可避免的:


  • 在线程之间共享连接 - 这是发生此类错误的最初原因。一个程序同时在两个或多个线程中使用相同的连接,这意味着多组消息在连接上混淆,使服务器端会话处于客户端不再知道如何解释的状态。然而,今天通常更可能出现其他原因。


  • 在进程之间共享连接的 filehandle - 当程序使用 os.fork() 生成新进程,并且父进程中存在的 TCP 连接被共享到一个或多个子进程时,通常会发生这种情况。由于多个进程现在向本质上相同的 filehandle 发送消息,因此服务器接收交错消息并中断连接的状态。


    如果程序使用 Python 的 “multiprocessing” 模块并使用在父进程中创建的 Engine,则这种情况很容易发生。在使用 Celery 等工具时,使用 “multiprocessing” 是很常见的。正确的方法应该是一个新的 Engine 在子进程首次启动时生成,并丢弃任何 Engine 它来自父进程;或者,从父进程继承的 Engine 可以通过调用 Engine.dispose() 来释放其内部连接池。


  • 带出口的 Greenlet Monkeypatching - 当使用像 gevent 或 eventlet 这样的库来对 Python 网络 API 进行猴子修补时,PyMySQL 等库现在以异步作模式工作,即使它们不是针对此模型明确开发的。一个常见的问题是 greenthread 被中断,通常是由于应用程序中的超时逻辑。这导致了 GreenletExit 异常,并且纯 Python MySQL 驱动程序从 它的工作,可能是它正在接收来自服务器的响应 或准备以其他方式重置连接的状态。 当异常 缩短了所有工作,客户端和服务器之间的对话现在是 不同步,并且后续使用连接可能会失败。 SQLAlchemy 从 1.1.0 版本开始,知道如何防止这种情况,就像数据库作一样 被所谓的 “exit exception” 打断,其中包括 GreenletExit 以及不是 Exception 的子类的 Python BaseException 的任何其他子类,则连接无效。


  • 回滚/SAVEPOINT 发布失败 - 某些错误类会导致连接在事务上下文中以及在 “SAVEPOINT” 块中作时不可用。在这些情况下,连接失败已将任何 SAVEPOINT 渲染为不再存在,但是当 SQLAlchemy 或应用程序尝试“回滚”此保存点时,“RELEASE SAVEPOINT”作失败,通常显示“savepoint does not exist”之类的消息。在这种情况下,在 Python 3 下将有一连串异常输出,其中也会显示错误的最终 “原因” 。在 Python 2 下,没有“链式”异常,但是最新版本的 SQLAlchemy 将尝试发出警告,说明原始失败原因,同时仍然抛出直接错误,即 ROLLBACK 的失败。


如何自动 “重试” 语句执行?


文档部分 Dealing with Disconnects 讨论了可用于自上次签出特定连接以来已断开连接的池连接的策略。在这方面,最现代的功能是 create_engine.pre_ping 参数,它允许在从池中检索数据库连接时在数据库连接上发出“ping”,如果当前连接已断开连接,则重新连接。


请务必注意,此 “ping” 仅在连接实际用于作之前发出。一旦连接被交付给调用者,根据 Python DBAPI 规范,它现在需要执行 autostart作,这意味着它将在首次使用时自动 BEGIN 新事务,该事务对后续语句仍然有效,直到 DBAPI 级别的 connection.commit()connection.rollback() 方法。


在 SQLAlchemy 的现代使用中,始终会调用一系列 SQL 语句 在此事务状态下,假设 未启用 DBAPI 自动提交模式(下一节将详细介绍),这意味着不会自动提交单个语句;如果作失败,则当前事务中所有语句的效果都将丢失。


这对于“重试”语句的概念的含义是,默认情况下,当连接丢失时,整个事务都是 迷失了。数据库没有有用的方法可以“重新连接并重试”并从中断的地方继续,因为数据已经丢失。因此,SQLAlchemy 没有透明的“重新连接”功能,该功能可以在事务中工作,适用于数据库连接在使用时断开连接的情况。处理作中断开连接的规范方法是从 transaction,通常是通过使用自定义 Python 装饰器,该装饰器将多次“重试”特定函数,直到它成功,或者以其他方式构建应用程序,使其能够灵活地处理被丢弃的事务,从而导致作失败。


还有扩展的概念可以跟踪所有 在事务中继续进行的语句,然后在 一个 new transaction 来近似 “retry”作。 SQLAlchemy 的 事件系统确实允许这样的系统是 构造的,但是这种方法通常也不有用,因为 无法保证那些 DML 语句将针对相同的状态工作,因为一旦事务结束,新事务中的数据库状态可能会完全不同。在事务作开始和提交时将 “retry” 显式地构建到应用程序中仍然是更好的方法,因为应用程序级事务方法最知道如何重新运行其步骤。


否则,如果 SQLAlchemy 要提供一种功能,可以在事务中透明地、静默地“重新连接”连接,那么效果将是数据静默丢失。通过试图隐藏问题,SQLAlchemy 会使情况变得更糟。


但是,如果我们不使用事务,则有更多选项可用,如下一节所述。


使用 DBAPI autocommit 允许 Transparent Reconnect 的只读版本


在说明了没有透明重新连接机制的基本原理后,上一节基于应用程序实际上正在使用 DBAPI 级事务的假设。由于大多数 DBAPI 现在都提供本机 “autocommit” 设置,我们可以利用这些功能来提供一种有限形式的透明 reconnect for read only, autocommit only作。透明语句重试可以应用于 DBAPI 的 cursor.execute() 方法,但应用于 DBAPI 的 cursor.executemany() 方法仍然不安全,因为该语句可能已经使用了给定参数的任何部分。


警告


以下配方应用于写入数据的作。在生产使用此配方之前,用户应仔细阅读并了解配方的工作原理,并针对专门针对的 DBAPI 驱动程序非常仔细地测试故障模式。重试机制并不能保证在所有情况下都防止断开连接错误。


可以将简单的重试机制应用于 DBAPI 级别 cursor.execute() 方法通过使用 DialectEvents.do_execute() DialectEvents.do_execute_no_params() 钩子,它将能够在语句执行期间拦截断开连接。它不会 在结果集获取作期间拦截连接失败,对于那些 未完全缓冲结果集的 DBAPI。 配方要求 数据库支持 DBAPI 级别的自动提交,并且不保证用于特定后端。提供了一个函数 reconnecting_engine(),它将事件钩子应用于给定的 Engine 对象,返回启用 DBAPI 级自动提交的始终自动提交版本。对于单参数和无参数语句执行,连接将透明地重新连接:

import time

from sqlalchemy import event


def reconnecting_engine(engine, num_retries, retry_interval):
    def _run_with_retries(fn, context, cursor_obj, statement, *arg, **kw):
        for retry in range(num_retries + 1):
            try:
                fn(cursor_obj, statement, context=context, *arg)
            except engine.dialect.dbapi.Error as raw_dbapi_err:
                connection = context.root_connection
                if engine.dialect.is_disconnect(raw_dbapi_err, connection, cursor_obj):
                    if retry > num_retries:
                        raise
                    engine.logger.error(
                        "disconnection error, retrying operation",
                        exc_info=True,
                    )
                    connection.invalidate()

                    # use SQLAlchemy 2.0 API if available
                    if hasattr(connection, "rollback"):
                        connection.rollback()
                    else:
                        trans = connection.get_transaction()
                        if trans:
                            trans.rollback()

                    time.sleep(retry_interval)
                    context.cursor = cursor_obj = connection.connection.cursor()
                else:
                    raise
            else:
                return True

    e = engine.execution_options(isolation_level="AUTOCOMMIT")

    @event.listens_for(e, "do_execute_no_params")
    def do_execute_no_params(cursor_obj, statement, context):
        return _run_with_retries(
            context.dialect.do_execute_no_params, context, cursor_obj, statement
        )

    @event.listens_for(e, "do_execute")
    def do_execute(cursor_obj, statement, parameters, context):
        return _run_with_retries(
            context.dialect.do_execute, context, cursor_obj, statement, parameters
        )

    return e


鉴于上述方法,可以证明在事务中重新连接 使用以下概念验证脚本。 运行后,它将发出一个 SELECT 1 语句添加到数据库:

from sqlalchemy import create_engine
from sqlalchemy import select

if __name__ == "__main__":
    engine = create_engine("mysql+mysqldb://scott:tiger@localhost/test", echo_pool=True)

    def do_a_thing(engine):
        with engine.begin() as conn:
            while True:
                print("ping: %s" % conn.execute(select([1])).scalar())
                time.sleep(5)

    e = reconnecting_engine(
        create_engine("mysql+mysqldb://scott:tiger@localhost/test", echo_pool=True),
        num_retries=5,
        retry_interval=2,
    )

    do_a_thing(e)


在脚本运行时重新启动数据库,以演示透明 reconnect作:

$ python reconnect_test.py
ping: 1
ping: 1
disconnection error, retrying operation
Traceback (most recent call last):
  ...
MySQLdb._exceptions.OperationalError: (2006, 'MySQL server has gone away')
2020-10-19 16:16:22,624 INFO sqlalchemy.pool.impl.QueuePool Invalidate connection <_mysql.connection open to 'localhost' at 0xf59240>
ping: 1
ping: 1
...


上述配方针对 SQLAlchemy 1.4 进行了测试。


为什么 SQLAlchemy 会发出这么多 ROLLBACKs?


SQLAlchemy 当前假定 DBAPI 连接处于“非自动提交”模式——这是 Python 数据库 API 的默认行为,这意味着必须假定事务始终在进行中。当返回连接时,连接池会发出 connection.rollback() 。 这样,连接上剩余的任何事务资源都是 释放。在 PostgreSQL 或 MSSQL 等表资源所在的数据库上 主动锁定,这很关键,这样行和表就不会保留 锁定在不再使用的连接中。应用程序可以 否则挂起。然而,它不仅适用于锁,而且对 任何具有任何类型的事务隔离的数据库,包括 MySQL InnoDB 的任何仍在旧事务中的连接都将返回 过时数据,如果已在 隔离。有关为什么即使在 MySQL 上也可能看到过时数据的背景信息,请参阅 https://dev.mysql.com/doc/refman/5.1/en/innodb-transaction-model.html


我在使用 MyISAM - 如何关闭它?


连接池的连接返回行为的行为可以使用 reset_on_return 进行配置:

from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool

engine = create_engine(
    "mysql+mysqldb://scott:tiger@localhost/myisam_database",
    pool=QueuePool(reset_on_return=False),
)


我在 SQL Server 上 - 如何将这些 ROLLBACK 转换为 COMMIT?


reset_on_return 除了接受 TrueFalseNone 之外,还接受值 commitrollback。设置为 commit 将导致 COMMIT,因为任何连接都返回到池中:

engine = create_engine(
    "mssql+pyodbc://scott:tiger@mydsn", pool=QueuePool(reset_on_return="commit")
)


我正在使用 SQLite 数据库的多个连接(通常用于测试事务作),但我的测试程序不工作!


如果使用 SQLite :memory: 数据库,则默认连接池是 SingletonThreadPool,它为每个线程维护一个 SQLite 连接。因此,在同一个线程中使用的两个连接实际上是同一个 SQLite 连接。确保您没有使用 :memory: 数据库,以便引擎使用 QueuePool(当前 SQLAlchemy 版本中非内存数据库的默认值)。


另请参阅


线程/池化行为 - 有关 PySQLite 行为的信息。


使用 Engine 时如何获取原始 DBAPI 连接?


使用常规的 SA 引擎级 Connection,您可以通过 Connection,对于真正的 DBAPI 连接,您可以调用 PoolProxiedConnection.dbapi_connection 属性。在常规同步驱动程序上,通常不需要访问非池代理的 DBAPI 连接,因为所有方法都是通过以下方式代理的:

engine = create_engine(...)
conn = engine.connect()

# pep-249 style PoolProxiedConnection (historically called a "connection fairy")
connection_fairy = conn.connection

# typically to run statements one would get a cursor() from this
# object
cursor_obj = connection_fairy.cursor()
# ... work with cursor_obj

# to bypass "connection_fairy", such as to set attributes on the
# unproxied pep-249 DBAPI connection, use .dbapi_connection
raw_dbapi_connection = connection_fairy.dbapi_connection

# the same thing is available as .driver_connection (more on this
# in the next section)
also_raw_dbapi_connection = connection_fairy.driver_connection


在 1.4.24 版本发生变更: 添加了 PoolProxiedConnection.dbapi_connection 属性 取代了之前的 PoolProxiedConnection.connection 属性仍然可用;此属性始终提供 PEP-249 同步样式连接对象。这 PoolProxiedConnection.driver_connection attribute 将始终引用真实的 driver 级别 连接,无论它提供什么 API。


访问 asyncio 驱动程序的底层连接


当使用 asyncio 驱动程序时,上述方案有两处更改。第一个原因是,在使用 AsyncConnection 时,必须使用 awaitable 方法访问 PoolProxiedConnection AsyncConnection.get_raw_connection() 。在这种情况下,返回的 PoolProxiedConnection 保留了同步样式的 pep-249 使用模式,而 PoolProxiedConnection.dbapi_connection attribute 引用 一个 SQLAlchemy 适配的连接对象,它适配 asyncio 连接到 Sync 样式的 PEP-249 API,换句话说,在使用 asyncio 驱动程序时,有两个级别的代理。实际的 asyncio 连接可从 driver_connection 属性获得。用 asyncio 来重述前面的例子,如下所示:

async def main():
    engine = create_async_engine(...)
    conn = await engine.connect()

    # pep-249 style ConnectionFairy connection pool proxy object
    # presents a sync interface
    connection_fairy = await conn.get_raw_connection()

    # beneath that proxy is a second proxy which adapts the
    # asyncio driver into a pep-249 connection object, accessible
    # via .dbapi_connection as is the same with a sync API
    sqla_sync_conn = connection_fairy.dbapi_connection

    # the really-real innermost driver connection is available
    # from the .driver_connection attribute
    raw_asyncio_connection = connection_fairy.driver_connection

    # work with raw asyncio connection
    result = await raw_asyncio_connection.execute(...)


在 1.4.24 版本发生变更: 添加了 PoolProxiedConnection.dbapi_connection PoolProxiedConnection.driver_connection 属性以允许使用一致的接口访问 PEP-249 连接、PEP-249 适配层和底层驱动程序连接。


当使用 asyncio 驱动程序时,上述 “DBAPI” 连接实际上是一个 SQLAlchemy 适配的连接形式,呈现同步样式 pep-249 样式的 API。 要访问实际的 asyncio 驱动程序连接,它将呈现原始的 asyncio API 中,可以通过 PoolProxiedConnection.driver_connection 属性 PoolProxiedConnection 的 PoolProxiedConnection 中。对于标准 pep-249 驱动程序, PoolProxiedConnection.dbapi_connection PoolProxiedConnection.driver_connection 是同义词。


在将连接返回到池之前,必须确保将连接上的任何隔离级别设置或其他特定于作的设置恢复正常。


作为还原设置的替代方法,您可以调用 Connection.detach() 方法 或 Proxied 连接,它将取消连接与池的关联 这样,当 Connection.close() 称为:

conn = engine.connect()
conn.detach()  # detaches the DBAPI connection from the connection pool
conn.connection.<go nuts>
conn.close()  # connection is closed for real, the pool replaces it with a new connection


如何将引擎/连接/会话与 Python multiprocessing 或 os.fork() 一起使用?


这在 将连接池与多处理 或 os.fork() 一起使用 一节中进行了介绍。