您好!
欢迎来到京东云开发者社区
登录
首页
博文
课程
大赛
工具
用户中心
开源
首页
博文
课程
大赛
工具
开源
更多
用户中心
开发者社区
>
博文
>
Narayana-XA事务恢复(5)
分享
打开微信扫码分享
点击前往QQ分享
点击前往微博分享
点击复制链接
Narayana-XA事务恢复(5)
Apache ShardingSphere
2021-01-04
IP归属:未知
243480浏览
说事务恢复流程之前,我们来讨论下,会啥会出现事务恢复?XA二阶段提交协议不是强一致性的吗?要解答这个问题,我们就要来看看XA二阶段协议有什么问题? ## 问题一 :单点故障 由于协调者的重要性,一旦协调者TM发生故障。参与者RM会一直阻塞下去。尤其在第二阶段,协调者发生故障,那么所有的参与者还都处于锁定事务资源的状态中,而无法继续完成事务操作。(如果是协调者挂掉,可以重新选举一个协调者,但是无法解决因为协调者宕机导致的参与者处于阻塞状态的问题) ## 问题二 :数据不一致 数据不一致。在二阶段提交的阶段二中,当协调者向参与者发送commit请求之后,发生了局部网络异常或者在发送commit请求过程中协调者发生了故障,这会导致只有一部分参与者接受到了commit请求。而在这部分参与者接到commit请求之后就会执行commit操作。但是其他部分未接到commit请求的机器则无法执行事务提交。于是整个分布式系统便出现了数据不一致性的现象。 ## 如何解决? 解决的方案简单,就是我们在事务的操作的每一步,我们都需要对事务状态的日志进行人为的记录,我们可以把日志记录存储在我们想存储的地方,可以是本地存储,也可以中心化的存储。Narayana的开源版本,提供了`file`,`db` 2种方式存储,file只能支持单机环境,而db是可以支持集群环境。 # Narayana 事务恢复流程。 ![image.gif](https://cdn.nlark.com/yuque/0/2021/webp/485026/1609739989787-612f9dee-b35e-4882-8d52-9e7d32f425a7.webp) Narayana使用了单线程轮询RM,执行XA recovery语句,来判断是否有需要恢复的语句。 具体的代码 `com.arjuna.ats.internal.arjuna.recovery.PeriodicRecovery.run()` 方法。以下是代码: ``` public void run () { doInitialWait(); boolean finished = false; do { boolean workToDo = false; // ok, get to the point where we are ready to start a scan synchronized(_stateLock) { if (getStatus() == Status.SCANNING) { // need to wait for some other scan to finish if (tsLogger.logger.isDebugEnabled()) { tsLogger.logger.debug("PeriodicRecovery: background thread waiting on other scan"); } doScanningWait(); // we don't wait around if a worker scan request has just come in if (getMode() == Mode.ENABLED && !_workerScanRequested) { // the last guy just finished scanning so we ought to wait a bit rather than just // pile straight in to do some work if (tsLogger.logger.isDebugEnabled()) { tsLogger.logger.debug("PeriodicRecovery: background thread backing off"); } doPeriodicWait(); // if we got told to stop then do so finished = (getMode() == Mode.TERMINATED); } } else { // status == INACTIVE so we can go ahead and scan if scanning is enabled switch (getMode()) { case ENABLED: // ok grab our chance to be the scanning thread if (tsLogger.logger.isDebugEnabled()) { tsLogger.logger.debug("PeriodicRecovery: background thread Status <== SCANNING"); } setStatus(Status.SCANNING); // must kick any other waiting threads _stateLock.notifyAll(); workToDo = true; break; case SUSPENDED: // we need to wait while we are suspended if (tsLogger.logger.isDebugEnabled()) { tsLogger.logger.debug("PeriodicRecovery: background thread wait while SUSPENDED"); } doSuspendedWait(); // we come out of here with the lock and either ENABLED or TERMINATED finished = (getMode() == Mode.TERMINATED); break; case TERMINATED: finished = true; break; } } } // its ok to start work if requested -- we cannot be stopped now by a mode change to SUSPEND // or TERMINATE until we get through phase 1 and maybe phase 2 if we are lucky if (workToDo) { // ok it is now this thread's turn to run a scan. before starting we check if there is a // worker waiting and reset the waiting flag. we will check again after the scan has // completed to see if a worker request has come in after starting this scan. // if so we avoid notifying the worker ensuring a requst is only confirmed when a // full scan has happened afetr the request was made boolean notifyRequired; synchronized(_stateLock) { notifyRequired = _workerScanRequested; _workerScanRequested = false; } // we are in state SCANNING so actually do the scan if (tsLogger.logger.isDebugEnabled()) { tsLogger.logger.debug("PeriodicRecovery: background thread scanning"); } doWorkInternal(); // clear the SCANNING state now we have done synchronized(_stateLock) { if (tsLogger.logger.isDebugEnabled()) { tsLogger.logger.debug("PeriodicRecovery: background thread Status <== INACTIVE"); } setStatus(Status.INACTIVE); // must kick any other waiting threads _stateLock.notifyAll(); // check if we need to notify a listener worker that we just finished a scan if (notifyRequired && !_workerScanRequested) { notifyWorker(); } if (getMode() == Mode.ENABLED && !_workerScanRequested) { // we managed a full scan and scanning is still enabled // so wait a bit before the next attempt if (tsLogger.logger.isDebugEnabled()) { tsLogger.logger.debug("PeriodicRecovery: background thread backing off"); } doPeriodicWait(); } finished = (getMode() == Mode.TERMINATED); } } } while (!finished); // make sure the worker thread is not wedged waiting for a scan to complete synchronized(_stateLock) { if (_workerScanRequested) { notifyWorker(); } } if (tsLogger.logger.isDebugEnabled()) { tsLogger.logger.debug("PeriodicRecovery: background thread exiting"); } } ``` - 别被吓到了,我们重点来关注 `doWorkInternal();` 我们来看看这个方法。 ``` //获取所有的RecoveryModule ,然后一个一个执行 Vector copyOfModules = getModules(); Enumeration modules = copyOfModules.elements(); while (modules.hasMoreElements()) { RecoveryModule m = (RecoveryModule) modules.nextElement(); // we need to ensure we use the class loader context of the recovery module while we are executing // its methods ClassLoader cl = switchClassLoader(m); try { m.periodicWorkFirstPass(); } finally { restoreClassLoader(cl); } if (tsLogger.logger.isDebugEnabled()) { tsLogger.logger.debug(" "); } } // take the lock again so we can do a backoff wait on it synchronized (_stateLock) { // we have to wait for a bit to avoid catching (too many) // transactions etc. that are really progressing quite happily doBackoffWait(); // we carry on scanning even if scanning is SUSPENDED because the suspending thread // might be waiting on us to complete and we don't want to risk deadlocking it by waiting // here for a resume. // if we have been TERMINATED we bail out now // n.b. if we give up here the caller is responsible for clearing the active scan if (getMode() == Mode.TERMINATED) { if (tsLogger.logger.isDebugEnabled()) { tsLogger.logger.debug("PeriodicRecovery: scan TERMINATED at phase 1"); } return; } } // move on to phase 2 if (tsLogger.logger.isDebugEnabled()) { tsLogger.logger.debug("Periodic recovery second pass at "+_theTimestamper.format(new Date())); } modules = copyOfModules.elements(); while (modules.hasMoreElements()) { RecoveryModule m = (RecoveryModule) modules.nextElement(); ClassLoader cl = switchClassLoader(m); try { m.periodicWorkSecondPass(); } finally { restoreClassLoader(cl); } if (tsLogger.logger.isDebugEnabled()) { tsLogger.logger.debugf("PeriodicRecovery: recovery module '%s' second pass processed", m); } } ``` - 首先会获取框架所有的`RecoveryModule`类,然后一个一个执行,我们先来看看这个类: ``` public interface RecoveryModule { /** * Called by the RecoveryManager at start up, and then * PERIODIC_RECOVERY_PERIOD seconds after the completion, for all RecoveryModules, * of the second pass */ public void periodicWorkFirstPass (); /** * Called by the RecoveryManager RECOVERY_BACKOFF_PERIOD seconds * after the completion of the first pass */ public void periodicWorkSecondPass (); } ``` RecoveryModule的实现类有 XARecoveryModule ,AtomicActionRecoveryModule,SubordinateAtomicActionRecoveryModule,CommitMarkableResourceRecordRecoveryModule。等4个实现类。 ## 恢复执行第一个阶段 - XARecoveryModule : 它的作用就是执行XA recovery 命令从RM,获取 Xid数组。然后缓存起来。核心代码为: ``` //从数据库获取 trans = xares.recover(XAResource.TMSTARTRSCAN); //缓存刷新 refreshXidScansForEquivalentXAResourceImpl(xares, trans); ``` - AtomicActionRecoveryModule: 从事务日志里面获取需要恢复的UID,具体代码为: ``` // Transaction type boolean AtomicActions = false ; // uids per transaction type InputObjectState aa_uids = new InputObjectState() ; try { if (tsLogger.logger.isDebugEnabled()) { tsLogger.logger.debug("AtomicActionRecoveryModule first pass"); } AtomicActions = _recoveryStore.allObjUids( _transactionType, aa_uids ); } catch ( ObjectStoreException ex ) { tsLogger.i18NLogger.warn_recovery_AtomicActionRecoveryModule_1(ex); } if ( AtomicActions ) { _transactionUidVector = processTransactions( aa_uids ) ; } ``` ## 恢复执行第二个阶段 首先执行的代码为 : ``` // move on to phase 2 if (tsLogger.logger.isDebugEnabled()) { tsLogger.logger.debug("Periodic recovery second pass at "+_theTimestamper.format(new Date())); } modules = copyOfModules.elements(); while (modules.hasMoreElements()) { RecoveryModule m = (RecoveryModule) modules.nextElement(); ClassLoader cl = switchClassLoader(m); try { m.periodicWorkSecondPass(); } finally { restoreClassLoader(cl); } if (tsLogger.logger.isDebugEnabled()) { tsLogger.logger.debugf("PeriodicRecovery: recovery module '%s' second pass processed", m); } } ``` - AtomicActionRecoveryModule: 进入 `processTransactionsStatus()`,最终会调用到 `com.arjuna.ats.arjuna.recovery.RecoverAtomicAction.replayPhase2()`。我们来看看这个方法。 ``` //省略无关代码 if ( (_theStatus == ActionStatus.PREPARED) || (_theStatus == ActionStatus.COMMITTING) || (_theStatus == ActionStatus.COMMITTED) || (_theStatus == ActionStatus.H_COMMIT) || (_theStatus == ActionStatus.H_MIXED) || (_theStatus == ActionStatus.H_HAZARD) ) { super.phase2Commit( _reportHeuristics ) ; } else if ( (_theStatus == ActionStatus.ABORTED) || (_theStatus == ActionStatus.H_ROLLBACK) || (_theStatus == ActionStatus.ABORTING) || (_theStatus == ActionStatus.ABORT_ONLY) ) { super.phase2Abort( _reportHeuristics ) ; } ``` - 判断事务状态,如果是需要commit阶段的状态,进行commit,否则进行rollback - XARecoveryModule : 尝试在进行恢复。核心代码为 ``` private void bottomUpRecovery() { for (XAResource xaResource : _resources) { try { xaRecoverySecondPass(xaResource); } catch (Exception ex) { jtaLogger.i18NLogger.warn_recovery_getxaresource(ex); } } // JBTM-895 garbage collection is now done when we return XAResources {@see XARecoveryModule#getNewXAResource(XAResourceRecord)} // JBTM-924 requires this here garbage collection, see JBTM-1155: if (_xidScans != null) { Set<XAResource> keys = new HashSet<XAResource>(_xidScans.keySet()); for(XAResource theKey : keys) { RecoveryXids recoveryXids = _xidScans.get(theKey); if(recoveryXids.isStale()) { _xidScans.remove(theKey); } } } } ``` 文章到此,已经写的很长很多了,我们分析了ShardingSphere对于XA方案,提供了一套SPI解决方案,对Atomikos进行了整合,也分析了Atomikos初始化流程,开始事务流程,获取连接流程,提交事务流程,回滚事务流程,事务恢复流程。 希望对大家理解XA的原理有所帮助。
原创文章,需联系作者,授权转载
上一篇:Shardingsphere整合Narayana对XA分布式事务的支持(4)
下一篇:Atomikos-XA 事务恢复(3)
Apache ShardingSphere
文章数
96
阅读量
231327
作者其他文章
01
突破关系型数据库桎梏:云原生数据库中间件核心剖析
数据库技术的发展与变革方兴未艾,NewSQL的出现,只是将各种所需技术组合在一起,而这些技术组合在一起所实现的核心功能,推动着云原生数据库的发展。 NewSQL的三种分类中,新架构和云数据库涉及了太多与数据库相关的底层实现,为了保证本文的范围不至太过发散,我们重点介绍透明化分片数据库中间件的核心功能与实现原理,另外两种类型的NewSQL在核心功能上类似,但实现原理会有所差别。
01
Apache ShardingSphere数据脱敏全解决方案详解(上)
Apache ShardingSphere针对新业务上线、旧业务改造分别提供了相应的全套脱敏解决方案。
01
Shardingsphere整合Narayana对XA分布式事务的支持(4)
ShardingSphere对于XA方案,提供了一套SPI解决方案,对Narayana进行了整合,Narayana初始化流程,开始事务流程,获取连接流程,提交事务流程,回滚事务流程。
01
从中间件到分布式数据库生态,ShardingSphere 5.x革新变旧
5.x 是 Apache ShardingSphere从分库分表中间件向分布式数据库生态转化的里程碑,从 4.x 版本后期开始打磨的可插拔架构在 5.x 版本已逐渐成型,项目的设计理念和 API 都进行了大幅提升。欢迎大家测试使用!
Apache ShardingSphere
文章数
96
阅读量
231327
作者其他文章
01
突破关系型数据库桎梏:云原生数据库中间件核心剖析
01
Apache ShardingSphere数据脱敏全解决方案详解(上)
01
Shardingsphere整合Narayana对XA分布式事务的支持(4)
01
从中间件到分布式数据库生态,ShardingSphere 5.x革新变旧
添加企业微信
获取1V1专业服务
扫码关注
京东云开发者公众号