分佈式事務 TCC-Transaction 源碼分析——TCC 實現
本文主要基於 TCC-Transaction 1.2.3.3 正式版
1. 概述
2. TCC 原理
3. TCC-Transaction 原理
4. 事務與參與者
4.1 事務
4.2 參與者
5. 事務管理器
5.1 發起根事務
5.2 傳播發起分支事務
5.3 傳播獲取分支事務
5.4 提交事務
5.5 回滾事務
5.6 添加參與者到事務
6. 事務攔截器
6.1 Compensable
6.2 可補償事務攔截器
6.3 資源協調者攔截器
666. 彩蛋
1. 概述
本文分享 TCC 實現。主要涉及如下三個 Maven 項目:
tcc-transaction-core
:tcc-transaction 底層實現。tcc-transaction-api
:tcc-transaction 使用 API。tcc-transaction-spring
:tcc-transaction Spring 支持。
你行好事會因為得到讚賞而愉悅
同理,開源項目貢獻者會因為 Star 而更加有動力
為 TCC-Transaction 點贊!傳送門
OK,開始我們的第一段 TCC 旅程吧。
ps:筆者假設你已經閱讀過《tcc-transaction 官方文檔 —— 使用指南1.2.x》。
ps2:未特殊說明的情況下,本文事務指的是 TCC事務。
2. TCC 原理
FROM https://support.hwclouds.com/devg-servicestage/zh-cn_topic_0056814426.html
TCC事務
為了解決在事務運行過程中大顆粒度資源鎖定的問題,業界提出一種新的事務模型,它是基於業務層面的事務定義。鎖粒度完全由業務自己控制。它本質是一種補償的思路。它把事務運行過程分成 Try、Confirm / Cancel 兩個階段。在每個階段的邏輯由業務代碼控制。這樣就事務的鎖粒度可以完全自由控制。業務可以在犧牲隔離性的情況下,獲取更高的性能。
Try 階段
完成所有業務檢查( 一致性 )
預留必須業務資源( 準隔離性 )
Try :嘗試執行業務
Confirm / Cancel 階段:
釋放 Try 階段預留的業務資源
Cancel 操作滿足冪等性
真正執行業務
不做任務業務檢查
Confirm 操作滿足冪等性
Confirm :確認執行業務
Cancel :取消執行業務
Confirm 與 Cancel 互斥
整體流程如下圖:
本文主要基於 TCC-Transaction 1.2.3.3 正式版
1. 概述
2. TCC 原理
3. TCC-Transaction 原理
4. 事務與參與者
4.1 事務
4.2 參與者
5. 事務管理器
5.1 發起根事務
5.2 傳播發起分支事務
5.3 傳播獲取分支事務
5.4 提交事務
5.5 回滾事務
5.6 添加參與者到事務
6. 事務攔截器
6.1 Compensable
6.2 可補償事務攔截器
6.3 資源協調者攔截器
666. 彩蛋
1. 概述
本文分享 TCC 實現。主要涉及如下三個 Maven 項目:
tcc-transaction-core
:tcc-transaction 底層實現。tcc-transaction-api
:tcc-transaction 使用 API。tcc-transaction-spring
:tcc-transaction Spring 支持。
你行好事會因為得到讚賞而愉悅
同理,開源項目貢獻者會因為 Star 而更加有動力
為 TCC-Transaction 點贊!傳送門
OK,開始我們的第一段 TCC 旅程吧。
ps:筆者假設你已經閱讀過《tcc-transaction 官方文檔 —— 使用指南1.2.x》。
ps2:未特殊說明的情況下,本文事務指的是 TCC事務。
2. TCC 原理
FROM https://support.hwclouds.com/devg-servicestage/zh-cn_topic_0056814426.html
TCC事務
為了解決在事務運行過程中大顆粒度資源鎖定的問題,業界提出一種新的事務模型,它是基於業務層面的事務定義。鎖粒度完全由業務自己控制。它本質是一種補償的思路。它把事務運行過程分成 Try、Confirm / Cancel 兩個階段。在每個階段的邏輯由業務代碼控制。這樣就事務的鎖粒度可以完全自由控制。業務可以在犧牲隔離性的情況下,獲取更高的性能。
Try 階段
完成所有業務檢查( 一致性 )
預留必須業務資源( 準隔離性 )
Try :嘗試執行業務
Confirm / Cancel 階段:
釋放 Try 階段預留的業務資源
Cancel 操作滿足冪等性
真正執行業務
不做任務業務檢查
Confirm 操作滿足冪等性
Confirm :確認執行業務
Cancel :取消執行業務
Confirm 與 Cancel 互斥
整體流程如下圖:
紅框部分功能由
tcc-transaction-core
實現:
啟動業務活動
登記業務操作
提交 / 回滾業務活動
黃框部分功能由
tcc-transaction-http-sample
實現( 官方提供的示例項目 ):
Try 操作
Confirm 操作
Cancel 操作
與 2PC協議 比較:
位於業務服務層而非自願層
沒有單獨的準備( Prepare )階段,Try 操作兼備自願操作與準備能力
Try 操作可以靈活選擇業務資源的鎖定粒度
較高開發成本
參考資料:
《支付寶運營架構中柔性事務指的是什麼?》
《分佈式事務的典型處理方式:2PC、TCC、異步確保和最大努力型》
3. TCC-Transaction 原理
在 TCC 裡,一個業務活動可以有多個事務,每個業務操作歸屬於不同的事務,即一個事務可以包含多個業務操作。TCC-Transaction 將每個業務操作抽象成事務參與者,每個事務可以包含多個參與者。
參與者需要聲明 try / confirm / cancel 三個類型的方法,和 TCC 的操作一一對應。在程序裡,通過 @Compensable 註解標記在 try 方法上,並填寫對應的 confirm / cancel 方法,示例代碼如下:
// try@Compensable(confirmMethod = "confirmRecord", cancelMethod = "cancelRecord", transactionContextEditor = MethodTransactionContextEditor.class)public String record(TransactionContext transactionContext, CapitalTradeOrderDto tradeOrderDto) {}// confirmpublic void confirmRecord(TransactionContext transactionContext, CapitalTradeOrderDto tradeOrderDto) {}// cancelpublic void cancelRecord(TransactionContext transactionContext, CapitalTradeOrderDto tradeOrderDto) {}
在示例代碼中,我們看到 TransactionContext,事務上下文,這個是怎麼生成的呢?這裡先賣一個關子。
TCC-Transaction 有兩個攔截器,通過對 @Compensable AOP 切面( 參與者 try 方法 )進行攔截,透明化對參與者 confirm / cancel 方法調用,從而實現 TCC 。簡化流程如下圖:
本文主要基於 TCC-Transaction 1.2.3.3 正式版
1. 概述
2. TCC 原理
3. TCC-Transaction 原理
4. 事務與參與者
4.1 事務
4.2 參與者
5. 事務管理器
5.1 發起根事務
5.2 傳播發起分支事務
5.3 傳播獲取分支事務
5.4 提交事務
5.5 回滾事務
5.6 添加參與者到事務
6. 事務攔截器
6.1 Compensable
6.2 可補償事務攔截器
6.3 資源協調者攔截器
666. 彩蛋
1. 概述
本文分享 TCC 實現。主要涉及如下三個 Maven 項目:
tcc-transaction-core
:tcc-transaction 底層實現。tcc-transaction-api
:tcc-transaction 使用 API。tcc-transaction-spring
:tcc-transaction Spring 支持。
你行好事會因為得到讚賞而愉悅
同理,開源項目貢獻者會因為 Star 而更加有動力
為 TCC-Transaction 點贊!傳送門
OK,開始我們的第一段 TCC 旅程吧。
ps:筆者假設你已經閱讀過《tcc-transaction 官方文檔 —— 使用指南1.2.x》。
ps2:未特殊說明的情況下,本文事務指的是 TCC事務。
2. TCC 原理
FROM https://support.hwclouds.com/devg-servicestage/zh-cn_topic_0056814426.html
TCC事務
為了解決在事務運行過程中大顆粒度資源鎖定的問題,業界提出一種新的事務模型,它是基於業務層面的事務定義。鎖粒度完全由業務自己控制。它本質是一種補償的思路。它把事務運行過程分成 Try、Confirm / Cancel 兩個階段。在每個階段的邏輯由業務代碼控制。這樣就事務的鎖粒度可以完全自由控制。業務可以在犧牲隔離性的情況下,獲取更高的性能。
Try 階段
完成所有業務檢查( 一致性 )
預留必須業務資源( 準隔離性 )
Try :嘗試執行業務
Confirm / Cancel 階段:
釋放 Try 階段預留的業務資源
Cancel 操作滿足冪等性
真正執行業務
不做任務業務檢查
Confirm 操作滿足冪等性
Confirm :確認執行業務
Cancel :取消執行業務
Confirm 與 Cancel 互斥
整體流程如下圖:
紅框部分功能由
tcc-transaction-core
實現:
啟動業務活動
登記業務操作
提交 / 回滾業務活動
黃框部分功能由
tcc-transaction-http-sample
實現( 官方提供的示例項目 ):
Try 操作
Confirm 操作
Cancel 操作
與 2PC協議 比較:
位於業務服務層而非自願層
沒有單獨的準備( Prepare )階段,Try 操作兼備自願操作與準備能力
Try 操作可以靈活選擇業務資源的鎖定粒度
較高開發成本
參考資料:
《支付寶運營架構中柔性事務指的是什麼?》
《分佈式事務的典型處理方式:2PC、TCC、異步確保和最大努力型》
3. TCC-Transaction 原理
在 TCC 裡,一個業務活動可以有多個事務,每個業務操作歸屬於不同的事務,即一個事務可以包含多個業務操作。TCC-Transaction 將每個業務操作抽象成事務參與者,每個事務可以包含多個參與者。
參與者需要聲明 try / confirm / cancel 三個類型的方法,和 TCC 的操作一一對應。在程序裡,通過 @Compensable 註解標記在 try 方法上,並填寫對應的 confirm / cancel 方法,示例代碼如下:
// try@Compensable(confirmMethod = "confirmRecord", cancelMethod = "cancelRecord", transactionContextEditor = MethodTransactionContextEditor.class)public String record(TransactionContext transactionContext, CapitalTradeOrderDto tradeOrderDto) {}// confirmpublic void confirmRecord(TransactionContext transactionContext, CapitalTradeOrderDto tradeOrderDto) {}// cancelpublic void cancelRecord(TransactionContext transactionContext, CapitalTradeOrderDto tradeOrderDto) {}
在示例代碼中,我們看到 TransactionContext,事務上下文,這個是怎麼生成的呢?這裡先賣一個關子。
TCC-Transaction 有兩個攔截器,通過對 @Compensable AOP 切面( 參與者 try 方法 )進行攔截,透明化對參與者 confirm / cancel 方法調用,從而實現 TCC 。簡化流程如下圖:
第一個攔截器,可補償事務攔截器,實現如下功能:
在 Try 階段,對事務的發起、傳播。
在 Confirm / Cancel 階段,對事務提交或回滾。
為什麼會有對事務的傳播呢?在遠程調用服務的參與者時,會通過"參數"( 需要序列化 )的形式傳遞事務給遠程參與者。
第二個攔截器,資源協調者攔截器,實現如下功能:
在 Try 階段,添加參與者到事務中。當事務上下文不存在時,進行創建。
實際攔截器對事務的處理會比上圖複雜一些,在本文「6. 事務攔截器」詳細解析。
在 TCC-Transaction 代碼實現上,組件分層如下圖:
本文主要基於 TCC-Transaction 1.2.3.3 正式版
1. 概述
2. TCC 原理
3. TCC-Transaction 原理
4. 事務與參與者
4.1 事務
4.2 參與者
5. 事務管理器
5.1 發起根事務
5.2 傳播發起分支事務
5.3 傳播獲取分支事務
5.4 提交事務
5.5 回滾事務
5.6 添加參與者到事務
6. 事務攔截器
6.1 Compensable
6.2 可補償事務攔截器
6.3 資源協調者攔截器
666. 彩蛋
1. 概述
本文分享 TCC 實現。主要涉及如下三個 Maven 項目:
tcc-transaction-core
:tcc-transaction 底層實現。tcc-transaction-api
:tcc-transaction 使用 API。tcc-transaction-spring
:tcc-transaction Spring 支持。
你行好事會因為得到讚賞而愉悅
同理,開源項目貢獻者會因為 Star 而更加有動力
為 TCC-Transaction 點贊!傳送門
OK,開始我們的第一段 TCC 旅程吧。
ps:筆者假設你已經閱讀過《tcc-transaction 官方文檔 —— 使用指南1.2.x》。
ps2:未特殊說明的情況下,本文事務指的是 TCC事務。
2. TCC 原理
FROM https://support.hwclouds.com/devg-servicestage/zh-cn_topic_0056814426.html
TCC事務
為了解決在事務運行過程中大顆粒度資源鎖定的問題,業界提出一種新的事務模型,它是基於業務層面的事務定義。鎖粒度完全由業務自己控制。它本質是一種補償的思路。它把事務運行過程分成 Try、Confirm / Cancel 兩個階段。在每個階段的邏輯由業務代碼控制。這樣就事務的鎖粒度可以完全自由控制。業務可以在犧牲隔離性的情況下,獲取更高的性能。
Try 階段
完成所有業務檢查( 一致性 )
預留必須業務資源( 準隔離性 )
Try :嘗試執行業務
Confirm / Cancel 階段:
釋放 Try 階段預留的業務資源
Cancel 操作滿足冪等性
真正執行業務
不做任務業務檢查
Confirm 操作滿足冪等性
Confirm :確認執行業務
Cancel :取消執行業務
Confirm 與 Cancel 互斥
整體流程如下圖:
紅框部分功能由
tcc-transaction-core
實現:
啟動業務活動
登記業務操作
提交 / 回滾業務活動
黃框部分功能由
tcc-transaction-http-sample
實現( 官方提供的示例項目 ):
Try 操作
Confirm 操作
Cancel 操作
與 2PC協議 比較:
位於業務服務層而非自願層
沒有單獨的準備( Prepare )階段,Try 操作兼備自願操作與準備能力
Try 操作可以靈活選擇業務資源的鎖定粒度
較高開發成本
參考資料:
《支付寶運營架構中柔性事務指的是什麼?》
《分佈式事務的典型處理方式:2PC、TCC、異步確保和最大努力型》
3. TCC-Transaction 原理
在 TCC 裡,一個業務活動可以有多個事務,每個業務操作歸屬於不同的事務,即一個事務可以包含多個業務操作。TCC-Transaction 將每個業務操作抽象成事務參與者,每個事務可以包含多個參與者。
參與者需要聲明 try / confirm / cancel 三個類型的方法,和 TCC 的操作一一對應。在程序裡,通過 @Compensable 註解標記在 try 方法上,並填寫對應的 confirm / cancel 方法,示例代碼如下:
// try@Compensable(confirmMethod = "confirmRecord", cancelMethod = "cancelRecord", transactionContextEditor = MethodTransactionContextEditor.class)public String record(TransactionContext transactionContext, CapitalTradeOrderDto tradeOrderDto) {}// confirmpublic void confirmRecord(TransactionContext transactionContext, CapitalTradeOrderDto tradeOrderDto) {}// cancelpublic void cancelRecord(TransactionContext transactionContext, CapitalTradeOrderDto tradeOrderDto) {}
在示例代碼中,我們看到 TransactionContext,事務上下文,這個是怎麼生成的呢?這裡先賣一個關子。
TCC-Transaction 有兩個攔截器,通過對 @Compensable AOP 切面( 參與者 try 方法 )進行攔截,透明化對參與者 confirm / cancel 方法調用,從而實現 TCC 。簡化流程如下圖:
第一個攔截器,可補償事務攔截器,實現如下功能:
在 Try 階段,對事務的發起、傳播。
在 Confirm / Cancel 階段,對事務提交或回滾。
為什麼會有對事務的傳播呢?在遠程調用服務的參與者時,會通過"參數"( 需要序列化 )的形式傳遞事務給遠程參與者。
第二個攔截器,資源協調者攔截器,實現如下功能:
在 Try 階段,添加參與者到事務中。當事務上下文不存在時,進行創建。
實際攔截器對事務的處理會比上圖複雜一些,在本文「6. 事務攔截器」詳細解析。
在 TCC-Transaction 代碼實現上,組件分層如下圖:
本文按照如下順序分享:
「4. 事務攔截器」
「5. 事務管理器」
「6. 事務管理器」
內容是自下而上的方式分享,每個組件可以更加整體的被認識。當然這可能對你理解時產生一臉悶逼,所以推薦兩種閱讀方式:
簡讀 x 1 + 深讀 x 1
倒著讀,發現未分享的方法,全文檢索該方法。
事務存儲器在《TCC-Transaction 源碼解析 —— 事務存儲於恢復》詳細解析。
事務恢復在《TCC-Transaction 源碼解析 —— 事務恢復》詳細解析。
4. 事務與參與者
在 TCC 裡,一個事務( org.mengyun.tcctransaction.Transaction
) 可以有多個參與者( org.mengyun.tcctransaction.Participant
)參與業務活動。類圖關係如下( 打開大圖 ):
本文主要基於 TCC-Transaction 1.2.3.3 正式版
1. 概述
2. TCC 原理
3. TCC-Transaction 原理
4. 事務與參與者
4.1 事務
4.2 參與者
5. 事務管理器
5.1 發起根事務
5.2 傳播發起分支事務
5.3 傳播獲取分支事務
5.4 提交事務
5.5 回滾事務
5.6 添加參與者到事務
6. 事務攔截器
6.1 Compensable
6.2 可補償事務攔截器
6.3 資源協調者攔截器
666. 彩蛋
1. 概述
本文分享 TCC 實現。主要涉及如下三個 Maven 項目:
tcc-transaction-core
:tcc-transaction 底層實現。tcc-transaction-api
:tcc-transaction 使用 API。tcc-transaction-spring
:tcc-transaction Spring 支持。
你行好事會因為得到讚賞而愉悅
同理,開源項目貢獻者會因為 Star 而更加有動力
為 TCC-Transaction 點贊!傳送門
OK,開始我們的第一段 TCC 旅程吧。
ps:筆者假設你已經閱讀過《tcc-transaction 官方文檔 —— 使用指南1.2.x》。
ps2:未特殊說明的情況下,本文事務指的是 TCC事務。
2. TCC 原理
FROM https://support.hwclouds.com/devg-servicestage/zh-cn_topic_0056814426.html
TCC事務
為了解決在事務運行過程中大顆粒度資源鎖定的問題,業界提出一種新的事務模型,它是基於業務層面的事務定義。鎖粒度完全由業務自己控制。它本質是一種補償的思路。它把事務運行過程分成 Try、Confirm / Cancel 兩個階段。在每個階段的邏輯由業務代碼控制。這樣就事務的鎖粒度可以完全自由控制。業務可以在犧牲隔離性的情況下,獲取更高的性能。
Try 階段
完成所有業務檢查( 一致性 )
預留必須業務資源( 準隔離性 )
Try :嘗試執行業務
Confirm / Cancel 階段:
釋放 Try 階段預留的業務資源
Cancel 操作滿足冪等性
真正執行業務
不做任務業務檢查
Confirm 操作滿足冪等性
Confirm :確認執行業務
Cancel :取消執行業務
Confirm 與 Cancel 互斥
整體流程如下圖:
紅框部分功能由
tcc-transaction-core
實現:
啟動業務活動
登記業務操作
提交 / 回滾業務活動
黃框部分功能由
tcc-transaction-http-sample
實現( 官方提供的示例項目 ):
Try 操作
Confirm 操作
Cancel 操作
與 2PC協議 比較:
位於業務服務層而非自願層
沒有單獨的準備( Prepare )階段,Try 操作兼備自願操作與準備能力
Try 操作可以靈活選擇業務資源的鎖定粒度
較高開發成本
參考資料:
《支付寶運營架構中柔性事務指的是什麼?》
《分佈式事務的典型處理方式:2PC、TCC、異步確保和最大努力型》
3. TCC-Transaction 原理
在 TCC 裡,一個業務活動可以有多個事務,每個業務操作歸屬於不同的事務,即一個事務可以包含多個業務操作。TCC-Transaction 將每個業務操作抽象成事務參與者,每個事務可以包含多個參與者。
參與者需要聲明 try / confirm / cancel 三個類型的方法,和 TCC 的操作一一對應。在程序裡,通過 @Compensable 註解標記在 try 方法上,並填寫對應的 confirm / cancel 方法,示例代碼如下:
// try@Compensable(confirmMethod = "confirmRecord", cancelMethod = "cancelRecord", transactionContextEditor = MethodTransactionContextEditor.class)public String record(TransactionContext transactionContext, CapitalTradeOrderDto tradeOrderDto) {}// confirmpublic void confirmRecord(TransactionContext transactionContext, CapitalTradeOrderDto tradeOrderDto) {}// cancelpublic void cancelRecord(TransactionContext transactionContext, CapitalTradeOrderDto tradeOrderDto) {}
在示例代碼中,我們看到 TransactionContext,事務上下文,這個是怎麼生成的呢?這裡先賣一個關子。
TCC-Transaction 有兩個攔截器,通過對 @Compensable AOP 切面( 參與者 try 方法 )進行攔截,透明化對參與者 confirm / cancel 方法調用,從而實現 TCC 。簡化流程如下圖:
第一個攔截器,可補償事務攔截器,實現如下功能:
在 Try 階段,對事務的發起、傳播。
在 Confirm / Cancel 階段,對事務提交或回滾。
為什麼會有對事務的傳播呢?在遠程調用服務的參與者時,會通過"參數"( 需要序列化 )的形式傳遞事務給遠程參與者。
第二個攔截器,資源協調者攔截器,實現如下功能:
在 Try 階段,添加參與者到事務中。當事務上下文不存在時,進行創建。
實際攔截器對事務的處理會比上圖複雜一些,在本文「6. 事務攔截器」詳細解析。
在 TCC-Transaction 代碼實現上,組件分層如下圖:
本文按照如下順序分享:
「4. 事務攔截器」
「5. 事務管理器」
「6. 事務管理器」
內容是自下而上的方式分享,每個組件可以更加整體的被認識。當然這可能對你理解時產生一臉悶逼,所以推薦兩種閱讀方式:
簡讀 x 1 + 深讀 x 1
倒著讀,發現未分享的方法,全文檢索該方法。
事務存儲器在《TCC-Transaction 源碼解析 —— 事務存儲於恢復》詳細解析。
事務恢復在《TCC-Transaction 源碼解析 —— 事務恢復》詳細解析。
4. 事務與參與者
在 TCC 裡,一個事務( org.mengyun.tcctransaction.Transaction
) 可以有多個參與者( org.mengyun.tcctransaction.Participant
)參與業務活動。類圖關係如下( 打開大圖 ):
4.1 事務
Transaction 實現代碼如下:
public class Transaction implements Serializable { private static final long serialVersionUID = 7291423944314337931L; /** * 事務編號 */ private TransactionXid xid; /** * 事務狀態 */ private TransactionStatus status; /** * 事務類型 */ private TransactionType transactionType; /** * 重試次數 */ private volatile int retriedCount = 0; /** * 創建時間 */ private Date createTime = new Date(); /** * 最後更新時間 */ private Date lastUpdateTime = new Date(); /** * 版本號 */ private long version = 1; /** * 參與者集合 */ private List<Participant> participants = new ArrayList<Participant>(); /** * 附帶屬性映射 */ private Map<String, Object> attachments = new ConcurrentHashMap<String, Object>(); /** * 添加參與者 * * @param participant 參與者 */ public void enlistParticipant(Participant participant) { participants.add(participant); } /** * 提交 TCC 事務 */ public void commit() { for (Participant participant : participants) { participant.commit(); } } /** * 回滾 TCC 事務 */ public void rollback() { for (Participant participant : participants) { participant.rollback(); } }}
xid,事務編號( TransactionXid ),用於唯一標識一個事務。使用 UUID 算法生成,保證唯一性。
org.mengyun.tcctransaction.api.TransactionXid
實現javax.transaction.xa.Xid
接口,實現代碼如下:public class TransactionXid implements Xid, Serializable { private static final long serialVersionUID = -6817267250789142043L; /**
*/ private int formatId = 1; /** * 全局事務編號 */ private byte[] globalTransactionId; /** * 分支事務編號 */ private byte[] branchQualifier;}```* TODO 為什麼要繼承 Xid 接口?
status,事務狀態( TransactionStatus )。
org.mengyun.tcctransaction.api.TransactionStatus
實現代碼如下:public enum TransactionStatus { /**
*/ TRYING(1), /** * 確認中狀態 */ CONFIRMING(2), /** * 取消中狀態 */ CANCELLING(3); private int id;}```
transactionType,事務類型( TransactionType )。
org.mengyun.tcctransaction.common.TransactionType
實現代碼如下:public enum TransactionType { /**
*/ ROOT(1), /** * 分支事務 */ BRANCH(2); int id;}```* 在[「6.2 可補償事務攔截器」](#)有詳細解析,可以看到看到這兩種事務是如何發起。
retriedCount,重試次數。在 TCC 過程中,可能參與者異常崩潰,這個時候會進行重試直到成功或超過最大次數。在《TCC-Transaction 源碼解析 —— 事務恢復》詳細解析。
version,版本號,用於樂觀鎖更新事務。在《TCC-Transaction 源碼解析 —— 事務存儲器》詳細解析。
attachments,附帶屬性映射。在《TCC-Transaction 源碼解析 —— Dubbo 支持》詳細解析。
提供
#enlistParticipant()
方法,添加事務參與者。提供
#commit()
方法,調用參與者們提交事務。提供
#rollback()
方法,調用參與者回滾事務。
4.2 參與者
Participant 實現代碼如下:
public class Participant implements Serializable { private static final long serialVersionUID = 4127729421281425247L; /** * 事務編號 */ private TransactionXid xid; /** * 確認執行業務方法調用上下文 */ private InvocationContext confirmInvocationContext; /** * 取消執行業務方法 */ private InvocationContext cancelInvocationContext; /** * 執行器 */ private Terminator terminator = new Terminator(); /** * 事務上下文編輯 */ Class<? extends TransactionContextEditor> transactionContextEditorClass; /** * 提交事務 */ public void commit() { terminator.invoke(new TransactionContext(xid, TransactionStatus.CONFIRMING.getId()), confirmInvocationContext, transactionContextEditorClass); } /** * 回滾事務 */ public void rollback() { terminator.invoke(new TransactionContext(xid, TransactionStatus.CANCELLING.getId()), cancelInvocationContext, transactionContextEditorClass); }}
xid,參與者事務編號。通過
TransactionXid.globalTransactionId
屬性,關聯上其所屬的事務。當參與者進行遠程調用時,遠程的分支事務的事務編號等於該參與者的事務編號。通過事務編號的關聯,TCC Confirm / Cancel 階段,使用參與者的事務編號和遠程的分支事務進行關聯,從而實現事務的提交和回滾,在「5.2 傳播發起分支事務」 + 「6.2 可補償事務攔截器」可以看到具體實現。confirmInvocationContext,確認執行業務方法調用上下文( InvocationContext )。
org.mengyun.tcctransaction.InvocationContext
實現代碼如下:public class InvocationContext implements Serializable { private static final long serialVersionUID = -7969140711432461165L; /**
*/ private Class targetClass; /** * 方法名 */ private String methodName; /** * 參數類型數組 */ private Class[] parameterTypes; /** * 參數數組 */ private Object[] args;}```* InvocationContext,執行方法調用上下文,記錄類、方法名、參數類型數組、參數數組。通過這些屬性,可以執行提交 / 回滾事務。在 `org.mengyun.tcctransaction.Terminator` 會看到具體的代碼實現。**本質上,TCC 通過多個參與者的 try / confirm / cancel 方法,實現事務的最終一致性**。
cancelInvocationContext,取消執行業務方法調用上下文( InvocationContext )。
terminator,執行器( Terminator )。
org.mengyun.tcctransaction.Terminator
實現代碼如下:public class Terminator implements Serializable { private static final long serialVersionUID = -164958655471605778L; public Object invoke(TransactionContext transactionContext, InvocationContext invocationContext, Class<? extends TransactionContextEditor> transactionContextEditorClass) { if (StringUtils.isNotEmpty(invocationContext.getMethodName())) { try { // 獲得 參與者對象 Object target = FactoryBuilder.factoryOf(invocationContext.getTargetClass()).getInstance(); // 獲得 方法 Method method = target.getClass().getMethod(invocationContext.getMethodName(), invocationContext.getParameterTypes()); // 設置 事務上下文 到方法參數 FactoryBuilder.factoryOf(transactionContextEditorClass).getInstance().set(transactionContext, target, method, invocationContext.getArgs()); // 執行方法 return method.invoke(target, invocationContext.getArgs()); } catch (Exception e) { throw new SystemException(e); } } return null; }}
FactoryBuilder,工廠 Builder,感興趣的同學點擊鏈接查看,已經添加完整中文代碼註釋。
TransactionContextEditor,在本文「6.1 Compensable」詳細解析。
transactionContextEditorClass,事務上下文編輯,在「6.1 Compensable」詳細解析。
提交
#commit()
方法,提交參與者自己的事務。提交
#rollback()
方法,回滾參與者自己的事務。
5. 事務管理器
org.mengyun.tcctransaction.TransactionManager
,事務管理器,提供事務的獲取、發起、提交、回滾,參與者的新增等等方法。
5.1 發起根事務
提供 begin()
方法,發起根事務。該方法在調用方法類型為 MethodType.ROOT 並且 事務處於 Try 階段被調用。MethodType 在「6.2 可補償事務攔截器」詳細解析。
實現代碼如下:
// TransactionManager.java/*** 發起根事務** @return 事務*/public Transaction begin() { // 創建 根事務 Transaction transaction = new Transaction(TransactionType.ROOT); // 存儲 事務 transactionRepository.create(transaction); // 註冊 事務 registerTransaction(transaction); return transaction;}
調用 Transaction 構造方法,創建根事務。實現代碼如下:
// Transaction.java/**
** @param transactionType 事務類型*/public Transaction(TransactionType transactionType) { this.xid = new TransactionXid(); this.status = TransactionStatus.TRYING; // 嘗試中狀態 this.transactionType = transactionType;}```* 目前該構造方法只有 `TransactionManager#begin()` 在調用,即只創建**根事務**。
調用
TransactionRepository#crete()
方法,存儲事務。調用
#registerTransaction(...)
方法,註冊事務到當前線程事務隊列。實現代碼如下:// TransactionManager.java/**
*/private static final ThreadLocal<Deque<Transaction>> CURRENT = new ThreadLocal<Deque<Transaction>>();/*** 註冊事務到當前線程事務隊列** @param transaction 事務*/private void registerTransaction(Transaction transaction) { if (CURRENT.get() == null) { CURRENT.set(new LinkedList<Transaction>()); } CURRENT.get().push(transaction); // 添加到頭部}```* **可能有同學會比較好奇,為什麼使用隊列存儲當前線程事務**?TCC-Transaction 支持**多個**的事務**獨立存在**,後創建的事務先提交,類似 Spring 的`org.springframework.transaction.annotation.Propagation.REQUIRES_NEW` 。在下文,很快我們就會看到 TCC-Transaction 自己的 `org.mengyun.tcctransaction.api.Propagation` 。
5.2 傳播發起分支事務
調用 #propagationNewBegin(...)
方法,傳播發起分支事務。該方法在調用方法類型為 MethodType.PROVIDER 並且 事務處於 Try 階段被調用。MethodType 在「6.2 可補償事務攔截器」詳細解析。
實現代碼如下:
/*** 傳播發起分支事務** @param transactionContext 事務上下文* @return 分支事務*/public Transaction propagationNewBegin(TransactionContext transactionContext) { // 創建 分支事務 Transaction transaction = new Transaction(transactionContext); // 存儲 事務 transactionRepository.create(transaction); // 註冊 事務 registerTransaction(transaction); return transaction;}
調用 Transaction 構造方法,創建分支事務。實現代碼如下:
/**
** @param transactionContext 事務上下文*/public Transaction(TransactionContext transactionContext) { this.xid = transactionContext.getXid(); // 事務上下文的 xid this.status = TransactionStatus.TRYING; // 嘗試中狀態 this.transactionType = TransactionType.BRANCH; // 分支事務}```* **分支**事務使用傳播的事務上下文的事務編號。
調用
TransactionRepository#crete()
方法,存儲事務。為什麼要存儲分支事務,在「6.3 資源協調者攔截器」詳細解析。調用
#registerTransaction(...)
方法,註冊事務到當前線程事務隊列。
5.3 傳播獲取分支事務
調用 #propagationExistBegin(...)
方法,傳播發起分支事務。該方法在調用方法類型為 MethodType.PROVIDER 並且 事務處於 Confirm / Cancel 階段被調用。MethodType 在「6.2 可補償事務攔截器」詳細解析。
實現代碼如下:
/*** 傳播獲取分支事務** @param transactionContext 事務上下文* @return 分支事務* @throws NoExistedTransactionException 當事務不存在時*/public Transaction propagationExistBegin(TransactionContext transactionContext) throws NoExistedTransactionException { // 查詢 事務 Transaction transaction = transactionRepository.findByXid(transactionContext.getXid()); if (transaction != null) { // 設置 事務 狀態 transaction.changeStatus(TransactionStatus.valueOf(transactionContext.getStatus())); // 註冊 事務 registerTransaction(transaction); return transaction; } else { throw new NoExistedTransactionException(); }}
調用
TransactionRepository#findByXid()
方法,查詢事務。調用
Transaction#changeStatus(...)
方法,設置事務狀態為 CONFIRMING 或 CANCELLING。調用
#registerTransaction(...)
方法,註冊事務到當前線程事務隊列。為什麼此處是分支事務呢?結合
#propagationNewBegin(...)
思考下。
5.4 提交事務
調用 #commit(...)
方法,提交事務。該方法在事務處於 Confirm / Cancel 階段被調用。
實現代碼如下:
/*** 提交事務*/public void commit() { // 獲取 事務 Transaction transaction = getCurrentTransaction(); // 設置 事務狀態 為 CONFIRMING transaction.changeStatus(TransactionStatus.CONFIRMING); // 更新 事務 transactionRepository.update(transaction); try { // 提交 事務 transaction.commit(); // 刪除 事務 transactionRepository.delete(transaction); } catch (Throwable commitException) { logger.error("compensable transaction confirm failed.", commitException); throw new ConfirmingException(commitException); }}
調用
#getCurrentTransaction()
方法, 獲取事務。實現代碼如下:public Transaction getCurrentTransaction() { if (isTransactionActive()) { return CURRENT.get().peek(); // 獲得頭部元素 } return null;}public boolean isTransactionActive() { Deque<Transaction> transactions = CURRENT.get(); return transactions != null && !transactions.isEmpty();}
為什麼獲得隊列頭部元素呢?該元素即是上文調用
#registerTransaction(...)
註冊到隊列頭部。
調用
Transaction#changeStatus(...)
方法, 設置事務狀態為 CONFIRMING。調用
TransactionRepository#update(...)
方法, 更新事務。調用
Transaction#commit(...)
方法, 提交事務。調用
TransactionRepository#delete(...)
方法,刪除事務。
5.5 回滾事務
調用 #rollback(...)
方法,取消事務,和 #commit()
方法基本類似。該方法在事務處於 Confirm / Cancel 階段被調用。
實現代碼如下:
/*** 回滾事務*/public void rollback() { // 獲取 事務 Transaction transaction = getCurrentTransaction(); // 設置 事務狀態 為 CANCELLING transaction.changeStatus(TransactionStatus.CANCELLING); // 更新 事務 transactionRepository.update(transaction); try { // 提交 事務 transaction.rollback(); // 刪除 事務 transactionRepository.delete(transaction); } catch (Throwable rollbackException) { logger.error("compensable transaction rollback failed.", rollbackException); throw new CancellingException(rollbackException); }}
調用
#getCurrentTransaction()
方法,獲取事務。調用
Transaction#changeStatus(...)
方法, 設置事務狀態為 CANCELLING。調用
TransactionRepository#update(...)
方法, 更新事務。調用
Transaction#rollback(...)
方法, 回滾事務。調用
TransactionRepository#delete(...)
方法,刪除事務。
5.6 添加參與者到事務
調用 #enlistParticipant(...)
方法,添加參與者到事務。該方法在事務處於 Try 階段被調用,在「6.3 資源協調者攔截器」有詳細解析。
實現代碼如下:
/*** 添加參與者到事務** @param participant 參與者*/public void enlistParticipant(Participant participant) { // 獲取 事務 Transaction transaction = this.getCurrentTransaction(); // 添加參與者 transaction.enlistParticipant(participant); // 更新 事務 transactionRepository.update(transaction);}
調用
#getCurrentTransaction()
方法,獲取事務。調用
Transaction#enlistParticipant(...)
方法, 添加參與者到事務。調用
TransactionRepository#update(...)
方法, 更新事務。
6. 事務攔截器
TCC-Transaction 基於 org.mengyun.tcctransaction.api.@Compensable
+ org.aspectj.lang.annotation.@Aspect
註解 AOP 切面實現業務方法的 TCC 事務聲明攔截,同 Spring 的 org.springframework.transaction.annotation.@Transactional
的實現。
TCC-Transaction 有兩個攔截器:
org.mengyun.tcctransaction.interceptor.CompensableTransactionInterceptor
,可補償事務攔截器。org.mengyun.tcctransaction.interceptor.ResourceCoordinatorInterceptor
,資源協調者攔截器。
在分享攔截器的實現之前,我們先一起看看 @Compensable 註解。
6.1 Compensable
@Compensable,標記可補償的方法註解。實現代碼如下:
public @interface Compensable { /** * 傳播級別 */ Propagation propagation() default Propagation.REQUIRED; /** * 確認執行業務方法 */ String confirmMethod() default ""; /** * 取消執行業務方法 */ String cancelMethod() default ""; /** * 事務上下文編輯 */ Class<? extends TransactionContextEditor> transactionContextEditor() default DefaultTransactionContextEditor.class;}
propagation,傳播級別( Propagation ),默認 Propagation.REQUIRED。和 Spring 的 Propagation 除了缺少幾個屬性,基本一致。實現代碼如下:
public enum Propagation { /**
*/ REQUIRED(0), /** * 支持當前事務,如果當前沒有事務,就以非事務方式執行。 */ SUPPORTS(1), /** * 支持當前事務,如果當前沒有事務,就拋出異常。 */ MANDATORY(2), /** * 新建事務,如果當前存在事務,把當前事務掛起。 */ REQUIRES_NEW(3); private final int value;}```
confirmMethod,確認執行業務方法名。
cancelMethod,取消執行業務方法名。
TransactionContextEditor,事務上下文編輯器( TransactionContextEditor ),用於設置和獲得事務上下文( TransactionContext ),在「6.3 資源協調者攔截器」可以看到被調用,此處只看它的代碼實現。
org.mengyun.tcctransaction.api.TransactionContextEditor
接口代碼如下:public interface TransactionContextEditor { /**
* * @param target 對象 * @param method 方法 * @param args 參數 * @return 事務上下文 */ TransactionContext get(Object target, Method method, Object[] args); /** * 設置事務上下文到參數中 * * @param transactionContext 事務上下文 * @param target 對象 * @param method 方法 * @param args 參數 */ void set(TransactionContext transactionContext, Object target, Method method, Object[] args);}```* DefaultTransactionContextEditor,**默認**事務上下文編輯器實現。實現代碼如下: ```Java class DefaultTransactionContextEditor implements TransactionContextEditor { @Override public TransactionContext get(Object target, Method method, Object[] args) { int position = getTransactionContextParamPosition(method.getParameterTypes()); if (position >= 0) { return (TransactionContext) args[position]; } return null; } @Override public void set(TransactionContext transactionContext, Object target, Method method, Object[] args) { int position = getTransactionContextParamPosition(method.getParameterTypes()); if (position >= 0) { args[position] = transactionContext; // 設置方法參數 } } /** * 獲得事務上下文在方法參數裡的位置 * * @param parameterTypes 參數類型集合 * @return 位置 */ public static int getTransactionContextParamPosition(Class<?>[] parameterTypes) { int position = -1; for (int i = 0; i < parameterTypes.length; i++) { if (parameterTypes[i].equals(org.mengyun.tcctransaction.api.TransactionContext.class)) { position = i; break; } } return position; } } ``` * x
NullableTransactionContextEditor,無事務上下文編輯器實現。實現代碼如下:
class NullableTransactionContextEditor implements TransactionContextEditor { @Override public TransactionContext get(Object target, Method method, Object[] args) { return null; } @Override public void set(TransactionContext transactionContext, Object target, Method method, Object[] args) { } }
DubboTransactionContextEditor,Dubbo 事務上下文編輯器實現,通過 Dubbo 隱式傳參方式獲得事務上下文,在《TCC-Transaction 源碼解析 —— Dubbo 支持》詳細解析。
6.2 可補償事務攔截器
先一起來看下可補償事務攔截器對應的切面 org.mengyun.tcctransaction.interceptor.CompensableTransactionAspect
,實現代碼如下:
@Aspectpublic abstract class CompensableTransactionAspect { private CompensableTransactionInterceptor compensableTransactionInterceptor; public void setCompensableTransactionInterceptor(CompensableTransactionInterceptor compensableTransactionInterceptor) { this.compensableTransactionInterceptor = compensableTransactionInterceptor; } @Pointcut("@annotation(org.mengyun.tcctransaction.api.Compensable)") public void compensableService() { } @Around("compensableService()") public Object interceptCompensableMethod(ProceedingJoinPoint pjp) throws Throwable { return compensableTransactionInterceptor.interceptCompensableMethod(pjp); } public abstract int getOrder();}
通過
org.aspectj.lang.annotation.@Pointcut
+org.aspectj.lang.annotation.@Around
註解,配置對 @Compensable 註解的方法進行攔截,調用CompensableTransactionInterceptor#interceptCompensableMethod(...)
方法進行處理。
CompensableTransactionInterceptor 實現代碼如下:
public class CompensableTransactionInterceptor { private TransactionManager transactionManager; private Set<Class<? extends Exception>> delayCancelExceptions; public Object interceptCompensableMethod(ProceedingJoinPoint pjp) throws Throwable { // 獲得帶 @Compensable 註解的方法 Method method = CompensableMethodUtils.getCompensableMethod(pjp); // Compensable compensable = method.getAnnotation(Compensable.class); Propagation propagation = compensable.propagation(); // 獲得 事務上下文 TransactionContext transactionContext = FactoryBuilder.factoryOf(compensable.transactionContextEditor()).getInstance().get(pjp.getTarget(), method, pjp.getArgs()); // 當前線程是否在事務中 boolean isTransactionActive = transactionManager.isTransactionActive(); // 判斷事務上下文是否合法 if (!TransactionUtils.isLegalTransactionContext(isTransactionActive, propagation, transactionContext)) { throw new SystemException("no active compensable transaction while propagation is mandatory for method " + method.getName()); } // 計算方法類型 MethodType methodType = CompensableMethodUtils.calculateMethodType(propagation, isTransactionActive, transactionContext); // 處理 switch (methodType) { case ROOT: return rootMethodProceed(pjp); case PROVIDER: return providerMethodProceed(pjp, transactionContext); default: return pjp.proceed(); } }}
調用
CompensableMethodUtils#getCompensableMethod(...)
方法,獲得帶 @Compensable 註解的方法。實現代碼如下:// CompensableMethodUtils.java/**
** @param pjp 切面點* @return 方法*/public static Method getCompensableMethod(ProceedingJoinPoint pjp) { Method method = ((MethodSignature) (pjp.getSignature())).getMethod(); // 代理方法對象 if (method.getAnnotation(Compensable.class) == null) { try { method = pjp.getTarget().getClass().getMethod(method.getName(), method.getParameterTypes()); // 實際方法對象 } catch (NoSuchMethodException e) { return null; } } return method;}```
調用
TransactionContextEditor#get(...)
方法,從參數中獲得事務上下文。為什麼從參數中可以獲得事務上下文呢?在「6.3 資源協調者攔截器」揭曉答案。調用
TransactionManager#isTransactionActive()
方法,當前線程是否在事務中。實現代碼如下:// TransactionManager.javaprivate static final ThreadLocal<Deque<Transaction>> CURRENT = new ThreadLocal<Deque<Transaction>>();public boolean isTransactionActive() { Deque<Transaction> transactions = CURRENT.get(); return transactions != null && !transactions.isEmpty();}
調用
TransactionUtils#isLegalTransactionContext(...)
方法,判斷事務上下文是否合法。實現代碼如下:// TransactionUtils.java/*** 判斷事務上下文是否合法
** @param isTransactionActive 是否* @param propagation 傳播級別* @param transactionContext 事務上下文* @return 是否合法*/public static boolean isLegalTransactionContext(boolean isTransactionActive, Propagation propagation, TransactionContext transactionContext) { if (propagation.equals(Propagation.MANDATORY) && !isTransactionActive && transactionContext == null) { return false; } return true;}```* 當傳播級別為 Propagation.MANDATORY 時,要求必須在事務中。
調用
CompensableMethodUtils#calculateMethodType(...)
方法,計算方法類型。實現代碼如下:/**
** @param propagation 傳播級別* @param isTransactionActive 是否事務開啟* @param transactionContext 事務上下文* @return 方法類型*/public static MethodType calculateMethodType(Propagation propagation, boolean isTransactionActive, TransactionContext transactionContext) { if ((propagation.equals(Propagation.REQUIRED) && !isTransactionActive && transactionContext == null) // Propagation.REQUIRED:支持當前事務,當前沒有事務,就新建一個事務。 || propagation.equals(Propagation.REQUIRES_NEW)) { // Propagation.REQUIRES_NEW:新建事務,如果當前存在事務,把當前事務掛起。 return MethodType.ROOT; } else if ((propagation.equals(Propagation.REQUIRED) // Propagation.REQUIRED:支持當前事務 || propagation.equals(Propagation.MANDATORY)) // Propagation.MANDATORY:支持當前事務 && !isTransactionActive && transactionContext != null) { return MethodType.PROVIDER; } else { return MethodType.NORMAL; }}```* 計算方法類型( MethodType )的目的,可以根據不同方法類型,做不同的事務處理。* 方法類型為 MethodType.ROOT 時,發起**根事務**,判斷條件如下二選一: * 事務傳播級別為 Propagation.REQUIRED,並且當前沒有事務。 * 事務傳播級別為 Propagation.REQUIRES_NEW,新建事務,如果當前存在事務,把當前事務掛起。**此時,事務管理器的當前線程事務隊列可能會存在多個事務**。
事務傳播級別為 Propagation.REQUIRED,並且當前不存在事務,並且方法參數傳遞了事務上下文。
事務傳播級別為 Propagation.PROVIDER,並且當前不存在事務,並且方法參數傳遞了事務上下文。
當前不存在事務,方法參數傳遞了事務上下文是什麼意思?當跨服務遠程調用時,被調用服務本身( 服務提供者 )不在事務中,通過傳遞事務上下文參數,融入當前事務。
方法類型為 MethodType.ROOT 時,發起分支事務,判斷條件如下二選一:
方法類型為 MethodType.Normal 時,不進行事務處理。
MethodType.CONSUMER 項目已經不再使用,猜測已廢棄。
當方法類型為 MethodType.ROOT 時,調用
#rootMethodProceed(...)
方法,發起 TCC 整體流程。實現代碼如下:private Object rootMethodProceed(ProceedingJoinPoint pjp) throws Throwable { Object returnValue; Transaction transaction = null; try { // 發起根事務 transaction = transactionManager.begin(); // 執行方法原邏輯 try { returnValue = pjp.proceed(); } catch (Throwable tryingException) { if (isDelayCancelException(tryingException)) { // 是否延遲迴滾 } else { logger.warn(String.format("compensable transaction trying failed. transaction content:%s", JSON.toJSONString(transaction)), tryingException); // 回滾事務 transactionManager.rollback(); } throw tryingException; } // 提交事務 transactionManager.commit(); } finally { // 將事務從當前線程事務隊列移除 transactionManager.cleanAfterCompletion(transaction); } return returnValue;}
x
調用
#transactionManager()
方法,發起根事務,TCC Try 階段開始。調用
ProceedingJoinPoint#proceed()
方法,執行方法原邏輯( 即 Try 邏輯 )。當原邏輯執行異常時,TCC Try 階段失敗,調用
TransactionManager#rollback(...)
方法,TCC Cancel 階段,回滾事務。此處#isDelayCancelException(...)
方法,判斷異常是否為延遲取消回滾異常,部分異常不適合立即回滾事務,在《TCC-Transaction 源碼分析 —— 事務恢復》詳細解析。當原邏輯執行成功時,TCC Try 階段成功,調用
TransactionManager#commit(...)
方法,TCC Confirm 階段,提交事務。調用
TransactionManager#cleanAfterCompletion(...)
方法,將事務從當前線程事務隊列移除,避免線程衝突。實現代碼如下:// TransactionManager.java public void cleanAfterCompletion(Transaction transaction) { if (isTransactionActive() && transaction != null) { Transaction currentTransaction = getCurrentTransaction(); if (currentTransaction == transaction) { CURRENT.get().pop(); } else { throw new SystemException("Illegal transaction when clean after completion"); } } }
當方法類型為 Propagation.PROVIDER 時,服務提供者參與 TCC 整體流程。實現代碼如下:
private Object providerMethodProceed(ProceedingJoinPoint pjp, TransactionContext transactionContext) throws Throwable { Transaction transaction = null; try { switch (TransactionStatus.valueOf(transactionContext.getStatus())) { case TRYING: // 傳播發起分支事務 transaction = transactionManager.propagationNewBegin(transactionContext); return pjp.proceed(); case CONFIRMING: try { // 傳播獲取分支事務 transaction = transactionManager.propagationExistBegin(transactionContext); // 提交事務 transactionManager.commit(); } catch (NoExistedTransactionException excepton) { //the transaction has been commit,ignore it. } break; case CANCELLING: try { // 傳播獲取分支事務 transaction = transactionManager.propagationExistBegin(transactionContext); // 回滾事務 transactionManager.rollback(); } catch (NoExistedTransactionException exception) { //the transaction has been rollback,ignore it. } break; } } finally { // 將事務從當前線程事務隊列移除 transactionManager.cleanAfterCompletion(transaction); } // 返回空值 Method method = ((MethodSignature) (pjp.getSignature())).getMethod(); return ReflectionUtils.getNullValue(method.getReturnType());}
為什麼要傳播發起分支事務?在根事務進行 Confirm / Cancel 時,調用根事務上的參與者們提交或回滾事務時,進行遠程服務方法調用的參與者,可以通過自己的事務編號關聯上傳播的分支事務( 兩者的事務編號相等 ),進行事務的提交或回滾。
當事務處於 TransactionStatus.TRYING 時,調用
TransactionManager#propagationExistBegin(...)
方法,傳播發起分支事務。發起分支事務完成後,調用ProceedingJoinPoint#proceed()
方法,執行方法原邏輯( 即 Try 邏輯 )。當事務處於 TransactionStatus.CONFIRMING 時,調用
TransactionManager#commit()
方法,提交事務。當事務處於 TransactionStatus.CANCELLING 時,調用
TransactionManager#rollback()
方法,提交事務。調用
TransactionManager#cleanAfterCompletion(...)
方法,將事務從當前線程事務隊列移除,避免線程衝突。當事務處於 TransactionStatus.CONFIRMING / TransactionStatus.CANCELLING 時,調用
ReflectionUtils#getNullValue(...)
方法,返回空值。為什麼返回空值?Confirm / Cancel 相關方法,是通過 AOP 切面調用,只調用,不處理返回值,但是又不能沒有返回值,因此直接返回空。實現代碼如下:public static Object getNullValue(Class type) { // 處理基本類型 if (boolean.class.equals(type)) { return false; } else if (byte.class.equals(type)) { return 0; } else if (short.class.equals(type)) { return 0; } else if (int.class.equals(type)) { return 0; } else if (long.class.equals(type)) { return 0; } else if (float.class.equals(type)) { return 0; } else if (double.class.equals(type)) { return 0; } // 處理對象 return null; }
當方法類型為 Propagation.NORMAL 時,執行方法原邏輯,不進行事務處理。
6.3 資源協調者攔截器
先一起來看下資源協調者攔截器 對應的切面 org.mengyun.tcctransaction.interceptor.CompensableTransactionAspect
,實現代碼如下:
@Aspectpublic abstract class ResourceCoordinatorAspect { private ResourceCoordinatorInterceptor resourceCoordinatorInterceptor; @Pointcut("@annotation(org.mengyun.tcctransaction.api.Compensable)") public void transactionContextCall() { } @Around("transactionContextCall()") public Object interceptTransactionContextMethod(ProceedingJoinPoint pjp) throws Throwable { return resourceCoordinatorInterceptor.interceptTransactionContextMethod(pjp); } public void setResourceCoordinatorInterceptor(ResourceCoordinatorInterceptor resourceCoordinatorInterceptor) { this.resourceCoordinatorInterceptor = resourceCoordinatorInterceptor; } public abstract int getOrder();}
通過
org.aspectj.lang.annotation.@Pointcut
+org.aspectj.lang.annotation.@Around
註解,配置對 @Compensable 註解的方法進行攔截,調用ResourceCoordinatorInterceptor#interceptTransactionContextMethod(...)
方法進行處理。
ResourceCoordinatorInterceptor 實現代碼如下:
public class ResourceCoordinatorInterceptor { private TransactionManager transactionManager; public Object interceptTransactionContextMethod(ProceedingJoinPoint pjp) throws Throwable { Transaction transaction = transactionManager.getCurrentTransaction(); if (transaction != null) { switch (transaction.getStatus()) { case TRYING: // 添加事務參與者 enlistParticipant(pjp); break; case CONFIRMING: break; case CANCELLING: break; } } // 執行方法原邏輯 return pjp.proceed(pjp.getArgs()); }}
當事務處於 TransactionStatus.TRYING 時,調用
#enlistParticipant(...)
方法,添加事務參與者。調用
ProceedingJoinPoint#proceed(...)
方法,執行方法原邏輯。
ResourceCoordinatorInterceptor#enlistParticipant() 實現代碼如下:
private void enlistParticipant(ProceedingJoinPoint pjp) throws IllegalAccessException, InstantiationException { // 獲得 @Compensable 註解 Method method = CompensableMethodUtils.getCompensableMethod(pjp); if (method == null) { throw new RuntimeException(String.format("join point not found method, point is : %s", pjp.getSignature().getName())); } Compensable compensable = method.getAnnotation(Compensable.class); // 獲得 確認執行業務方法 和 取消執行業務方法 String confirmMethodName = compensable.confirmMethod(); String cancelMethodName = compensable.cancelMethod(); // 獲取 當前線程事務第一個(頭部)元素 Transaction transaction = transactionManager.getCurrentTransaction(); // 創建 事務編號 TransactionXid xid = new TransactionXid(transaction.getXid().getGlobalTransactionId()); // TODO if (FactoryBuilder.factoryOf(compensable.transactionContextEditor()).getInstance().get(pjp.getTarget(), method, pjp.getArgs()) == null) { FactoryBuilder.factoryOf(compensable.transactionContextEditor()).getInstance().set(new TransactionContext(xid, TransactionStatus.TRYING.getId()), pjp.getTarget(), ((MethodSignature) pjp.getSignature()).getMethod(), pjp.getArgs()); } // 獲得類 Class targetClass = ReflectionUtils.getDeclaringType(pjp.getTarget().getClass(), method.getName(), method.getParameterTypes()); // 創建 確認執行方法調用上下文 和 取消執行方法調用上下文 InvocationContext confirmInvocation = new InvocationContext(targetClass, confirmMethodName, method.getParameterTypes(), pjp.getArgs()); InvocationContext cancelInvocation = new InvocationContext(targetClass, cancelMethodName, method.getParameterTypes(), pjp.getArgs()); // 創建 事務參與者 Participant participant = new Participant( xid, confirmInvocation, cancelInvocation, compensable.transactionContextEditor()); // 添加 事務參與者 到 事務 transactionManager.enlistParticipant(participant);}
調用
CompensableMethodUtils#getCompensableMethod(...)
方法,獲得帶 @Compensable 註解的方法。調用
#getCurrentTransaction()
方法, 獲取事務。調用 TransactionXid 構造方法,創建分支事務編號。實現代碼如下:
/**
*/private byte[] globalTransactionId;/*** 分支事務編號*/private byte[] branchQualifier;public TransactionXid(byte[] globalTransactionId) { this.globalTransactionId = globalTransactionId; branchQualifier = uuidToByteArray(UUID.randomUUID()); // 生成 分支事務編號}```* 分支事務編號( `branchQualifier` ) 需要生成。
TODO TransactionContext 和 Participant 的關係。
調用
ReflectionUtils#getDeclaringType(...)
方法,獲得聲明 @Compensable 方法的實際類。實現代碼如下:public static Class getDeclaringType(Class aClass, String methodName, Class<?>[] parameterTypes) { Method method; Class findClass = aClass; do { Class[] clazzes = findClass.getInterfaces(); for (Class clazz : clazzes) { try { method = clazz.getDeclaredMethod(methodName, parameterTypes); } catch (NoSuchMethodException e) { method = null; } if (method != null) { return clazz; } } findClass = findClass.getSuperclass(); } while (!findClass.equals(Object.class)); return aClass;}
調用 InvocationContext 構造方法,分別創建確認執行方法調用上下文和取消執行方法調用上下文。實現代碼如下:
/**
*/private Class targetClass;/*** 方法名*/private String methodName;/*** 參數類型數組*/private Class[] parameterTypes;/*** 參數數組*/private Object[] args; public InvocationContext(Class targetClass, String methodName, Class[] parameterTypes, Object... args) { this.methodName = methodName; this.parameterTypes = parameterTypes; this.targetClass = targetClass; this.args = args;}```
調用 Participant 構造方法,創建事務參與者。實現代碼如下:
public class Participant implements Serializable { private static final long serialVersionUID = 4127729421281425247L; /**
*/ private TransactionXid xid; /** * 確認執行業務方法調用上下文 */ private InvocationContext confirmInvocationContext; /** * 取消執行業務方法 */ private InvocationContext cancelInvocationContext; /** * 執行器 */ private Terminator terminator = new Terminator(); /** * 事務上下文編輯 */ Class<? extends TransactionContextEditor> transactionContextEditorClass; public Participant() { } public Participant(TransactionXid xid, InvocationContext confirmInvocationContext, InvocationContext cancelInvocationContext, Class<? extends TransactionContextEditor> transactionContextEditorClass) { this.xid = xid; this.confirmInvocationContext = confirmInvocationContext; this.cancelInvocationContext = cancelInvocationContext; this.transactionContextEditorClass = transactionContextEditorClass; }}```
調用
TransactionManager#enlistParticipant(...)
方法,添加事務參與者到事務。
666. 彩蛋
受限於本人的能力,蠻多處表達不夠清晰或者易懂,非常抱歉。如果你對任何地方有任何疑問,歡迎關注本人,期待與你的交流。不限於 TCC,也可以是分佈式事務,也可以是微服務,以及等等。
外送一本武林祕籍:帶中文註釋的 TCC-Transaction 倉庫地址,目前正在慢慢完善。傳送門:https://github.com/YunaiV/tcc-transaction。
再送一本葵花寶典:《TCC型分佈式事務原理和實現》系列。
胖友,分享一個朋友圈可好?