新建的 Session
可以说处于“开始”状态。在这种状态下, Session
尚未与任何 Engine
可能与之关联的对象。
这个 Session
然后接收对数据库连接进行操作的请求。通常,这意味着调用它来使用特定的 Engine
,可能是通过 Session.query()
, Session.execute()
或在挂起数据的刷新操作中,当该状态存在时发生,并且 Session.commit()
或 Session.flush()
被称为。
当收到这些请求时,每个新的 Engine
遇到的与由 Session
. 当第一 Engine
操作时, Session
可以说已经离开了“开始”状态,进入了“事务”状态。对于每一个 Engine
遇到了 Connection
与之关联,通过 Engine.contextual_connect()
方法。如果A Connection
直接与 Session
(见 将会话加入外部事务(如测试套件) 例如,它直接添加到事务状态。
对于每一个 Connection
, the Session
同时保持 Transaction
对象,通过调用 Connection.begin()
在每一个 Connection
,或者如果 Session
已使用标志建立对象 twophase=True
,A TwoPhaseTransaction
通过以下方式获取的对象 Connection.begin_twophase()
. 这些事务都是与调用 Session.commit()
和 Session.rollback()
方法。提交操作还将调用 TwoPhaseTransaction.prepare()
所有交易的方法(如果适用)。
当事务状态在回滚或提交之后完成时, Session
releases 全部的 Transaction
和 Connection
资源,并返回到“begin”状态,该状态将再次调用new Connection
和 Transaction
接收对象作为发出SQL语句的新请求。
下面的示例说明了此生命周期:
engine = create_engine("...")
Session = sessionmaker(bind=engine)
# new session. no connections are in use.
session = Session()
try:
# first query. a Connection is acquired
# from the Engine, and a Transaction
# started.
item1 = session.query(Item).get(1)
# second query. the same Connection/Transaction
# are used.
item2 = session.query(Item).get(2)
# pending changes are created.
item1.foo = 'bar'
item2.bar = 'foo'
# commit. The pending changes above
# are flushed via flush(), the Transaction
# is committed, the Connection object closed
# and discarded, the underlying DBAPI connection
# returned to the connection pool.
session.commit()
except:
# on rollback, the same closure of state
# as that of commit proceeds.
session.rollback()
raise
finally:
# close the Session. This will expunge any remaining
# objects as well as reset any existing SessionTransaction
# state. Neither of these steps are usually essential.
# However, if the commit() or rollback() itself experienced
# an unanticipated internal failure (such as due to a mis-behaved
# user-defined event handler), .close() will ensure that
# invalid state is removed.
session.close()
如果基础引擎支持保存点事务,则可以使用 begin_nested()
方法:
Session = sessionmaker()
session = Session()
session.add(u1)
session.add(u2)
session.begin_nested() # establish a savepoint
session.add(u3)
session.rollback() # rolls back u3, keeps u1 and u2
session.commit() # commits u1 and u2
begin_nested()
可以调用任意次数,这将为每个调用发出一个具有唯一标识符的新保存点。对于每一个 begin_nested()
调用,对应的 rollback()
或 commit()
必须发布。(但请注意,如果返回值用作上下文管理器,即在WITH语句中,则此回滚/提交由上下文管理器在退出上下文时发出,因此不应显式添加。)
什么时候? begin_nested()
被称为 flush()
无条件发布(无论 autoflush
设置)。这是因为当 rollback()
发生时,会话的完整状态将过期,从而导致所有后续属性/实例访问引用 Session
就在之前 begin_nested()
被叫来。
begin_nested()
与不常用的 begin()
方法,返回 SessionTransaction
用作上下文管理器的对象。它可以简洁地用于各个记录插入,以便捕获诸如唯一约束异常之类的内容:
for record in records:
try:
with session.begin_nested():
session.merge(record)
except:
print("Skipped record %s" % record)
session.commit()
会话生命周期示例 管理交易 参考A Session
以默认模式运行的 autocommit=False
. 在这种模式下, Session
一旦需要对数据库连接执行操作,就自动开始新事务;然后事务将继续进行,直到 Session.commit()
或 Session.rollback()
方法被调用。
这个 Session
还有一个旧的传统使用模式,叫做 自动提交模式 ,其中事务不是隐式启动的,并且除非 Session.begin()
方法被调用, Session
将对从连接池签出的新连接执行每个数据库操作,该连接在操作完成后立即释放回池。这是指像 Session.execute()
以及执行由返回的查询时 Session.query()
. 对于刷新操作, Session
在刷新期间启动一个新事务,并在完成时提交它。
警告
“自动提交”模式是 传统使用模式 不应考虑新项目。如果使用自动提交模式,强烈建议应用程序至少确保通过 Session.begin()
方法,而不是在纯自动提交模式下使用会话。
如果 Session.begin()
未使用方法,并且允许使用具有即时自动提交功能的即席连接继续操作,则应用程序可能应设置 autoflush=False, expire_on_commit=False
,因为这些功能仅用于数据库事务的上下文中。
“自动提交模式”的现代用法倾向于框架集成,这些框架集成希望在“开始”状态发生时进行特定控制。使用配置的会话 autocommit=True
可以使用 Session.begin()
方法。循环完成后 Session.commit()
或 Session.rollback()
,连接和事务资源是 released 以及 Session
回到“自动提交”模式,直到 Session.begin()
再次调用:
Session = sessionmaker(bind=engine, autocommit=True)
session = Session()
session.begin()
try:
item1 = session.query(Item).get(1)
item2 = session.query(Item).get(2)
item1.foo = 'bar'
item2.bar = 'foo'
session.commit()
except:
session.rollback()
raise
这个 Session.begin()
方法还返回与 with
声明:
Session = sessionmaker(bind=engine, autocommit=True)
session = Session()
with session.begin():
item1 = session.query(Item).get(1)
item2 = session.query(Item).get(2)
item1.foo = 'bar'
item2.bar = 'foo'
子事务表示 Session.begin()
方法与 subtransactions=True
标志。这将生成一个非事务性的定界构造,允许嵌套调用 begin()
和 commit()
. 它的目的是允许构造代码,该代码可以在事务中运行,既独立于启动事务的任何外部代码,也可以在已划分事务的块中运行。
subtransactions=True
通常只与autocommit结合使用,相当于 事务块的嵌套 ,可以调用任意数量的函数 Connection.begin()
和 Transaction.commit()
就好像他们是交易的发起人,但实际上可能参与了一个已经在进行中的交易:
# method_a starts a transaction and calls method_b
def method_a(session):
session.begin(subtransactions=True)
try:
method_b(session)
session.commit() # transaction is committed here
except:
session.rollback() # rolls back the transaction
raise
# method_b also starts a transaction, but when
# called from method_a participates in the ongoing
# transaction.
def method_b(session):
session.begin(subtransactions=True)
try:
session.add(SomeObject('bat', 'lala'))
session.commit() # transaction is not committed yet
except:
session.rollback() # rolls back the transaction, in this case
# the one that was initiated in method_a().
raise
# create a Session and call method_a
session = Session(autocommit=True)
method_a(session)
session.close()
子事务由 Session.flush()
确保刷新操作在事务中发生的过程,而不考虑自动提交。当自动提交被禁用时,它仍然有用,因为它强制 Session
进入“挂起回滚”状态,因为在中间操作中无法恢复失败的刷新,最终用户仍然维护事务的“范围”。
对于支持两阶段操作(目前是mysql和postgresql)的后端,可以指示会话使用两阶段提交语义。这将协调跨数据库提交事务,以便在所有数据库中提交或回滚事务。你也可以 prepare()
用于与不由SqlAlchemy管理的事务交互的会话。要使用两阶段事务,请设置标志 twophase=True
会议内容:
engine1 = create_engine('postgresql://db1')
engine2 = create_engine('postgresql://db2')
Session = sessionmaker(twophase=True)
# bind User operations to engine 1, Account operations to engine 2
Session.configure(binds={User:engine1, Account:engine2})
session = Session()
# .... work with accounts and users
# commit. session will issue a flush to all DBs, and a prepare step to all DBs,
# before committing both transactions
session.commit()
Isolation 在数据库级别上,与同时发生的其他事务相关的事务的行为。有四种众所周知的隔离模式,通常python dbapi允许通过显式API或特定于数据库的调用,在每个连接的基础上设置这些模式。
SQLAlchemy的方言支持可设置的隔离模式 -Engine
或每 -Connection
基础,在 create_engine()
水平以及 Connection.execution_options()
水平。
使用ORM时 Session
它作为一个 外观 对于引擎和连接,但不直接公开事务隔离。因此,为了影响事务隔离级别,我们需要根据 Engine
或 Connection
适当时。
参见
建立一个 Session
或 sessionmaker
在全局具有特定隔离级别的情况下,使用 create_engine.isolation_level
参数::
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
eng = create_engine(
"postgresql://scott:tiger@localhost/test",
isolation_level='REPEATABLE_READ')
maker = sessionmaker(bind=eng)
session = maker()
当我们做一个新的 Session
,或者直接使用构造函数,或者当我们调用由 sessionmaker
我们可以通过 bind
直接参数,重写预先存在的绑定。我们可以把这个和 Engine.execution_options()
制作原件副本的方法 Engine
这将添加此选项:
session = maker(
bind=engine.execution_options(isolation_level='SERIALIZABLE'))
对于这个案件 Session
或 sessionmaker
配置了多个“绑定”,我们可以重新指定 binds
完全变元,或者如果我们只想替换特定的绑定,则可以使用 Session.bind_mapper()
或 Session.bind_table()
方法::
session = maker()
session.bind_mapper(
User, user_engine.execution_options(isolation_level='SERIALIZABLE'))
我们也可以使用下面的单独交易方法。
关于隔离级别的一个关键警告是不能在 Connection
已启动事务的位置。数据库无法更改正在进行的事务的隔离级别,并且某些DBAPI和SQLAlchemy方言在这方面的行为不一致。有些可能隐式地发出回滚,有些可能隐式地发出提交,其他可能会忽略设置,直到下一个事务。因此,如果在事务已在运行时设置了此选项,则SQLAlchemy会发出警告。这个 Session
对象没有为我们提供 Connection
用于尚未开始事务的事务。所以在这里,我们需要将执行选项传递给 Session
在事务开始时通过传递 Session.connection.execution_options
提供的 Session.connection()
方法:
from sqlalchemy.orm import Session
sess = Session(bind=engine)
sess.connection(execution_options={'isolation_level': 'SERIALIZABLE'})
# work with session
# commit transaction. the connection is released
# and reverted to its previous isolation level.
sess.commit()
上面,我们首先生产 Session
使用构造函数或 sessionmaker
. 然后我们通过调用 Session.connection()
,它提供将在事务开始之前传递给连接的执行选项。如果我们与 Session
具有多个绑定或其他自定义方案的 Session.get_bind()
,我们可以将其他参数传递给 Session.connection()
为了影响如何获取绑定:
sess = my_sesssionmaker()
# set up a transaction for the bind associated with
# the User mapper
sess.connection(
mapper=User,
execution_options={'isolation_level': 'SERIALIZABLE'})
# work with session
# commit transaction. the connection is released
# and reverted to its previous isolation level.
sess.commit()
这个 Session.connection.execution_options
只有在 第一 调用给 Session.connection()
对于事务中的特定绑定。如果在目标连接上已开始事务,则会发出警告::
>>> session = Session(eng)
>>> session.execute("select 1")
<sqlalchemy.engine.result.ResultProxy object at 0x1017a6c50>
>>> session.connection(execution_options={'isolation_level': 'SERIALIZABLE'})
sqlalchemy/orm/session.py:310: SAWarning: Connection is already established
for the given bind; execution_options ignored
0.9.9 新版功能: 增加了 Session.connection.execution_options
参数到 Session.connection()
.
如果A Connection
正在使用已处于事务状态的(即 Transaction
A) Session
只需绑定 Session
到那个 Connection
. 这通常的基本原理是一个测试套件,它允许ORM代码与 Session
包括调用的能力 Session.commit()
,之后将回滚整个数据库交互::
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from unittest import TestCase
# global application scope. create Session class, engine
Session = sessionmaker()
engine = create_engine('postgresql://...')
class SomeTest(TestCase):
def setUp(self):
# connect to the database
self.connection = engine.connect()
# begin a non-ORM transaction
self.trans = self.connection.begin()
# bind an individual Session to the connection
self.session = Session(bind=self.connection)
def test_something(self):
# use the session in tests.
self.session.add(Foo())
self.session.commit()
def tearDown(self):
self.session.close()
# rollback - everything that happened with the
# Session above (including calls to commit())
# is rolled back.
self.trans.rollback()
# return connection to the Engine
self.connection.close()
以上,我们发布 Session.commit()
以及 Transaction.rollback()
. 这是我们利用 Connection
对象的维护能力 子事务 或者嵌套的begin/commit或rollback对,其中只有最外层的begin/commit对实际提交事务,或者如果最外层的块回滚,则所有内容都将回滚。
使用回滚支持测试
除了需要实际调用的测试外,上面的方法对于任何类型的启用数据库的测试都能很好地工作。 Session.rollback()
在测试本身的范围内。上述方法可以扩展,以便 Session
始终在保存点范围内运行所有操作,该保存点在每个事务开始时建立,这样测试也可以回滚“事务”,同时仍保留在从未提交的更大“事务”范围内,使用两个额外事件:
from sqlalchemy import event
class SomeTest(TestCase):
def setUp(self):
# connect to the database
self.connection = engine.connect()
# begin a non-ORM transaction
self.trans = connection.begin()
# bind an individual Session to the connection
self.session = Session(bind=self.connection)
# start the session in a SAVEPOINT...
self.session.begin_nested()
# then each time that SAVEPOINT ends, reopen it
@event.listens_for(self.session, "after_transaction_end")
def restart_savepoint(session, transaction):
if transaction.nested and not transaction._parent.nested:
# ensure that state is expired the way
# session.commit() at the top level normally does
# (optional step)
session.expire_all()
session.begin_nested()
# ... the tearDown() method stays the same