Java @GlobalLock注解详细分析讲解

目录
  • GlobalLock的作用
  • 全局锁
  • 为什么要使用GlobalLock
  • 工作原理

GlobalLock的作用

对于某条数据进行更新操作,如果全局事务正在进行,当某个本地事务需要更新该数据时,需要使用@GlobalLock确保其不会对全局事务正在操作的数据进行修改。防止的本地事务对全局事务的数据脏写。如果和select for update组合使用,还可以起到防止脏读的效果。

全局锁

首先我们知道,seata的AT模式是二段提交的,而且AT模式能够做到事务ACID四种特性中的A原子性和D持久性,默认情况下隔离级别也只能保证在读未提交

那么为了保证原子性,在全局事务未提交之前,其中被修改的数据会被加上全局锁,保证不再会被其他全局事务修改。

为什么要使用GlobalLock

但是全局锁仅仅能防止全局事务对一个上锁的数据再次进行修改,在很多业务场景中我们是没有跨系统的rpc调用的,通常是不会加分布式事务的。

例如有分布式事务执行完毕A系统的业务逻辑,正在继续执行B系统逻辑,并且A系统事务已经提交。此时A系统一个本地的spring事务去与分布式事务修改同一行数据,是可以正常修改的

由于本地的spring事务并不受seata的全局锁控制容易导致脏写,即全局事务修改数据后,还未提交,数据又被本地事务改掉了。这很容易发生数据出错的问题,而且十分有可能导致全局事务回滚时发现 数据已经dirty(与uodoLog中的beforeImage不同)。那么就会回滚失败,进而导致全局锁无法释放,后续的操作无法进行下去。也是比较严重的问题。

一种解决办法就是,针对所有相关操作都加上AT全局事务,但这显然是没必要的,因为全局事务意味者需要与seata-server进行通信,创建全局事务,注册分支事务,记录undoLog,判断锁冲突,注册锁。

那么对于不需要跨系统,跨库的的业务来说,使用GlobalTransactional实在是有点浪费了

那么更加轻量的GlobalLock就能够发挥作用了,其只需要判断本地的修改是否与全局锁冲突就够了

工作原理

加上@GlobalLock之后,会进入切面

io.seata.spring.annotation.GlobalTransactionalInterceptor#invoke

进而进入这个方法,处理全局锁

    Object handleGlobalLock(final MethodInvocation methodInvocation,
        final GlobalLock globalLockAnno) throws Throwable {
        return globalLockTemplate.execute(new GlobalLockExecutor() {
            @Override
            public Object execute() throws Throwable {
                return methodInvocation.proceed();
            }
            @Override
            public GlobalLockConfig getGlobalLockConfig() {
                GlobalLockConfig config = new GlobalLockConfig();
                config.setLockRetryInternal(globalLockAnno.lockRetryInternal());
                config.setLockRetryTimes(globalLockAnno.lockRetryTimes());
                return config;
            }
        });
    }

进入execute方法

public Object execute(GlobalLockExecutor executor) throws Throwable {
        boolean alreadyInGlobalLock = RootContext.requireGlobalLock();
        if (!alreadyInGlobalLock) {
            RootContext.bindGlobalLockFlag();
        }
        // set my config to config holder so that it can be access in further execution
        // for example, LockRetryController can access it with config holder
        GlobalLockConfig myConfig = executor.getGlobalLockConfig();
        GlobalLockConfig previousConfig = GlobalLockConfigHolder.setAndReturnPrevious(myConfig);
        try {
            return executor.execute();
        } finally {
            // only unbind when this is the root caller.
            // otherwise, the outer caller would lose global lock flag
            if (!alreadyInGlobalLock) {
                RootContext.unbindGlobalLockFlag();
            }
            // if previous config is not null, we need to set it back
            // so that the outer logic can still use their config
            if (previousConfig != null) {
                GlobalLockConfigHolder.setAndReturnPrevious(previousConfig);
            } else {
                GlobalLockConfigHolder.remove();
            }
        }
    }
}

先判断当前是否已经在globalLock范围之内,如果已经在范围之内,那么把上层的配置取出来,用新的配置替换,并在方法执行完毕时候,释放锁,或者将配置替换成之前的上层配置

如果开启全局锁,会在threadLocal put一个标记

    //just put something not null
