事务和连接管理¶

管理交易

新建的 Session 可以说处于“开始”状态。在这种状态下, Session 尚未与任何 Engine 可能与之关联的对象。

这个 Session 然后接收对数据库连接进行操作的请求。通常,这意味着调用它来使用特定的 Engine ,可能是通过 Session.query()Session.execute() 或在挂起数据的刷新操作中,当该状态存在时发生,并且 Session.commit()Session.flush() 被称为。

当收到这些请求时,每个新的 Engine 遇到的与由 Session . 当第一 Engine 操作时, Session 可以说已经离开了“开始”状态,进入了“事务”状态。对于每一个 Engine 遇到了 Connection 与之关联,通过 Engine.contextual_connect() 方法。如果A Connection 直接与 Session (见 将会话加入外部事务(如测试套件) 例如,它直接添加到事务状态。

对于每一个 Connection , the Session 同时保持 Transaction 对象,通过调用 Connection.begin() 在每一个 Connection ,或者如果 Session 已使用标志建立对象 twophase=True ,A TwoPhaseTransaction 通过以下方式获取的对象 Connection.begin_twophase() . 这些事务都是与调用 Session.commit()Session.rollback() 方法。提交操作还将调用 TwoPhaseTransaction.prepare() 所有交易的方法(如果适用)。

当事务状态在回滚或提交之后完成时, Session releases 全部的 TransactionConnection 资源,并返回到“begin”状态,该状态将再次调用new ConnectionTransaction 接收对象作为发出SQL语句的新请求。

下面的示例说明了此生命周期:

engine = create_engine("...")
Session = sessionmaker(bind=engine)

# new session.   no connections are in use.
session = Session()
try:
    # first query.  a Connection is acquired
    # from the Engine, and a Transaction
    # started.
    item1 = session.query(Item).get(1)

    # second query.  the same Connection/Transaction
    # are used.
    item2 = session.query(Item).get(2)

    # pending changes are created.
    item1.foo = 'bar'
    item2.bar = 'foo'

    # commit.  The pending changes above
    # are flushed via flush(), the Transaction
    # is committed, the Connection object closed
    # and discarded, the underlying DBAPI connection
    # returned to the connection pool.
    session.commit()
except:
    # on rollback, the same closure of state
    # as that of commit proceeds.
    session.rollback()
    raise
finally:
    # close the Session.  This will expunge any remaining
    # objects as well as reset any existing SessionTransaction
    # state.  Neither of these steps are usually essential.
    # However, if the commit() or rollback() itself experienced
    # an unanticipated internal failure (such as due to a mis-behaved
    # user-defined event handler), .close() will ensure that
    # invalid state is removed.
    session.close()

使用保存点

如果基础引擎支持保存点事务,则可以使用 begin_nested() 方法:

Session = sessionmaker()
session = Session()
session.add(u1)
session.add(u2)

session.begin_nested() # establish a savepoint
session.add(u3)
session.rollback()  # rolls back u3, keeps u1 and u2

session.commit() # commits u1 and u2

begin_nested() 可以调用任意次数,这将为每个调用发出一个具有唯一标识符的新保存点。对于每一个 begin_nested() 调用,对应的 rollback()commit() 必须发布。(但请注意,如果返回值用作上下文管理器,即在WITH语句中,则此回滚/提交由上下文管理器在退出上下文时发出,因此不应显式添加。)

什么时候? begin_nested() 被称为 flush() 无条件发布(无论 autoflush 设置)。这是因为当 rollback() 发生时,会话的完整状态将过期,从而导致所有后续属性/实例访问引用 Session 就在之前 begin_nested() 被叫来。

begin_nested() 与不常用的 begin() 方法,返回 SessionTransaction 用作上下文管理器的对象。它可以简洁地用于各个记录插入,以便捕获诸如唯一约束异常之类的内容:

for record in records:
    try:
        with session.begin_nested():
            session.merge(record)
    except:
        print("Skipped record %s" % record)
session.commit()

自动提交模式

会话生命周期示例 管理交易 参考A Session 以默认模式运行的 autocommit=False . 在这种模式下, Session 一旦需要对数据库连接执行操作,就自动开始新事务;然后事务将继续进行,直到 Session.commit()Session.rollback() 方法被调用。

这个 Session 还有一个旧的传统使用模式,叫做 自动提交模式 ,其中事务不是隐式启动的,并且除非 Session.begin() 方法被调用, Session 将对从连接池签出的新连接执行每个数据库操作,该连接在操作完成后立即释放回池。这是指像 Session.execute() 以及执行由返回的查询时 Session.query() . 对于刷新操作, Session 在刷新期间启动一个新事务,并在完成时提交它。

警告

“自动提交”模式是 传统使用模式 不应考虑新项目。如果使用自动提交模式,强烈建议应用程序至少确保通过 Session.begin() 方法,而不是在纯自动提交模式下使用会话。

如果 Session.begin() 未使用方法,并且允许使用具有即时自动提交功能的即席连接继续操作,则应用程序可能应设置 autoflush=False, expire_on_commit=False ,因为这些功能仅用于数据库事务的上下文中。

“自动提交模式”的现代用法倾向于框架集成,这些框架集成希望在“开始”状态发生时进行特定控制。使用配置的会话 autocommit=True 可以使用 Session.begin() 方法。循环完成后 Session.commit()Session.rollback() ,连接和事务资源是 released 以及 Session 回到“自动提交”模式,直到 Session.begin() 再次调用:

Session = sessionmaker(bind=engine, autocommit=True)
session = Session()
session.begin()
try:
    item1 = session.query(Item).get(1)
    item2 = session.query(Item).get(2)
    item1.foo = 'bar'
    item2.bar = 'foo'
    session.commit()
except:
    session.rollback()
    raise

这个 Session.begin() 方法还返回与 with 声明:

Session = sessionmaker(bind=engine, autocommit=True)
session = Session()
with session.begin():
    item1 = session.query(Item).get(1)
    item2 = session.query(Item).get(2)
    item1.foo = 'bar'
    item2.bar = 'foo'

使用自动提交的子事务

子事务表示 Session.begin() 方法与 subtransactions=True 标志。这将生成一个非事务性的定界构造,允许嵌套调用 begin()commit() . 它的目的是允许构造代码,该代码可以在事务中运行,既独立于启动事务的任何外部代码,也可以在已划分事务的块中运行。

subtransactions=True 通常只与autocommit结合使用,相当于 事务块的嵌套 ,可以调用任意数量的函数 Connection.begin()Transaction.commit() 就好像他们是交易的发起人,但实际上可能参与了一个已经在进行中的交易:

# method_a starts a transaction and calls method_b
def method_a(session):
    session.begin(subtransactions=True)
    try:
        method_b(session)
        session.commit()  # transaction is committed here
    except:
        session.rollback() # rolls back the transaction
        raise

# method_b also starts a transaction, but when
# called from method_a participates in the ongoing
# transaction.
def method_b(session):
    session.begin(subtransactions=True)
    try:
        session.add(SomeObject('bat', 'lala'))
        session.commit()  # transaction is not committed yet
    except:
        session.rollback() # rolls back the transaction, in this case
                           # the one that was initiated in method_a().
        raise

# create a Session and call method_a
session = Session(autocommit=True)
method_a(session)
session.close()

子事务由 Session.flush() 确保刷新操作在事务中发生的过程,而不考虑自动提交。当自动提交被禁用时,它仍然有用,因为它强制 Session 进入“挂起回滚”状态,因为在中间操作中无法恢复失败的刷新,最终用户仍然维护事务的“范围”。

启用两阶段提交

对于支持两阶段操作(目前是mysql和postgresql)的后端,可以指示会话使用两阶段提交语义。这将协调跨数据库提交事务,以便在所有数据库中提交或回滚事务。你也可以 prepare() 用于与不由SqlAlchemy管理的事务交互的会话。要使用两阶段事务,请设置标志 twophase=True 会议内容:

engine1 = create_engine('postgresql://db1')
engine2 = create_engine('postgresql://db2')

Session = sessionmaker(twophase=True)

# bind User operations to engine 1, Account operations to engine 2
Session.configure(binds={User:engine1, Account:engine2})

session = Session()

# .... work with accounts and users

# commit.  session will issue a flush to all DBs, and a prepare step to all DBs,
# before committing both transactions
session.commit()

设置事务隔离级别

Isolation 在数据库级别上,与同时发生的其他事务相关的事务的行为。有四种众所周知的隔离模式,通常python dbapi允许通过显式API或特定于数据库的调用,在每个连接的基础上设置这些模式。

SQLAlchemy的方言支持可设置的隔离模式 -Engine 或每 -Connection 基础,在 create_engine() 水平以及 Connection.execution_options() 水平。

使用ORM时 Session 它作为一个 外观 对于引擎和连接,但不直接公开事务隔离。因此,为了影响事务隔离级别,我们需要根据 EngineConnection 适当时。

设置引擎范围内的隔离

建立一个 Sessionsessionmaker 在全局具有特定隔离级别的情况下,使用 create_engine.isolation_level 参数::

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

eng = create_engine(
    "postgresql://scott:tiger@localhost/test",
    isolation_level='REPEATABLE_READ')

maker = sessionmaker(bind=eng)

session = maker()

为单个会话设置隔离

当我们做一个新的 Session ,或者直接使用构造函数,或者当我们调用由 sessionmaker 我们可以通过 bind 直接参数,重写预先存在的绑定。我们可以把这个和 Engine.execution_options() 制作原件副本的方法 Engine 这将添加此选项:

session = maker(
    bind=engine.execution_options(isolation_level='SERIALIZABLE'))

对于这个案件 Sessionsessionmaker 配置了多个“绑定”,我们可以重新指定 binds 完全变元,或者如果我们只想替换特定的绑定,则可以使用 Session.bind_mapper()Session.bind_table() 方法::

session = maker()
session.bind_mapper(
    User, user_engine.execution_options(isolation_level='SERIALIZABLE'))

我们也可以使用下面的单独交易方法。

为单个事务设置隔离

关于隔离级别的一个关键警告是不能在 Connection 已启动事务的位置。数据库无法更改正在进行的事务的隔离级别,并且某些DBAPI和SQLAlchemy方言在这方面的行为不一致。有些可能隐式地发出回滚,有些可能隐式地发出提交,其他可能会忽略设置,直到下一个事务。因此,如果在事务已在运行时设置了此选项,则SQLAlchemy会发出警告。这个 Session 对象没有为我们提供 Connection 用于尚未开始事务的事务。所以在这里,我们需要将执行选项传递给 Session 在事务开始时通过传递 Session.connection.execution_options 提供的 Session.connection() 方法:

from sqlalchemy.orm import Session

sess = Session(bind=engine)
sess.connection(execution_options={'isolation_level': 'SERIALIZABLE'})

# work with session

# commit transaction.  the connection is released
# and reverted to its previous isolation level.
sess.commit()

上面,我们首先生产 Session 使用构造函数或 sessionmaker . 然后我们通过调用 Session.connection() ,它提供将在事务开始之前传递给连接的执行选项。如果我们与 Session 具有多个绑定或其他自定义方案的 Session.get_bind() ,我们可以将其他参数传递给 Session.connection() 为了影响如何获取绑定:

sess = my_sesssionmaker()

# set up a transaction for the bind associated with
# the User mapper
sess.connection(
    mapper=User,
    execution_options={'isolation_level': 'SERIALIZABLE'})

# work with session

# commit transaction.  the connection is released
# and reverted to its previous isolation level.
sess.commit()

这个 Session.connection.execution_options 只有在 第一 调用给 Session.connection() 对于事务中的特定绑定。如果在目标连接上已开始事务,则会发出警告::

>>> session = Session(eng)
>>> session.execute("select 1")
<sqlalchemy.engine.result.ResultProxy object at 0x1017a6c50>
>>> session.connection(execution_options={'isolation_level': 'SERIALIZABLE'})
sqlalchemy/orm/session.py:310: SAWarning: Connection is already established
for the given bind; execution_options ignored

0.9.9 新版功能: 增加了 Session.connection.execution_options 参数到 Session.connection() .

用事件跟踪事务状态

见剖面图 事务事件 有关会话事务状态更改的可用事件挂钩的概述。

将会话加入外部事务(如测试套件)

如果A Connection 正在使用已处于事务状态的(即 Transaction A) Session 只需绑定 Session 到那个 Connection . 这通常的基本原理是一个测试套件,它允许ORM代码与 Session 包括调用的能力 Session.commit() ,之后将回滚整个数据库交互::

from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from unittest import TestCase

# global application scope.  create Session class, engine
Session = sessionmaker()

engine = create_engine('postgresql://...')

class SomeTest(TestCase):
    def setUp(self):
        # connect to the database
        self.connection = engine.connect()

        # begin a non-ORM transaction
        self.trans = self.connection.begin()

        # bind an individual Session to the connection
        self.session = Session(bind=self.connection)

    def test_something(self):
        # use the session in tests.

        self.session.add(Foo())
        self.session.commit()

    def tearDown(self):
        self.session.close()

        # rollback - everything that happened with the
        # Session above (including calls to commit())
        # is rolled back.
        self.trans.rollback()

        # return connection to the Engine
        self.connection.close()

以上,我们发布 Session.commit() 以及 Transaction.rollback() . 这是我们利用 Connection 对象的维护能力 子事务 或者嵌套的begin/commit或rollback对,其中只有最外层的begin/commit对实际提交事务,或者如果最外层的块回滚,则所有内容都将回滚。

使用回滚支持测试

除了需要实际调用的测试外,上面的方法对于任何类型的启用数据库的测试都能很好地工作。 Session.rollback() 在测试本身的范围内。上述方法可以扩展,以便 Session 始终在保存点范围内运行所有操作,该保存点在每个事务开始时建立,这样测试也可以回滚“事务”,同时仍保留在从未提交的更大“事务”范围内,使用两个额外事件:

from sqlalchemy import event


class SomeTest(TestCase):

    def setUp(self):
        # connect to the database
        self.connection = engine.connect()

        # begin a non-ORM transaction
        self.trans = connection.begin()

        # bind an individual Session to the connection
        self.session = Session(bind=self.connection)

        # start the session in a SAVEPOINT...
        self.session.begin_nested()

        # then each time that SAVEPOINT ends, reopen it
        @event.listens_for(self.session, "after_transaction_end")
        def restart_savepoint(session, transaction):
            if transaction.nested and not transaction._parent.nested:

                # ensure that state is expired the way
                # session.commit() at the top level normally does
                # (optional step)
                session.expire_all()

                session.begin_nested()

    # ... the tearDown() method stays the same