您現在的位置是:首頁 > 籃球
Spring原理篇(17)——Spring事務的傳播機制;該篇章是Spring原理篇的最後一章
- 由 紙鶴視界 發表于 籃球
- 2022-09-11
事務的傳播機制有幾種
@TOC# Spring系列
記錄在程式走的每一步___auth:huf
由淺入深:
我們再使用@Transctional註解的時候;我們非常喜歡一個動作就是排程方法 直接就xxxx(); 就排程了Bean內的方法; 這種方式 是不起效果的 請注意
如果要事務起效 請用他的代理方法進行排程 那麼就會起效; 這個跟事務傳播沒有關係; 是跟AOP 是否使用了代理物件有關係;
我們前一篇文章 有大量原始碼。 可能看起來很枯燥。 昨天文章有大量的文字描述原理 也想睡覺; 今天我們圖文並茂。先來一段文字: 我們執行物件方法的時候 物件方法如何去開啟事務;
1: Spring 事務管理器 建立資料庫連線;(跟上章呼應上)
2: conn。setSqlLevel 設定隔離級別;
3: conn。setAuthCommit=false; 關閉自動提交
4: conn。setThreadlocal 它是一個Map 實際上就是把資料庫連線 conn放進去;
5: target。sql 物件方法執行;
如果這裡由呼叫了一個方法 並且方法上有 @Transactional(propagation = Propagation。REQUIRES_NEW) 關鍵的來了; 1)掛起上面一個事務; 2) Spring 事務管理器 建立資料庫連線; 3) conn。setSqlLevel;設定隔離級別; 4) conn。setAuthCommit = false;關閉自動提交 5) conn。setThreadlocal 它是一個Map 實際上就是把資料庫連線 conn放進去; target。sql 。。。。又遇到一個事務。。。又在執行一遍; conn。commit; 或者是 rollback 恢復 掛起的事務 ——>把掛起事務放進去Threadlocal
N:conn。commit; 或者是 rollback
原理部分就講清楚 講明白了; 下面上原始碼部分;
因為我們使用了@Trancastional 註解 所以它最終會進來 TransactionInterceptor 也就是上一篇文章 說的Advice 中 執行 invoke方法;
中間的proceed 就是去執行我們的被代理類的代理方法; 在這過程當中; 我們一步一步分析:在Invoke方法中 return invokeWithinTransaction 該方法點進去 我們一路看下去
@Nullable
protected
Object
invokeWithinTransaction
(
Method
method
,
@Nullable
Class
<
?
>
targetClass
,
final
InvocationCallback
invocation
)
throws
Throwable
{
“如果事務屬性為null,則該方法是非事務性的。”
TransactionAttributeSource
tas
=
getTransactionAttributeSource
(
)
;
“獲取propagation 等配置屬性 ”
final
TransactionAttribute
txAttr
=
(
tas
!=
null
?
tas
。
getTransactionAttribute
(
method
,
targetClass
)
:
null
)
;
“獲取事務管理器,事務管理器 就是去建立連線 Commit rollback 等一系列操作用的”
final
TransactionManager
tm
=
determineTransactionManager
(
txAttr
)
;
“是否是 ReactiveTransactionManager 管理器;目前筆者沒有用過這個管理器; ”
“我們在config配置的 是PlatformTransactionManager”
if
(
this
。
reactiveAdapterRegistry
!=
null
&&
tm
instanceof
ReactiveTransactionManager
)
{
“該段程式碼忽略”
“該段程式碼最後肯定是return的”
return
result
;
}
“事務管理器 : 可以在Config裡面進行配置; 只需要傳入dataSource() 就可以”
PlatformTransactionManager
ptm
=
asPlatformTransactionManager
(
tm
)
;
final
String
joinpointIdentification
=
methodIdentification
(
method
,
targetClass
,
txAttr
)
;
if
(
txAttr
==
null
||
!
(
ptm
instanceof
CallbackPreferringPlatformTransactionManager
)
)
{
“該方法內部有一個開啟了事務的核心方法, 下一自然段我們重點看這個方法”
TransactionInfo
txInfo
=
createTransactionIfNecessary
(
ptm
,
txAttr
,
joinpointIdentification
)
;
Object
retVal
;
try
{
“執行被代理物件的代理方法;”
retVal
=
invocation
。
proceedWithInvocation
(
)
;
}
catch
(
Throwable
ex
)
{
“如果報錯 進入該方法進行回滾 ”
completeTransactionAfterThrowing
(
txInfo
,
ex
)
;
“丟擲異常”
throw
ex
;
}
finally
{
cleanupTransactionInfo
(
txInfo
)
;
}
“設定回滾規則,自行配置規則,這裡不會訴說”
if
(
retVal
!=
null
&&
vavrPresent
&&
VavrDelegate
。
isVavrTry
(
retVal
)
)
{
“僅當Vavr故障與回滾規則匹配時,才進行et回滾”
TransactionStatus
status
=
txInfo
。
getTransactionStatus
(
)
;
if
(
status
!=
null
&&
txAttr
!=
null
)
{
retVal
=
VavrDelegate
。
evaluateTryFailure
(
retVal
,
txAttr
,
status
)
;
}
}
“提交事務”
commitTransactionAfterReturning
(
txInfo
)
;
return
retVal
;
}
“這下面還會return 一種情況”
}
我們先來看 事務傳播機制的規則:
Propagation
。
MANDATORY
“強制性”
Propagation
。
NESTED
“巢狀”
Propagation
。
NEVER
“從不”
Propagation
。
NOT_SUPPORTED
“不支援”
Propagation
。
REQUIRED
“預設事務”
Propagation
。
REQUIRES_NEW
“從新再開啟一個事務”
Propagation
。
SUPPORTS
“支援”
在說核心方法之前 我們先知道 在傳播機制規則中 我們可以配置這7種規則; 這七種規則 我並沒有詳細說明或者是直接說明它幹什麼的。 跟著原始碼 就知道了
createTransactionIfNecessary()方法
“再其方法內部 這裡就是開啟事務的核心方法了”
status
=
tm
。
getTransaction
(
txAttr
)
;
getTransaction()方法;
public
final
TransactionStatus
getTransaction
(
@Nullable
TransactionDefinition
definition
)
throws
TransactionException
{
“如果沒有給出事務定義,則使用預設值。”
TransactionDefinition
def
=
(
definition
!=
null
?
definition
:
TransactionDefinition
。
withDefaults
(
)
)
;
Object
transaction
=
doGetTransaction
(
)
;
boolean
debugEnabled
=
logger
。
isDebugEnabled
(
)
;
if
(
isExistingTransaction
(
transaction
)
)
{
“判斷傳播機制 這個方法就解釋了我們 的傳播機制規則; 下面會詳細說”
return
handleExistingTransaction
(
def
,
transaction
,
debugEnabled
)
;
}
if
(
def
。
getTimeout
(
)
<
TransactionDefinition
。
TIMEOUT_DEFAULT
)
{
throw
new
InvalidTimeoutException
(
“Invalid transaction timeout”
,
def
。
getTimeout
(
)
)
;
}
“注意 Propagation。MANDATORY 強制性 就會丟擲異常。 它的作用是 手動的開啟事務 ”
“必須要有事務。如果沒有事務 就會丟擲異常 。 MANDATORY得到了解釋 隨後總結會進行述說”
if
(
def
。
getPropagationBehavior
(
)
==
TransactionDefinition
。
PROPAGATION_MANDATORY
)
{
throw
new
IllegalTransactionStateException
(
“No existing transaction found for transaction marked with propagation ‘mandatory’”
)
;
}
“如果 PROPAGATION_REQUIRED PROPAGATION_REQUIRES_NEW TransactionDefinition。PROPAGATION_NESTED ”
“繼續往下走。”
else
if
(
def
。
getPropagationBehavior
(
)
==
TransactionDefinition
。
PROPAGATION_REQUIRED
||
def
。
getPropagationBehavior
(
)
==
TransactionDefinition
。
PROPAGATION_REQUIRES_NEW
||
def
。
getPropagationBehavior
(
)
==
TransactionDefinition
。
PROPAGATION_NESTED
)
{
“如果有事務 掛起事務。貼合上面所描述 ”
SuspendedResourcesHolder
suspendedResources
=
suspend
(
null
)
;
if
(
debugEnabled
)
{
logger
。
debug
(
“Creating new transaction with name [”
+
def
。
getName
(
)
+
“]: ”
+
def
)
;
}
try
{
“ 核心方法 : 開啟事務 ”
return
startTransaction
(
def
,
transaction
,
debugEnabled
,
suspendedResources
)
;
}
catch
(
RuntimeException
|
Error
ex
)
{
resume
(
null
,
suspendedResources
)
;
throw
ex
;
}
}
。
。
。
。
省略部分程式碼
}
}
startTransaction
private
TransactionStatus
startTransaction
(
TransactionDefinition
definition
,
Object
transaction
,
boolean
debugEnabled
,
@Nullable
SuspendedResourcesHolder
suspendedResources
)
{
boolean
newSynchronization
=
(
getTransactionSynchronization
(
)
!=
SYNCHRONIZATION_NEVER
)
;
DefaultTransactionStatus
status
=
newTransactionStatus
(
definition
,
transaction
,
true
,
newSynchronization
,
debugEnabled
,
suspendedResources
)
;
“開啟事務 看用了什麼元件;”
doBegin
(
transaction
,
definition
)
;
prepareSynchronization
(
status
,
definition
)
;
return
status
;
}
這裡 我們用了DataSource
@Override
protected
void
doBegin
(
Object
transaction
,
TransactionDefinition
definition
)
{
DataSourceTransactionObject
txObject
=
(
DataSourceTransactionObject
)
transaction
;
“申明資料庫連線”
Connection
con
=
null
;
try
{
if
(
!
txObject
。
hasConnectionHolder
(
)
||
txObject
。
getConnectionHolder
(
)
。
isSynchronizedWithTransaction
(
)
)
{
Connection
newCon
=
obtainDataSource
(
)
。
getConnection
(
)
;
if
(
logger
。
isDebugEnabled
(
)
)
{
logger
。
debug
(
“Acquired Connection [”
+
newCon
+
“] for JDBC transaction”
)
;
}
txObject
。
setConnectionHolder
(
new
ConnectionHolder
(
newCon
)
,
true
)
;
}
txObject
。
getConnectionHolder
(
)
。
setSynchronizedWithTransaction
(
true
)
;
con
=
txObject
。
getConnectionHolder
(
)
。
getConnection
(
)
;
“設定隔離級別”
Integer
previousIsolationLevel
=
DataSourceUtils
。
prepareConnectionForTransaction
(
con
,
definition
)
;
txObject
。
setPreviousIsolationLevel
(
previousIsolationLevel
)
;
“設定事務是否可讀”
txObject
。
setReadOnly
(
definition
。
isReadOnly
(
)
)
;
if
(
con
。
getAutoCommit
(
)
)
{
txObject
。
setMustRestoreAutoCommit
(
true
)
;
if
(
logger
。
isDebugEnabled
(
)
)
{
logger
。
debug
(
“Switching JDBC Connection [”
+
con
+
“] to manual commit”
)
;
}
“關閉事務自動提交!”
con
。
setAutoCommit
(
false
)
;
}
```省略部分程式碼
}
上面那一段原始碼 匹配上這段文字了
主要是設定autoCommit 為false;
接下來 我們看這個方法
再我們的
private
TransactionStatus
handleExistingTransaction
(
TransactionDefinition
definition
,
Object
transaction
,
boolean
debugEnabled
)
throws
TransactionException
{
“如果是never 就報錯 不支援當前有事務 後面會寫進去總結裡面”
if
(
definition
。
getPropagationBehavior
(
)
==
TransactionDefinition
。
PROPAGATION_NEVER
)
{
throw
new
IllegalTransactionStateException
(
“Existing transaction found for transaction marked with propagation ‘never’”
)
;
}
“NOT_SUPPORTED 意思是不開啟新的資料庫連線。 ”
“我們的事務 實際就是一個一個的資料庫連線;我們有兩個步驟 一個是先掛起 然後開啟新的事務 ”
“ 這個就是說 我掛起。 但是不開啟新的事務;”
“那後面的那個資料庫連線 實際上用的就是Jdbc 或者是 Mybatis裡面的資料庫連線了”
if
(
definition
。
getPropagationBehavior
(
)
==
TransactionDefinition
。
PROPAGATION_NOT_SUPPORTED
)
{
if
(
debugEnabled
)
{
logger
。
debug
(
“Suspending current transaction”
)
;
}
Object
suspendedResources
=
suspend
(
transaction
)
;
boolean
newSynchronization
=
(
getTransactionSynchronization
(
)
==
SYNCHRONIZATION_ALWAYS
)
;
return
prepareTransactionStatus
(
definition
,
null
,
false
,
newSynchronization
,
debugEnabled
,
suspendedResources
)
;
}
“REQUIRES_NEW 開啟一個新的事務 也就是開啟一個新的資料庫連線”
if
(
definition
。
getPropagationBehavior
(
)
==
TransactionDefinition
。
PROPAGATION_REQUIRES_NEW
)
{
if
(
debugEnabled
)
{
logger
。
debug
(
“Suspending current transaction, creating new transaction with name [”
+
definition
。
getName
(
)
+
“]”
)
;
}
SuspendedResourcesHolder
suspendedResources
=
suspend
(
transaction
)
;
try
{
return
startTransaction
(
definition
,
transaction
,
debugEnabled
,
suspendedResources
)
;
}
catch
(
RuntimeException
|
Error
beginEx
)
{
resumeAfterBeginException
(
transaction
,
suspendedResources
,
beginEx
)
;
throw
beginEx
;
}
}
“PROPAGATION_NESTED 不會去開啟新的資料庫連線 再資料庫裡 有一個savepoint的一個說法”
“也就是設定了這個savepoint 我們可以回滾到 某一個點上 例如 我執行A B C 3個sql”
“再B sql 上面我們建立了 savepoint 那麼 就會回滾到B sql 這裡來;”
if
(
definition
。
getPropagationBehavior
(
)
==
TransactionDefinition
。
PROPAGATION_NESTED
)
{
if
(
!
isNestedTransactionAllowed
(
)
)
{
throw
new
NestedTransactionNotSupportedException
(
“Transaction manager does not allow nested transactions by default - ”
+
“specify ‘nestedTransactionAllowed’ property with value ‘true’”
)
;
}
if
(
debugEnabled
)
{
logger
。
debug
(
“Creating nested transaction with name [”
+
definition
。
getName
(
)
+
“]”
)
;
}
if
(
useSavepointForNestedTransaction
(
)
)
{
// Create savepoint within existing Spring-managed transaction,
// through the SavepointManager API implemented by TransactionStatus。
// Usually uses JDBC 3。0 savepoints。 Never activates Spring synchronization。
DefaultTransactionStatus
status
=
prepareTransactionStatus
(
definition
,
transaction
,
false
,
false
,
debugEnabled
,
null
)
;
status
。
createAndHoldSavepoint
(
)
;
return
status
;
}
else
{
// Nested transaction through nested begin and commit/rollback calls。
// Usually only for JTA: Spring synchronization might get activated here
// in case of a pre-existing JTA transaction。
return
startTransaction
(
definition
,
transaction
,
debugEnabled
,
null
)
;
}
}
// Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED。
if
(
debugEnabled
)
{
logger
。
debug
(
“Participating in existing transaction”
)
;
}
if
(
isValidateExistingTransaction
(
)
)
{
if
(
definition
。
getIsolationLevel
(
)
!=
TransactionDefinition
。
ISOLATION_DEFAULT
)
{
Integer
currentIsolationLevel
=
TransactionSynchronizationManager
。
getCurrentTransactionIsolationLevel
(
)
;
if
(
currentIsolationLevel
==
null
||
currentIsolationLevel
!=
definition
。
getIsolationLevel
(
)
)
{
Constants
isoConstants
=
DefaultTransactionDefinition
。
constants
;
throw
new
IllegalTransactionStateException
(
“Participating transaction with definition [”
+
definition
+
“] specifies isolation level which is incompatible with existing transaction: ”
+
(
currentIsolationLevel
!=
null
?
isoConstants
。
toCode
(
currentIsolationLevel
,
DefaultTransactionDefinition
。
PREFIX_ISOLATION
)
:
“(unknown)”
)
)
;
}
}
if
(
!
definition
。
isReadOnly
(
)
)
{
if
(
TransactionSynchronizationManager
。
isCurrentTransactionReadOnly
(
)
)
{
throw
new
IllegalTransactionStateException
(
“Participating transaction with definition [”
+
definition
+
“] is not marked as read-only but existing transaction is”
)
;
}
}
}
boolean
newSynchronization
=
(
getTransactionSynchronization
(
)
!=
SYNCHRONIZATION_NEVER
)
;
return
prepareTransactionStatus
(
definition
,
transaction
,
false
,
newSynchronization
,
debugEnabled
,
null
)
;
}
以上 我們就講解了 事務的傳播機制;我們掛起 以及恢復掛起 提交 以及回滾的原始碼; 並沒有詳細說道。 這裡如果感興趣的同學 可以去看一下原始碼;
總結
1:
Propagation
。
MANDATORY
“強制性”
“強制性要求開啟事務 用於手動開啟事務”
Propagation
。
NESTED
“巢狀”
“回滾到某一個savepoint 的那個點上 不會從新開啟資料庫連線 ”
Propagation
。
NEVER
“從不”
“不允許有事務的存在”
Propagation
。
NOT_SUPPORTED
“不支援”
“掛起當前事務,不建立新的事務”
Propagation
。
REQUIRED
“預設事務”
Propagation
。
REQUIRES_NEW
“從新再開啟一個事務,與之前事務毫不相關”
Propagation
。
SUPPORTS
“支援,如果上一個Bean 或者方法 使用了事務 那麼就延用事務。 如果沒用事務 那麼就不用事務”
2:
Spring開啟事務的全部流程
Spring 核心原始碼 在這裡 就結束了;
see you
,https://blog。csdn。net/wenaicoo/article/details/121171406