CONTEXT_HOLDER.put(KEY_GLOBAL_LOCK_FLAG, VALUE_GLOBAL_LOCK_FLAG);

开始执行业务方法

那么加上相关GlobalLock标记的和普通方法的区别在哪里?

我们都知道,seata会对数据库连接做代理,在生成PreparedStatement时会进入

io.seata.rm.datasource.AbstractConnectionProxy#prepareStatement(java.lang.String)

  @Override
    public PreparedStatement prepareStatement(String sql) throws SQLException {
        String dbType = getDbType();
        // support oracle 10.2+
        PreparedStatement targetPreparedStatement = null;
        if (BranchType.AT == RootContext.getBranchType()) {
            List<SQLRecognizer> sqlRecognizers = SQLVisitorFactory.get(sql, dbType);
            if (sqlRecognizers != null && sqlRecognizers.size() == 1) {
                SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);
                if (sqlRecognizer != null && sqlRecognizer.getSQLType() == SQLType.INSERT) {
                    TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dbType).getTableMeta(getTargetConnection(),
                            sqlRecognizer.getTableName(), getDataSourceProxy().getResourceId());
                    String[] pkNameArray = new String[tableMeta.getPrimaryKeyOnlyName().size()];
                    tableMeta.getPrimaryKeyOnlyName().toArray(pkNameArray);
                    targetPreparedStatement = getTargetConnection().prepareStatement(sql,pkNameArray);
                }
            }
        }
        if (targetPreparedStatement == null) {
            targetPreparedStatement = getTargetConnection().prepareStatement(sql);
        }
        return new PreparedStatementProxy(this, targetPreparedStatement, sql);
    }

这里显然不会进入AT模式的逻辑,那么直接通过真正的数据库连接,生成PreparedStatement,再使用PreparedStatementProxy进行包装,代理增强

在使用PreparedStatementProxy执行sql时,会进入seata定义的一些逻辑

 public boolean execute() throws SQLException {
        return ExecuteTemplate.execute(this, (statement, args) -> statement.execute());
    }

最终来到

io.seata.rm.datasource.exec.ExecuteTemplate#execute(java.util.List<io.seata.sqlparser.SQLRecognizer>, io.seata.rm.datasource.StatementProxy, io.seata.rm.datasource.exec.StatementCallback<T,S>, java.lang.Object…)

   public static <T, S extends Statement> T execute(List<SQLRecognizer> sqlRecognizers,
                                                     StatementProxy<S> statementProxy,
                                                     StatementCallback<T, S> statementCallback,
                                                     Object... args) throws SQLException {
        if (!RootContext.requireGlobalLock() && BranchType.AT != RootContext.getBranchType()) {
            // Just work as original statement
            return statementCallback.execute(statementProxy.getTargetStatement(), args);
        }
        String dbType = statementProxy.getConnectionProxy().getDbType();
        if (CollectionUtils.isEmpty(sqlRecognizers)) {
            sqlRecognizers = SQLVisitorFactory.get(
                    statementProxy.getTargetSQL(),
                    dbType);
        }
        Executor<T> executor;
        if (CollectionUtils.isEmpty(sqlRecognizers)) {
            executor = new PlainExecutor<>(statementProxy, statementCallback);
        } else {
            if (sqlRecognizers.size() == 1) {
                SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);
                switch (sqlRecognizer.getSQLType()) {
                    case INSERT:
                        executor = EnhancedServiceLoader.load(InsertExecutor.class, dbType,
                                new Class[]{StatementProxy.class, StatementCallback.class, SQLRecognizer.class},
                                new Object[]{statementProxy, statementCallback, sqlRecognizer});
                        break;
                    case UPDATE:
                        executor = new UpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                        break;
                    case DELETE:
                        executor = new DeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                        break;
                    case SELECT_FOR_UPDATE:
                        executor = new SelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                        break;
                    default:
                        executor = new PlainExecutor<>(statementProxy, statementCallback);
                        break;
                }
            } else {
                executor = new MultiExecutor<>(statementProxy, statementCallback, sqlRecognizers);
            }
        }
        T rs;
        try {
            rs = executor.execute(args);
        } catch (Throwable ex) {
            if (!(ex instanceof SQLException)) {
                // Turn other exception into SQLException
                ex = new SQLException(ex);
            }
            throw (SQLException) ex;
        }
        return rs;
    }

如果当前线程不需要锁并且不不在AT模式的分支事务下,直接使用原生的preparedStatement执行就好了

这里四种操作,通过不同的接口去执行,接口又有多种不同的数据库类型实现

插入分为不同的数据库类型,通过spi获取

seata提供了三种数据库的实现,

update,delete,select三种没有多个实现类

他们在执行时都会执行父类的方法

io.seata.rm.datasource.exec.AbstractDMLBaseExecutor#executeAutoCommitTrue

   protected T executeAutoCommitTrue(Object[] args) throws Throwable {
        ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
        try {
            connectionProxy.changeAutoCommit();
            return new LockRetryPolicy(connectionProxy).execute(() -> {
                T result = executeAutoCommitFalse(args);
                connectionProxy.commit();
                return result;
            });
        } catch (Exception e) {
            // when exception occur in finally,this exception will lost, so just print it here
            LOGGER.error("execute executeAutoCommitTrue error:{}", e.getMessage(), e);
            if (!LockRetryPolicy.isLockRetryPolicyBranchRollbackOnConflict()) {
                connectionProxy.getTargetConnection().rollback();
            }
            throw e;
        } finally {
            connectionProxy.getContext().reset();
            connectionProxy.setAutoCommit(true);
        }
    }

全局锁的策略, 是在一个while(true)循环里不断执行

 protected <T> T doRetryOnLockConflict(Callable<T> callable) throws Exception {
            LockRetryController lockRetryController = new LockRetryController();
            while (true) {
                try {
                    return callable.call();
                } catch (LockConflictException lockConflict) {
                    onException(lockConflict);
                    lockRetryController.sleep(lockConflict);
                } catch (Exception e) {
                    onException(e);
                    throw e;
                }
            }
        }

如果出现异常是LockConflictException,进入sleep

public void sleep(Exception e) throws LockWaitTimeoutException {
        if (--lockRetryTimes < 0) {
            throw new LockWaitTimeoutException("Global lock wait timeout", e);
        }
        try {
            Thread.sleep(lockRetryInternal);
        } catch (InterruptedException ignore) {
        }
    }

这两个变量就是@GlobalLock注解的两个配置,一个是重试次数,一个重试之间的间隔时间。

继续就是执行数据库更新操作

io.seata.rm.datasource.exec.AbstractDMLBaseExecutor#executeAutoCommitFalse

发现这里也会生成,undoLog,beforeImage和afterImage,其实想想,在GlobalLock下,是没必要生成undoLog的。但是现有逻辑确实要生成,这个seata后续应该会优化。

protected T executeAutoCommitFalse(Object[] args) throws Exception {
        if (!JdbcConstants.MYSQL.equalsIgnoreCase(getDbType()) && isMultiPk()) {
            throw new NotSupportYetException("multi pk only support mysql!");
        }
        TableRecords beforeImage = beforeImage();
        T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
        TableRecords afterImage = afterImage(beforeImage);
        prepareUndoLog(beforeImage, afterImage);
        return result;
    }

生成beforeImage和aferImage的逻辑也比较简单。分别在执行更新前,查询数据库,和更新后查询数据库

可见记录undoLog是十分影响性能的,查询就多了两次,如果undoLog入库还要再多一次入库操作。

再看prepareUndoLog

 protected void prepareUndoLog(TableRecords beforeImage, TableRecords afterImage) throws SQLException {
        if (beforeImage.getRows().isEmpty() && afterImage.getRows().isEmpty()) {
            return;
        }
        if (SQLType.UPDATE == sqlRecognizer.getSQLType()) {
            if (beforeImage.getRows().size() != afterImage.getRows().size()) {
                throw new ShouldNeverHappenException("Before image size is not equaled to after image size, probably because you updated the primary keys.");
            }
        }
        ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
        TableRecords lockKeyRecords = sqlRecognizer.getSQLType() == SQLType.DELETE ? beforeImage : afterImage;
        String lockKeys = buildLockKey(lockKeyRecords);
        if (null != lockKeys) {
            connectionProxy.appendLockKey(lockKeys);

            SQLUndoLog sqlUndoLog = buildUndoItem(beforeImage, afterImage);
            connectionProxy.appendUndoLog(sqlUndoLog);
        }
    }

将lockKeys,和undoLog,暂时记录在connectionProxy中,也就是说至此还没有将uodoLog记录到数据库,也没有判断全局锁,这些事情都留到了事务提交

io.seata.rm.datasource.ConnectionProxy#doCommit

 private void doCommit() throws SQLException {
        if (context.inGlobalTransaction()) {
            processGlobalTransactionCommit();
        } else if (context.isGlobalLockRequire()) {
            processLocalCommitWithGlobalLocks();
        } else {
            targetConnection.commit();
        }
    }

进入io.seata.rm.datasource.ConnectionProxy#processLocalCommitWithGlobalLocks

这个 方法很简单就是首先进行锁的检查,并没有我想象中的加索全局事务。

 private void processLocalCommitWithGlobalLocks() throws SQLException {
        checkLock(context.buildLockKeys());
        try {
            targetConnection.commit();
        } catch (Throwable ex) {
            throw new SQLException(ex);
        }
        context.reset();
    }

也就是说,使用GlobalLock会对全局锁检测,但是并不会对记录加全局锁。但是配合全局事务这样已经能够保证全局事务的原子性了。可见GlobalLock还是要和本地事务组合一起使用的,这样才能保证,GlobalLock执行完毕本地事务未提交的数据不会被别的本地事务/分布式事务修改掉。

到此这篇关于Java @GlobalLock注解详细分析讲解的文章就介绍到这了,更多相关Java @GlobalLock内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Java中注解与原理分析详解

    目录 一.注解基础 二.注解原理 三.常用注解 1.JDK注解 2.Lombok注解 四.自定义注解 1.同步控制 2.类型引擎 一.注解基础 注解即标注与解析,在Java的代码工程中,注解的使用几乎是无处不在,甚至多到被忽视: 无论是在JDK源码或者框架组件,都在使用注解能力完成各种识别和解析动作:在对系统功能封装时,也会依赖注解能力简化各种逻辑的重复实现: 基础接口 在Annotation的源码注释中有说明:所有的注解类型都需要继承该公共接口,本质上看注解是接口,但是代码并没有显式声明继承关

  • Java 栈与队列超详细分析讲解

    目录 一.栈(Stack) 1.什么是栈? 2.栈的常见方法 3.自己实现一个栈(底层用一个数组实现) 二.队列(Queue) 1.什么是队列? 2.队列的常见方法 3.队列的实现(单链表实现) 4.循环队列 一.栈(Stack) 1.什么是栈? 栈其实就是一种数据结构 - 先进后出(先入栈的数据后出来,最先入栈的数据会被压入栈底) 什么是java虚拟机栈? java虚拟机栈只是JVM当中的一块内存,该内存一般用来存放 例如:局部变量当调用函数时,我们会为函数开辟一块内存,叫做 栈帧,在 jav

  • Java详细分析讲解自动装箱自动拆箱与Integer缓存的使用

    目录 1. 前言 2. 包装类 3. 自动装箱与自动拆箱 4. Interger缓存 5. 回答题目 1. 前言 自动装箱和自动拆箱是什么?Integer缓存是什么?它们之间有什么关系? 先来看一道题目. Integer a = new Integer(1); Integer b = new Integer(1); System.out.println(a==b); Integer c = 1; Integer d = 1; System.out.println(c==d); Integer e

  • Java详细分析讲解泛型

    目录 1.泛型概念 2.泛型的使用 2.1泛型类语法 2.2泛型方法语法 2.3泛型接口语法 2.4泛型在main方法中的使用 3.擦除机制 4.泛型的上界 5.通配符 5.1通配符的上界 5.2通配符的下界 6.包装类 6.1装箱和拆箱 1.泛型概念 泛型就是将类型参数化 所谓类型参数化就是将类型定义成参数的形式,然后在使用此类型的时候的时候再传入具体的类型 到这我们可以看出来:泛型在定义的时候是不知道具体类型的,需要在使用的时候传入具体的类型,泛型可以用在类.接口和方法中,这样做的好处是一个

  • Java超详细分析讲解哈希表

    目录 哈希表概念 哈希函数的构造 平均数取中法 折叠法 保留余数法 哈希冲突问题以及解决方法 开放地址法 再哈希函数法 公共溢出区法 链式地址法 哈希表的填充因子 代码实现 哈希函数 添加数据 删除数据 判断哈希表是否为空 遍历哈希表 获得哈希表已存键值对个数 哈希表概念 散列表,又称为哈希表(Hash table),采用散列技术将记录存储在一块连续的存储空间中. 在散列表中,我们通过某个函数f,使得存储位置 = f(关键字),这样我们可以不需要比较关键字就可获得需要的记录的存储位置. 散列技术

  • Java详细分析讲解HashMap

    目录 1.HashMap数据结构 2.HashMap特点 3.HashMap中put方法流程 java集合容器类分为Collection和Map两大类,Collection类的子接口有Set.List.Queue,Map类子接口有SortedMap.如ArrayList.HashMap的继承实现关系分别如下 1.HashMap数据结构 在jdk1.8中,底层数据结构是“数组+链表+红黑树”.HashMap其实底层实现还是数组,只是数组的每一项都是一条链,如下 当链表过长时,会严重影响HashMa

  • Java超详细分析讲解final关键字的用法

    目录 基本介绍 final细节01 final细节02 基本介绍 final 可以修饰类.属性.方法和局部变量. 在某些情况下,程序员可能有以下需求,就会使用到final: Base Sub 类 1)当不希望类被继承时,可以用final修饰. 2)当不希望父类的某个方法被子类覆盖/重写(override)时,可以用final关键字 修饰.[案例演示:访问修饰符 final 返回类型方法名] 3)当不希望类的的某个属性的值被修改,可以用final修饰.[案例演示: public final dou

  • Java泛型与注解全面分析讲解

    目录 1.什么是泛型 2.为何使用泛型 2.1.如何定义泛型 2.2.通配符 2.3.受限泛型 2.4.泛型接口 2.5.泛型方法 3.java高级--注解 3.1.预定义注解 3.2.自定义注解(初级) 3.3.元注解 3.4.自定义注解(高级) 1.什么是泛型 其实我们在使用集合时就用过泛型List<T> 创建一个List对象List<Student> list=new ArrayList():<T>它就是泛型. 所谓的泛型就是在类定义时,不为类中属性和方法指定数据

  • Java数据结构超详细分析二叉搜索树

    目录 1.搜索树的概念 2.二叉搜索树的简单实现 2.1查找 2.2插入 2.3删除 2.4修改 3.二叉搜索树的性能 1.搜索树的概念 二叉搜索树是一种特殊的二叉树,又称二叉查找树,二叉排序树,它有几个特点: 如果左子树存在,则左子树每个结点的值均小于根结点的值. 如果右子树存在,则右子树每个结点的值均大于根结点的值. 中序遍历二叉搜索树,得到的序列是依次递增的. 二叉搜索树的左右子树均为二叉搜索树. 二叉搜索树的结点的值不能发生重复. 2.二叉搜索树的简单实现 我们来简单实现以下搜索树,就不

  • Java 数组高频考点分析讲解

    目录 1.数组理论基础 2.常见考点 1.二分查找 2.移除元素 1.数组理论基础 数组是存放在连续内存空间上的相同类型数据的集合,可以通过下标索引的方式获取到下标下对应的数据. 举个栗子(字符数组)~ 可以看到: 1.数组的下标从0开始 2.数组在内存中的地址是连续的 所以在删除元素时,只能用覆盖的方式进行. 例如,要删除下标为2的元素~ 就需要将从2之后的元素依次移到前一个,覆盖掉要删除的元素. 所以删除元素并不是将该元素的空间释放了,而是将后面的元素移到前面,覆盖掉要删除的元素,然后将数组

  • Java 泛型超详细入门讲解

    目录 1.什么是泛型? 2.泛型是怎么编译的 泛型的编译机制:擦除机制 1.什么是泛型? 泛型其实就是将类型作为参数传递,泛型允许程序员在编写代码时使用一些以后才指定的类型 ,在实例化该类时将想要的类型作为参数传递,来指明这些类型. 为什么要引入泛型? 例如:自己实现一个顺序表 public class MyArrayList { public int[] elem; public int usedSize; public MyArrayList() { this.elem = new int[

随机推荐

其他