当前位置: 代码网 > it编程>数据库>Redis > 1 分布式锁(分别通过数据库、Redis、Zookeeper三种方式实现)

1 分布式锁(分别通过数据库、Redis、Zookeeper三种方式实现)

2024年08月01日 Redis 我要评论
分布式锁(分别通过数据库、Redis、Zookeeper三种方式实现)

1、什么是锁

场景描述

        锁在java中是一个非常重要的概念,尤其是在当今的互联网时代,高并发的场景,更是离不开锁。那么锁到底是什么呢?在计算机科学中,锁(lock)或互斥(mutex)是一种同步机制,用于在有许多执行线程的环境中强制对资源的访问限制。锁旨在强制实施互斥排他、并发控制策略。咱们举一个生活中的例子:大家都去过超市买东西,如果你随身带了包呢,要放到储物柜里。咱们把这个例子再极端一下,假如柜子只有一个,现在同时来了3个人a,b,c都要往这个柜子里放东西。这个场景就构造了一个多线程,多线程自然离不开锁。如下图所示:

        a,b,c都要往柜子里放东西,可是柜子只能放一件东西,那怎么办呢?这个时候就引出了锁的概念,3个人谁抢到了柜子的锁,谁就可以使用这个柜子,其他人只能等待。比如:c抢到了锁,c可以使用这个柜子。a和b只能等待,等c使用完了,释放锁以后,a和b再争抢锁,谁抢到了,再继续使用柜子。

代码实例

        我们再将上面的场景反映到程序中,首先创建一个柜子的类:

public class cabinet {
    //柜子中存储的数字
    private int storenumber;
    
    public void setstorenumber(int storenumber){
        this.storenumber = storenumber;
    }
    
    public int getstorenumber(){
        return this.storenumber;
    }
    
}

        柜子中存储的是数字。

        然后我们将3个用户抽象成一个类:

public class user {
    //柜子
    private cabinet cabinet;
    //存储的数字
    private int storenumber;
    
    public user(cabinet cabinet,int storenumber){
        this.cabinet = cabinet;
        this.storenumber = storenumber;
    }
    
    //使用柜子
    public void usecabinet(){
        cabinet.setstorenumber(storenumber);
    }
    
}

        在用户的构造方法中,需要传入两个参数,一个是要使用的柜子,另一个是要存储的数字。到这里,柜子和用户都已经抽象成了类,接下来我们再写一个启动类,模拟一下3个用户使用柜子的场景:

public class starter {
    public static void main(string[] args){
        cabinet cabinet = new cabinet();
        executorservice es = executors.newfixedthreadpool(3);
        for (int i = 0; i < 3; i++){
            final int  storenumber = i;
            es.execute(()->{
                user user = new user(cabinet,storenumber);
                user.usecabinet();
                system.out.println("我是用户"+storenumber+",我存储的数字是:"+cabinet.getstorenumber());
            });
        }
        es.shutdown();
    }
}

        我们仔细看一下这个main函数的过程:

  • 首先创建一个柜子的实例,由于场景中只有一个柜子,所以我们只创建了一个柜子实例。
  • 然后我们新建了一个线程池,线程池中有3个线程,每个线程执行一个用户的操作。
  • 再来看看每个线程具体的执行过程,新建用户实例,传入的是用户使用的柜子,我们这里只有一个柜子,所以传入这个柜子的实例,然后传入这个用户要存储的数字,分别是1,2,3,也分别对应着用户a,用户b和用户c。
  • 再调用使用柜子的操作,也就是向柜子中放入要存储的数字,然后立即从柜子中取出数字并打印出来。

        我们运行一下main函数,看看打印的结果是什么?

我是用户0,我存储的数字是:2
我是用户2,我存储的数字是:2
我是用户1,我存储的数字是:2

        从结果中我们可以看出,3个用户在柜子中存储的数字都变成了2。我们再次运行程序,结果如下:

我是用户1,我存储的数字是:1
我是用户2,我存储的数字是:1
我是用户0,我存储的数字是:1

        这次又变成了1。这是为什么呢?问题就出在user.usecabinet()这个方法上,这是因为柜子这个实例没有加锁的原因,3个用户并行的执行,向柜子中存储他们的数字,虽然是3个用户并行的同时操作,但是在具体赋值时,也是有顺序的,因为变量storenumber只占有一块内存,storenumber只存储一个值,存储最后的线程所设置的值。至于哪个线程排在最后,则完全不确定。赋值语句完成后,进入到打印语句,打印语句取storenumber的值并打印,这时storenumber存储的是最后一个线程所设置的值,3个线程取到的值是相同的,就像上面打印的结果一样。

        那么如何解决这个文体呢?这就引出了我们要讲解的重点内容——锁。我们在赋值语句上加锁,这样当多个线程(本文当中的多个用户)同时赋值时,谁抢到了这把锁,谁才能赋值。这样保证同一时刻只能有一个线程进行赋值操作,避免了之前的混乱的情况。

        那个在程序中如何加锁呢?这就要使用java中的一个关键字——synchronized。synchronized分为synchronized方法和synchronized同步代码块。下面我们看一下两者的具体用法:

  • synchronized方法,顾名思义,是吧synchronized关键字写在方法上,它表示这个方法是加了所的,当多个线程同时调用这个方法时,只有获得锁的线程才可以执行。我们看一下下面的例子:
    public synchronized string getticket(){
        return "xxx";
    }

        我们可以看到getticket()方法加了锁,当多个线程并发执行的时候,只有获得锁的线程才可以执行,其他线程只能等待。

  • 我们再来看看synchronized块,synchronized块语法是:
synchronized (对象锁){
    ……
}

        我们将需要加锁的语句都写在synchronized块内,而在对象锁的位置,需要填写锁的对象,他的含义是,当多个线程并发执行时,只有获得你写的这个对象的锁,才能执行后面的语句,其他的线程只能等待。synchronized块通常的写法是synchronized(this),这个this是当前类的实例,也就是说获得当前这个类的对象的锁,才能执行这个方法,这样写的效果和synchronized方法时一样的。

        再回到我们的示例当中,如何解决storenumber混乱的问题呢?咱们可以在设置storenumber的方法上加锁,这样保证同时只有一个线程能调用这个方法。如下所示:

public class cabinet {


    //柜子中存储的数字
    private int storenumber;

    public synchronized void  setstorenumber(int storenumber){
        this.storenumber = storenumber;
    }

    public int getstorenumber(){
        return this.storenumber;
    }
}

        我们在set方法上加了synchronized关键字,这样在存储数字时,就不会并行的执行了,而是哪个用户抢到锁,哪个用户执行存储数字的方法。我们再运行一下main函数,看看运行的结果:

我是用户1,我存储的数字是:1
我是用户2,我存储的数字是:1
我是用户0,我存储的数字是:1

        咦?结果还是混乱的,为什么?我们再检查一下代码:

es.execute(()->{
    user user = new user(cabinet,storenumber);
    user.usecabinet();
    system.out.println("我是用户"+storenumber+",我存储的数字是:"+cabinet.getstorenumber());
});

        我们可以看到在usecabinet和打印的方法是两个语句,并没有保持原子性,虽然在set方法上加了锁,但是在打印时又存在了一个并发,打印语句是有锁的,但是不能确定哪个线程去执行。所以这里,我们要保证usecabinet和打印的方法的原子性,我们使用synchronized块,但是synchronized块里的对象我们使用谁的?这又是一个问题,user还是cabinet?当然是cabinet,因为每个每个线程都初始化了user,总共有3个user对象了,而cabinet对象只有一个,所以synchronized要用cabinet对象。如下:

synchronized (cabinet){
    user.usecabinet();
    system.out.println("我是用户"+storenumber+",我存储的数字是:"+cabinet.getstorenumber());
}

        我们再去运行一下:

我是用户1,我存储的数字是:1
我是用户2,我存储的数字是:2
我是用户0,我存储的数字是:0

        由于我们加了synchronized块,保证了存储和取出的原子性,这样用户存储的数字和取出的数字就对应上了,不会造成混乱。

        最后我们通过一张图说明一下上面的整体情况:

         如上图所示,线程a,线程b,线程c同时调用cabinet类的setstorenumber方法,线程b获得了锁,所以线程b可以执行setstorenumber的方法,线程a和线程c只能等待。

2、java中单体应用锁的局限性&分布式锁

        前面内容中讲到的锁都是由jdk官方提供的锁的解决方案,也就是说这些锁只能在一个jvm进程内有效,我们把这种锁叫做单体应用锁。但是,在互联网告诉发展的今天,单体应用锁能够满足我们的需求吗?

互联网系统架构的演进

        在互联网系统发展之初,系统比较简单,消耗资源小,用户访问量也比较少,我们只部署一个tomcat应用就可以满足需求。系统架构图如下:

         一个tomcat可以看做是一个jvm进程,当大量的请求并发到达系统时,所有的请求都落在这唯一的一个tomcat上,如果某些请求方法是需要加锁的,比如:秒杀扣减库存,是可以满足需求的,这和我们前面章节所讲的内容是一样的。但是随着访问量的增加,导致一个tomcat难以支撑,这时我们就要集群部署tomcat,使用多个tomcat共同支撑整个系统。系统架构图如下:

         上图中,我们部署了两个tomcat,共同支撑系统。当一个请求到达系统时,首先会经过nginx,nginx主要是做负载转发的,它会根据自己配置的负载均衡策略将请求转发到其中一个tomcat中。当大量的请求并发访问时,两个tomcat共同承担所有的访问量,这时,我们同样在秒杀扣减库存的场景中,使用单体应用锁还能满足要求吗?

单体应用锁的局限性

        如上图所示,在整个系统架构中,存在两个tomcat,每个tomcat是一个jvm。在进行秒杀业务的时候,由于大家都在抢购秒杀商品,大量的请求同时到达系统,通过nginx分发到两个tomcat上。我们通过一个极端的案例场景,可以更好地理解单体应用的局限性。假如,秒杀商品的数量只有1个,这时,这些大量的请求当中,只有一个请求可以成功的抢到这个商品,这就需要在扣减库存的方法上加锁,扣减库存的动作只能一个一个去执行,而不能同时去执行,如果同时执行,这1个商品可能同时被多个人抢到,从而产生超卖现象。加锁之后,扣减库存的动作一个一个去执行,凡是将库存扣减为负数的,都抛出异常,提示该用户没有抢到商品。通过加锁看似解决了秒杀的问题,但是事实真的是这样吗?

        我们看到系统中存在两个tomcat,我们加的锁是jdk官方提供的锁,这种锁只能在一个jvm下起作用,也就是在一个tomcat内是没问题的。当存在两个或两个以上的tomcat时,大量的并发请求分散到不同的tomcat上,在每一个tomcat中都可以防止并发的产生,但是在多个tomcat之间,每个tomcat中获得的这个请求,又产生了并发,从而产生超卖现象。这也就是单体应用锁的局限性,它只能在一个jvm内加锁,而不能从这个应用层面去加锁。

        那么这个问题如何解决呢?这就需要使用分布式锁了,在整个应用层面去加锁。什么是分布式锁呢?我们怎么去使用分布式锁呢?

什么是分布式锁

        在说分布式锁之前,我们看一下单体应用锁的特点,单体应用锁是在一个jvm进程内有效,无法跨jvm、跨进程。那么分布式锁的定义就出来了,分布式锁就是可以跨越多个jvm、跨越多个进程的锁,这种锁就叫做分布式锁。

分布式锁的设计思路

         在上图中,由于tomcat是由java启动的,所以每个tomcat可以看出一个jvm,jvm内部的锁是无法跨越多个进程的。所以,我们要实现分布式锁,我们只能在这些jvm之外去寻找,通过其他的组件来实现分布式锁。系统架构如下图所示:

         两个tomcat通过第三方的组件实现跨jvm、跨进程的分布式锁。这就是分布式锁的解决思路,找到所有jvm可以共同访问的第三方组件,通过第三方组件实现分布式锁。

目前存在的分布式的解决方案

        分布式锁都是通过第三方组件来实现的,目前比较流行的分布式锁的解决方案有:

  • 数据库,通过数据库可以实现分布式分布式锁,但是在高并发的情况下对数据库压力比较大,所以很少使用。
  • redis,借助redis也可以实现分布式锁,而且redis的java客户端种类很多,使用的方法也不尽相同。
  • zookeeper,zookeeper也可以实现分布式锁,同样zookeeper也存在多个java客户端,使用方法也不相同。

3、java中锁的解决方案

乐观锁与悲观锁

        乐观锁与悲观锁应该是每个开发人员最先接触的两种锁。小编最早接触的就是这两种锁,但是不是在java中接触的,而是在数据库当中。当时的应用场景主要是在更新数据的时候,更新数据这个场景也是使用锁的非常主要的场景之一。更新数据的主要流程如下:

  1. 检索出要更新的数据,供操作人员查看;
  2. 操作人员更改需要修改的数据;
  3. 点击保存,更新数据。

        这个流程看似简单,但是我们用多线程的思维去思考,这也应该算是一种互联网思维吧,就会发现其中隐藏着问题。我们具体看一下:

  1. a检测出数据;
  2. b检测出数据;
  3. b修改了数据;
  4. a修改数据,系统会修改成功吗?

        当然啦,a修改成功与否,要看程序怎么写。咱们抛开程序,从常理考虑,a保存数据的时候,系统要给提示,说“您修改的数据已被其他人修改过,请重新查询确认”。那么我们程序中要怎么实现呢?

  1. 在检索数据时,将数据的版本号(version)或者最后更新时间一并检索出来;
  2. 操作员更改数据以后,点击保存,在书库执行update操作;
  3. 执行update操作时,用步骤1检索出来的版本号或者最后的更新时间与数据库中的记录作比较;
  4. 如果版本号或者最后更新时间一致,则可以更新;
  5. 如果不一致,就要给出上面的提示;

        上述的流程就是乐观锁的实现方式。在java中乐观锁并没有确定的方法,或者关键字,他只是一个处理流程、策略。咱们看懂上面的例子之后,再来看看java中乐观锁。

        乐观锁,它是假设一个线程在获取数据的时候不会被其他线程更改数据,就像上面的例子那样,但是在更新数据的时候会校验数据有没有被修改过。它是一种比较交换的机制,简称cas(compare and swap)机制。一旦检测到有冲突产生,也就是上面说到的版本号或者最后更新时间不一致,它是会进行重试,直到没有冲突为止。

乐观锁的机制如图所示:

         咱们看一下java中最常见的i++,咱们思考一个问题,i++它的执行顺序是什么样子的?它是线程安全的吗?当多个线程并发执行i++的时候,会不会有问题?接下来咱们通过程序看一下:

public class test {
	
    private int i=0;
    public static void main(string[] args) {
        test test = new test();
        //线程池:50个线程
        executorservice es = executors.newfixedthreadpool(50);
        //闭锁
        countdownlatch cdl = new countdownlatch(5000);
        for (int i = 0;i < 5000; i++){
            es.execute(()->{
                test.i++;
                cdl.countdown();
            });
        }
        es.shutdown();
        try {
            //等待5000个任务执行完成后,打印出执行结果
            cdl.await();
            system.out.println("执行完成后,i="+test.i);
        } catch (interruptedexception e) {
            e.printstacktrace();
        }
    }
}

        上面的程序中,我们模拟了50个线程同时执行i++,总共执行5000次,按照常规的理解,得到的结果应该是5000,我们运行一下程序,看看之心结果如何:

执行完成后,i=4975
执行完成后,i=4986
执行完成后,i=4971

        这时我们运行3次以后得到的结果,可以看到每次执行的结果都不一样,而且不是5000,这是为什么?这就说明i++并不是一个原子性的操作,在多线程的情况下并不安全。我们把i++的详细执行步骤拆解一下:

  1. 从内存中取出i的当前值;
  2. 将i的值加1;
  3. 将计算好的值放入到内存当中;

        这个流程和我们上面讲解的数据库的操作流程是一样的。在多线程的场景下,我们可以想象一下,线程a和线程b同时从内存中取出i的值,假如i的值是1000,然后线程a和线程b再同时执行+1的操作,然后把值再放入内存当中,这时,内存中的值是1001,而我们期望的值是1002,正是这个原因,导致了上面的错误。那么我们如何解决呢?在java1.5以后,jdk官方提供了大量的原子类,这些类的内部都是基于cas机制的,也就是使用了乐观锁。我们将上面的程序稍微改造一下,如下:

public class test {

    private atomicinteger i = new atomicinteger(0);
    public static void main(string[] args) {
        test test = new test();
        executorservice es = executors.newfixedthreadpool(50);
        countdownlatch cdl = new countdownlatch(5000);
        for (int i = 0;i < 5000; i++){
            es.execute(()->{
                test.i.incrementandget();
                cdl.countdown();
            });
        }
        es.shutdown();
        try {
            cdl.await();
            system.out.println("执行完成后,i="+test.i);
        } catch (interruptedexception e) {
            e.printstacktrace();
        }
    }
}

        我们将变量i的类型改为atomicinteger,atomicinteger是一个原子类。我们在之前调用i++的地方改成了i.incrementandget(),incrementandget()方法采用了cas机制,也就是说使用了乐观锁。我们再运行一下程序,看看结果如何。

执行完成后,i=5000
执行完成后,i=5000
执行完成后,i=5000

        我们同样执行了3次,3次的结果都是5000,符合了我们预期。这个就是乐观锁。我们对乐观锁稍加总结,乐观锁在读取数据的时不做任何限制,而是在更新数据的时候,进行数据的比较,保证数据的版本一致时再更新数据。根据他的这个特点,可以看出乐观锁适用于读操作多,而写操作少的场景。

        悲观锁与乐观锁恰恰相反,悲观锁从读取数据的时候就显式的加锁,直到数据更新完成,释放锁为止。在这期间只能有一个线程去操作,其他的线程只能等待。在java中,悲观锁可以使用synchronized关键字或者reentrantlock类来实现。还是上面的例子,我们分别使用这两种方式来实现一下。首先是使用synchronized关键字来实现:

public class test {

    private int i=0;
    public static void main(string[] args) {
        test test = new test();
        executorservice es = executors.newfixedthreadpool(50);
        countdownlatch cdl = new countdownlatch(5000);
        for (int i = 0;i < 5000; i++){
            es.execute(()->{
                //修改部分  开始
                synchronized (test){
                    test.i++;
                }
                //修改部分  结束
                cdl.countdown();
            });
        }
        es.shutdown();
        try {
            cdl.await();
            system.out.println("执行完成后,i="+test.i);
        } catch (interruptedexception e) {
            e.printstacktrace();
        }
    }
}

        我们唯一的改动就是增加了synchronized块,它锁住的对象是test,在所有线程中,谁获得了test对象的锁,谁才能执行i++操作。我们使用了synchronized悲观锁的方式,使得i++线程安全。我们运行一下,看看结果如何。

执行完成后,i=5000
执行完成后,i=5000
执行完成后,i=5000

        我们运行3次,结构都是5000,符合预期。接下来,我们再使用reentrantlock类来实现悲观锁。

public class test {
    //添加了reentrantlock锁
    lock lock = new reentrantlock();
    private int i=0;
    public static void main(string[] args) {
        test test = new test();
        executorservice es = executors.newfixedthreadpool(50);
        countdownlatch cdl = new countdownlatch(5000);
        for (int i = 0;i < 5000; i++){
            es.execute(()->{
                //修改部分  开始
                test.lock.lock();
                test.i++;
                test.lock.unlock();
                //修改部分  结束
                cdl.countdown();
            });
        }
        es.shutdown();
        try {
            cdl.await();
            system.out.println("执行完成后,i="+test.i);
        } catch (interruptedexception e) {
            e.printstacktrace();
        }
    }
}

        我们在类中显式的增加了lock lock = new reentrantlock();,而且在i++之前增加了lock.lock(),加锁操作,在i++之后增加了lock.unlock()释放锁的操作。我们同样运行3次,看看结果。

执行完成后,i=5000
执行完成后,i=5000
执行完成后,i=5000

        3次运行结果都是5000,完全符合预期。我们再来总结一下悲观锁,悲观锁从读取数据的时候就加了锁,而且在更新数据的时候,保证只有一个线程在执行更新操作,没有像乐观锁那样进行数据版本的比较。所以悲观锁适用于读相对少,写相对多的操作。

公平锁与非公平锁

        从名字不难看出,公平锁在多线程情况下,对待每个线程都是公平的;而非公平锁恰好与之相反。从字面上理解还是有些晦涩难懂,我们还是举例说明,场景还是去超市买东西,在储物柜存东西的例子。储物柜只有一个,同时来了3个人使用储物柜,这时a先抢到了柜子,a去使用,b和c自觉进行排队。a使用完以后,后面排队的第一个人将继续使用柜子,这就是公平锁。在公平锁当中,所有的线程都自觉排队,一个线程执行完以后,排在后面的线程继续使用。

        非公平锁则不然,a在使用柜子的时候,b和c并不会排队,a使用完以后,将柜子的钥匙往后一抛,b和c谁抢到了谁用,甚至可能突然跑来一个d,这个d抢到了钥匙,那么d将使用柜子,这个就是非公平锁。

公平锁如图所示:

         多个线程同时执行方法,线程a抢到了锁,a可以执行方法。其他线程则在队列里进行排队,a执行完方法后,会从队列里取出下一个线程b,再去执行方法。以此类推,对于每一个线程来说都是公平的,不会存在后加入的线程先执行的情况。

非公平锁如下图所示:

         多个线程同时执行方法,线程a抢到了锁,a可以执行方法。其他的线程并没有排队,a执行完方法,释放锁后,其他的线程谁抢到了锁,谁去执行方法。会存在后加入的线程,反而先抢到锁的情况。

        公平锁与非公平锁都在reentrantlock类里给出了实现,我们来看一下reentrantlock的源码:

    /**
     * creates an instance of {@code reentrantlock}.
     * this is equivalent to using {@code reentrantlock(false)}.
     */
    public reentrantlock() {
        sync = new nonfairsync();
    }

    /**
     * creates an instance of {@code reentrantlock} with the
     * given fairness policy.
     *
     * @param fair {@code true} if this lock should use a fair ordering policy
     */
    public reentrantlock(boolean fair) {
        sync = fair ? new fairsync() : new nonfairsync();
    }

        reentrantlock有两个构造方法,默认的构造方法中,sync = new nonfairsync();我们可以从字面意思看出它是一个非公平锁。再看看第二个构造方法,它需要传入一个参数,参数是一个布尔型,true是公平锁,false是非公平锁。从字面的源代码我们可以看出sync有两个实现类,分别是fairsync和nonfairsync,我们再看看获取锁的核心方法,收拾公平锁fairsync的,

@reservedstackaccess
protected final boolean tryacquire(int acquires) {
    final thread current = thread.currentthread();
    int c = getstate();
    if (c == 0) {
        if (!hasqueuedpredecessors() &&
            compareandsetstate(0, acquires)) {
            setexclusiveownerthread(current);
            return true;
        }
    }
    else if (current == getexclusiveownerthread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new error("maximum lock count exceeded");
        setstate(nextc);
        return true;
    }
    return false;
}

然后是非公平锁nonfairsync的,

@reservedstackaccess
final boolean nonfairtryacquire(int acquires) {
    final thread current = thread.currentthread();
    int c = getstate();
    if (c == 0) {
        if (compareandsetstate(0, acquires)) {
            setexclusiveownerthread(current);
            return true;
        }
    }
    else if (current == getexclusiveownerthread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new error("maximum lock count exceeded");
        setstate(nextc);
        return true;
    }
    return false;
}

        通过对比两个方法,我们可以看出唯一不同之处在于!hasqueuedpredecessors()这个方法,很明显这个方法是一个队列,由此可以推断,公平锁是将所有的线程放在一个队列中,一个线程执行完成后,从队列中取出下一个线程,而非公平锁则没有这个队列。这些都是公平锁与非公平锁底层的实现原理,我们在使用的时候不用追到这么深层次的代码,只需要了解公平锁与非公平锁的含义,并且在调用构造方法时,传入true和false即可。

   

4、redisson介绍

        redis有很多java客户端,我们比较常用的有jedis,spring-data-redis,lettuce等。今天我们介绍一个非常好用的redis的java客户端——redission。我们先看一下redis官网中介绍的java客户端列表:

         在这个列表中,我们可以看到redission的后面有星,说明还是比较受欢迎的。再看看后面的简介,redission是一个在redis服务至上的,分布式、可扩展的java数据结构。我们进入redission的官网,看看官网是怎么介绍的。

         上面一段话看起来有点晦涩难懂,总结起来可以归结为以下几点:

  • redission提供了使用redis的最简单和最快捷的方法;
  • 开发人员不需要过分关注redis,集中精力关注业务即可;
  • 基于redis,提供了在java中具有分布式特性的工具类;
  • 使java中的并发工具包获得了协调多机多线程并发的能力;

redission特性

        上面我们对redission有了一个整体的印象,接下来我们来看看它有哪些特点。

支持redis配置

        redission支持多种redis配置,无论你的redis是单点、集群、主从还是哨兵模式,它都是支持的。只需要在redission的配置文件中,增加相应的配置就可以了。

支持的java实体

        redission支持多种java实体,使其具有分布式的特性。我们比较常用的有:atomiclong(原子long)、atomicdouble(原子double)、publishsubscribe(发布订阅)等。

java分布式锁与同步器

        redission支持java并发包中的多种锁,比如:lock(可重入锁)、fairlock(公平锁)、multilock(联锁)、redlock(红锁)、readwritelock(读写锁)、semaphore(信号量)、countdownlatch(闭锁)等。我们注意到这些都是java并发包中的类,redission借助于redis又重新实现了一套,使其具有分布式的特点。以后我们在使用redission中的这些类的时候,可以跨进程跨jvm去使用。

分布式java集合

        redission对java的集合也进行了封装,使其具有分布式的特性。如:map、set、list、queue、deque、blockingqueue等。以后我们就可以在分布式的环境中使用这些集合了。

与spring框架整合

        redission可以与spring大家族中的很多框架进行整合,其中包括:spring基础框架、spring cache、spring session、spring data redis、spring boot等。在项目中我们可以轻松的与这些框架整合,通过简单的配置就可以实现项目的需求。

5、实战解决电商超卖问题

 

 

 

 

 

 

 

 

 

 

 

synchronized在方法上加锁,由于事务提交是交由spring管理,在锁释放后,该线程所在的事务有可能未提交,mysql innodb的默认隔离级别是rr,这就会导致下一个事务并不能及时获取到更新后的值,从而导致超卖。解决方案是手动提交事务,并且提交事务的操作必须在锁的控制方法内。

import com.lvxiaosha.distributedemo.dao.orderitemmapper;
import com.lvxiaosha.distributedemo.dao.ordermapper;
import com.lvxiaosha.distributedemo.dao.productmapper;
import com.lvxiaosha.distributedemo.model.order;
import com.lvxiaosha.distributedemo.model.orderitem;
import com.lvxiaosha.distributedemo.model.product;
import lombok.extern.slf4j.slf4j;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.stereotype.service;
import org.springframework.transaction.platformtransactionmanager;
import org.springframework.transaction.transactiondefinition;
import org.springframework.transaction.transactionstatus;

import javax.annotation.resource;
import java.math.bigdecimal;
import java.util.date;
import java.util.concurrent.locks.lock;
import java.util.concurrent.locks.reentrantlock;

@service
@slf4j
public class orderservice {

    @resource
    private ordermapper ordermapper;
    @resource
    private orderitemmapper orderitemmapper;
    @resource
    private productmapper productmapper;
    //购买商品id
    private int purchaseproductid = 100100;
    //购买商品数量
    private int purchaseproductnum = 1;
    @autowired
    private platformtransactionmanager platformtransactionmanager;
    @autowired
    private transactiondefinition transactiondefinition;

    private lock lock = new reentrantlock();


//    @transactional(rollbackfor = exception.class)
    public integer createorder() throws exception{
        product product = null;

        lock.lock();
        try {
            transactionstatus transaction1 = platformtransactionmanager.gettransaction(transactiondefinition);
            product = productmapper.selectbyprimarykey(purchaseproductid);
            if (product==null){
                platformtransactionmanager.rollback(transaction1);
                throw new exception("购买商品:"+purchaseproductid+"不存在");
            }

            //商品当前库存
            integer currentcount = product.getcount();
            system.out.println(thread.currentthread().getname()+"库存数:"+currentcount);
            //校验库存
            if (purchaseproductnum > currentcount){
                platformtransactionmanager.rollback(transaction1);
                throw
                        new exception("商品"+purchaseproductid+"仅剩"+currentcount+"件,无法购买");
            }

            productmapper.updateproductcount(purchaseproductnum,"xxx",new date(),product.getid());
            platformtransactionmanager.commit(transaction1);
        }finally {
            lock.unlock();
        }

        transactionstatus transaction = platformtransactionmanager.gettransaction(transactiondefinition);
        order order = new order();
        order.setorderamount(product.getprice().multiply(new bigdecimal(purchaseproductnum)));
        order.setorderstatus(1);//待处理
        order.setreceivername("xxx");
        order.setreceivermobile("13311112222");
        order.setcreatetime(new date());
        order.setcreateuser("xxx");
        order.setupdatetime(new date());
        order.setupdateuser("xxx");
        ordermapper.insertselective(order);

        orderitem orderitem = new orderitem();
        orderitem.setorderid(order.getid());
        orderitem.setproductid(product.getid());
        orderitem.setpurchaseprice(product.getprice());
        orderitem.setpurchasenum(purchaseproductnum);
        orderitem.setcreateuser("xxx");
        orderitem.setcreatetime(new date());
        orderitem.setupdatetime(new date());
        orderitem.setupdateuser("xxx");
        orderitemmapper.insertselective(orderitem);
        platformtransactionmanager.commit(transaction);
        return order.getid();
    }

}

数据库表设计:

/*
navicat mysql data transfer

source server         : 本地数据库
source server version : 80014
source host           : localhost:3306
source database       : distribute

target server type    : mysql
target server version : 80014
file encoding         : 65001

date: 2020-07-21 14:09:14
*/

set foreign_key_checks=0;

-- ----------------------------
-- table structure for distribute_lock
-- ----------------------------
drop table if exists `distribute_lock`;
create table `distribute_lock` (
  `id` int(11) not null auto_increment,
  `business_code` varchar(255) not null,
  `business_name` varchar(255) not null,
  primary key (`id`)
) engine=innodb auto_increment=2 default charset=utf8mb4 collate=utf8mb4_0900_ai_ci;

-- ----------------------------
-- records of distribute_lock
-- ----------------------------
insert into `distribute_lock` values ('1', 'demo', 'demo演示');

-- ----------------------------
-- table structure for order
-- ----------------------------
drop table if exists `order`;
create table `order` (
  `id` int(11) not null auto_increment,
  `order_status` int(1) not null default '1' comment '订单状态 1:待支付;',
  `receiver_name` varchar(255) not null comment '收货人姓名',
  `receiver_mobile` varchar(11) not null comment '收货人手机号',
  `order_amount` decimal(11,2) not null comment '订单金额',
  `create_time` time not null comment '创建时间',
  `create_user` varchar(255) not null comment '创建人',
  `update_time` time not null comment '更新时间',
  `update_user` varchar(255) not null comment '更新人',
  primary key (`id`)
) engine=innodb auto_increment=46 default charset=utf8mb4 collate=utf8mb4_0900_ai_ci;

-- ----------------------------
-- records of order
-- ----------------------------
insert into `order` values ('35', '1', 'xxx', '13311112222', '5.00', '16:53:27', 'xxx', '16:53:27', 'xxx');
insert into `order` values ('36', '1', 'xxx', '13311112222', '5.00', '16:53:27', 'xxx', '16:53:27', 'xxx');
insert into `order` values ('37', '1', 'xxx', '13311112222', '5.00', '16:56:14', 'xxx', '16:56:14', 'xxx');
insert into `order` values ('38', '1', 'xxx', '13311112222', '5.00', '16:56:14', 'xxx', '16:56:14', 'xxx');
insert into `order` values ('39', '1', 'xxx', '13311112222', '5.00', '17:06:10', 'xxx', '17:06:10', 'xxx');
insert into `order` values ('40', '1', 'xxx', '13311112222', '5.00', '17:09:49', 'xxx', '17:09:49', 'xxx');
insert into `order` values ('41', '1', 'xxx', '13311112222', '5.00', '17:11:07', 'xxx', '17:11:07', 'xxx');
insert into `order` values ('42', '1', 'xxx', '13311112222', '5.00', '17:11:07', 'xxx', '17:11:07', 'xxx');
insert into `order` values ('43', '1', 'xxx', '13311112222', '5.00', '17:12:53', 'xxx', '17:12:53', 'xxx');
insert into `order` values ('44', '1', 'xxx', '13311112222', '5.00', '17:40:24', 'xxx', '17:40:24', 'xxx');
insert into `order` values ('45', '1', 'xxx', '13311112222', '5.00', '18:03:06', 'xxx', '18:03:06', 'xxx');

-- ----------------------------
-- table structure for order_item
-- ----------------------------
drop table if exists `order_item`;
create table `order_item` (
  `id` int(11) not null auto_increment,
  `order_id` int(11) not null comment '订单id',
  `product_id` int(11) not null comment '商品数量',
  `purchase_price` decimal(11,2) not null comment '购买金额',
  `purchase_num` int(3) not null comment '购买数量',
  `create_time` time not null comment '创建时间',
  `create_user` varchar(255) not null comment '创建人',
  `update_time` time not null comment '更新时间',
  `update_user` varchar(255) not null comment '更新人',
  primary key (`id`)
) engine=innodb auto_increment=46 default charset=utf8mb4 collate=utf8mb4_0900_ai_ci;

-- ----------------------------
-- records of order_item
-- ----------------------------
insert into `order_item` values ('35', '35', '100100', '5.00', '1', '16:53:27', 'xxx', '16:53:27', 'xxx');
insert into `order_item` values ('36', '36', '100100', '5.00', '1', '16:53:27', 'xxx', '16:53:27', 'xxx');
insert into `order_item` values ('37', '37', '100100', '5.00', '1', '16:56:14', 'xxx', '16:56:14', 'xxx');
insert into `order_item` values ('38', '38', '100100', '5.00', '1', '16:56:14', 'xxx', '16:56:14', 'xxx');
insert into `order_item` values ('39', '39', '100100', '5.00', '1', '17:06:10', 'xxx', '17:06:10', 'xxx');
insert into `order_item` values ('40', '40', '100100', '5.00', '1', '17:09:49', 'xxx', '17:09:49', 'xxx');
insert into `order_item` values ('41', '41', '100100', '5.00', '1', '17:11:07', 'xxx', '17:11:07', 'xxx');
insert into `order_item` values ('42', '42', '100100', '5.00', '1', '17:11:07', 'xxx', '17:11:07', 'xxx');
insert into `order_item` values ('43', '43', '100100', '5.00', '1', '17:12:53', 'xxx', '17:12:53', 'xxx');
insert into `order_item` values ('44', '44', '100100', '5.00', '1', '17:40:24', 'xxx', '17:40:24', 'xxx');
insert into `order_item` values ('45', '45', '100100', '5.00', '1', '18:03:06', 'xxx', '18:03:06', 'xxx');

-- ----------------------------
-- table structure for product
-- ----------------------------
drop table if exists `product`;
create table `product` (
  `id` int(11) not null auto_increment,
  `product_name` varchar(255) not null comment '商品名称',
  `price` decimal(11,2) not null comment '商品金额',
  `count` int(5) not null comment '数量',
  `product_desc` varchar(255) not null comment '商品描述',
  `create_time` time not null comment '创建时间',
  `create_user` varchar(255) not null comment '创建人',
  `update_time` time not null comment '更新时间',
  `update_user` varchar(255) not null comment '更新人',
  primary key (`id`)
) engine=innodb auto_increment=100101 default charset=utf8mb4 collate=utf8mb4_0900_ai_ci;

-- ----------------------------
-- records of product
-- ----------------------------
insert into `product` values ('100100', '测试商品', '5.00', '1', '测试商品', '11:01:57', 'xxx', '18:03:06', 'xxx');

 

        详细代码请见github地址:https://github.com/lvdapiaoliang/996-dev/tree/master/all-learning/distributelock/distribute-demo

        

6、基于数据库的分布式锁

 

 

import com.lvxiaosha.distributelock.dao.distributelockmapper;
import com.lvxiaosha.distributelock.model.distributelock;
import com.lvxiaosha.distributelock.model.distributelockexample;
import lombok.extern.slf4j.slf4j;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.transaction.annotation.transactional;
import org.springframework.web.bind.annotation.requestmapping;
import org.springframework.web.bind.annotation.restcontroller;

import javax.annotation.resource;
import java.util.concurrent.locks.lock;
import java.util.concurrent.locks.reentrantlock;

@restcontroller
@slf4j
public class democontroller {
    @resource
    private distributelockmapper distributelockmapper;

    @requestmapping("singlelock")
    @transactional(rollbackfor = exception.class)
    public string singlelock() throws exception {
        log.info("我进入了方法!");
        distributelock distributelock = distributelockmapper.selectdistributelock("demo");
        if (distributelock==null) throw new exception("分布式锁找不到");
        log.info("我进入了锁!");
        try {
            thread.sleep(20000);
        } catch (interruptedexception e) {
            e.printstacktrace();
        }
        return "我已经执行完成!";
    }
}
distributelock selectdistributelock(@param("businesscode") string businesscode);
<select id="selectdistributelock" resulttype="com.lvxiaosha.distributelock.model.distributelock">
    select * from distribute_lock
    where business_code = #{businesscode,jdbctype=varchar}
    for update
  </select>

7、基于redis的分布式锁

 

 

 

 

 

 coding演示:

  • 启动redis

        redis的安装配置请参考我的另外一篇文章:

 1.2 redis7.0.4安装与配置开机自启动_iamlvxiaosha的博客-csdn博客

  • 添加maven依赖
<dependency>
   <groupid>org.springframework.boot</groupid>
   <artifactid>spring-boot-starter-data-redis</artifactid>
</dependency>
  • 在application.properties里面添加springboot的redis相关依赖
  • 编写代码
import com.lvxiaosha.distributelock.lock.redislock;
import com.lvxiaosha.distributelock.lock.zklock;
import lombok.extern.slf4j.slf4j;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.data.redis.core.redistemplate;
import org.springframework.web.bind.annotation.requestmapping;
import org.springframework.web.bind.annotation.restcontroller;

@restcontroller
@slf4j
public class redislockcontroller {
    @autowired
    private redistemplate redistemplate;

    @requestmapping("redislock")
    public string redislock(){
        log.info("我进入了方法!");
        try (redislock redislock = new redislock(redistemplate,"rediskey",30)){
            if (redislock.getlock()) {
                log.info("我进入了锁!!");
                thread.sleep(15000);
            }
        } catch (interruptedexception e) {
            e.printstacktrace();
        } catch (exception e) {
            e.printstacktrace();
        }
        log.info("方法执行完成");
        return "方法执行完成";
    }
}
import lombok.extern.slf4j.slf4j;
import org.springframework.data.redis.connection.redisstringcommands;
import org.springframework.data.redis.core.rediscallback;
import org.springframework.data.redis.core.redistemplate;
import org.springframework.data.redis.core.script.redisscript;
import org.springframework.data.redis.core.types.expiration;

import java.util.arrays;
import java.util.list;
import java.util.uuid;

@slf4j
public class redislock implements autocloseable {

    private redistemplate redistemplate;
    private string key;
    private string value;
    //单位:秒
    private int expiretime;

    public redislock(redistemplate redistemplate,string key,int expiretime){
        this.redistemplate = redistemplate;
        this.key = key;
        this.expiretime=expiretime;
        this.value = uuid.randomuuid().tostring();
    }

    /**
     * 获取分布式锁
     * @return
     */
    public boolean getlock(){
        rediscallback<boolean> rediscallback = connection -> {
            //设置nx
            redisstringcommands.setoption setoption = redisstringcommands.setoption.ifabsent();
            //设置过期时间
            expiration expiration = expiration.seconds(expiretime);
            //序列化key
            byte[] rediskey = redistemplate.getkeyserializer().serialize(key);
            //序列化value
            byte[] redisvalue = redistemplate.getvalueserializer().serialize(value);
            //执行setnx操作
            boolean result = connection.set(rediskey, redisvalue, expiration, setoption);
            return result;
        };

        //获取分布式锁
        boolean lock = (boolean)redistemplate.execute(rediscallback);
        return lock;
    }

    public boolean unlock() {
        string script = "if redis.call(\"get\",keys[1]) == argv[1] then\n" +
                "    return redis.call(\"del\",keys[1])\n" +
                "else\n" +
                "    return 0\n" +
                "end";
        redisscript<boolean> redisscript = redisscript.of(script,boolean.class);
        list<string> keys = arrays.aslist(key);

        boolean result = (boolean)redistemplate.execute(redisscript, keys, value);
        log.info("释放锁的结果:"+result);
        return result;
    }


    @override
    public void close() throws exception {
        unlock();
    }
}

8、基于zookeeper与curator的分布式锁

 zookeeper的下载与安装请参考我的另外一篇文章:

1、kafka急速入门_iamlvxiaosha的博客-csdn博客

运行zookeeper,在zookeeper里面创建lock节点:

 

<dependency>
   <groupid>org.apache.zookeeper</groupid>
   <artifactid>zookeeper</artifactid>
   <version>3.8.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.curator/curator-recipes -->
<dependency>
   <groupid>org.apache.curator</groupid>
   <artifactid>curator-recipes</artifactid>
   <version>5.3.0</version>
</dependency>

import lombok.extern.slf4j.slf4j;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.stat;

import java.io.ioexception;
import java.util.collections;
import java.util.list;

@slf4j
public class zklock implements autocloseable, watcher {

    private zookeeper zookeeper;
    private string znode;

    public zklock() throws ioexception {
        this.zookeeper = new zookeeper("http://192.168.110.1130:2181",
                10000,this);
    }

    public boolean getlock(string businesscode) {
        try {
            //创建业务 根节点
            stat stat = zookeeper.exists("/" + businesscode, false);
            if (stat==null){
                zookeeper.create("/" + businesscode,businesscode.getbytes(),
                        zoodefs.ids.open_acl_unsafe,
                        createmode.persistent);
            }

            //创建瞬时有序节点  /order/order_00000001
            znode = zookeeper.create("/" + businesscode + "/" + businesscode + "_", businesscode.getbytes(),
                    zoodefs.ids.open_acl_unsafe,
                    createmode.ephemeral_sequential);

            //获取业务节点下 所有的子节点
            list<string> childrennodes = zookeeper.getchildren("/" + businesscode, false);
            //子节点排序
            collections.sort(childrennodes);
            //获取序号最小的(第一个)子节点
            string firstnode = childrennodes.get(0);
            //如果创建的节点是第一个子节点,则获得锁
            if (znode.endswith(firstnode)){
                return true;
            }
            //不是第一个子节点,则监听前一个节点
            string lastnode = firstnode;
            for (string node:childrennodes){
                if (znode.endswith(node)){
                    zookeeper.exists("/"+businesscode+"/"+lastnode,true);
                    break;
                }else {
                    lastnode = node;
                }
            }
            synchronized (this){
                wait();
            }

            return true;

        } catch (exception e) {
            e.printstacktrace();
        }
        return false;
    }





    @override
    public void close() throws exception {
        zookeeper.delete(znode,-1);
        zookeeper.close();
        log.info("我已经释放了锁!");
    }

    @override
    public void process(watchedevent event) {
        if (event.gettype() == event.eventtype.nodedeleted){
            synchronized (this){
                notify();
            }
        }
    }
}
import com.lvxiaosha.distributezklock.lock.zklock;
import lombok.extern.slf4j.slf4j;
import org.apache.curator.framework.curatorframework;
import org.apache.curator.framework.recipes.locks.interprocessmutex;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.web.bind.annotation.requestmapping;
import org.springframework.web.bind.annotation.restcontroller;

import java.io.ioexception;
import java.util.concurrent.timeunit;

@restcontroller
@slf4j
public class zookeepercontroller {
    @autowired
    private curatorframework client;

    @requestmapping("zklock")
    public string zookeeperlock(){
        log.info("我进入了方法!");
        try (zklock zklock = new zklock()) {
            if (zklock.getlock("order")){
                log.info("我获得了锁");
                thread.sleep(10000);
            }
        } catch (ioexception e) {
            e.printstacktrace();
        } catch (exception e) {
            e.printstacktrace();
        }
        log.info("方法执行完成!");
        return "方法执行完成!";
    }

    @requestmapping("curatorlock")
    public string curatorlock(){
        log.info("我进入了方法!");
        interprocessmutex lock = new interprocessmutex(client, "/order");
        try{
            if (lock.acquire(30, timeunit.seconds)){
                log.info("我获得了锁!!");
                thread.sleep(10000);
            }
        } catch (ioexception e) {
            e.printstacktrace();
        } catch (exception e) {
            e.printstacktrace();
        }finally {
            try {
                log.info("我释放了锁!!");
                lock.release();
            } catch (exception e) {
                e.printstacktrace();
            }
        }
        log.info("方法执行完成!");
        return "方法执行完成!";
    }
}
import com.lvxiaosha.distributezklock.lock.zklock;
import lombok.extern.slf4j.slf4j;
import org.apache.curator.retrypolicy;
import org.apache.curator.framework.curatorframework;
import org.apache.curator.framework.curatorframeworkfactory;
import org.apache.curator.framework.recipes.locks.interprocessmutex;
import org.apache.curator.retry.exponentialbackoffretry;
import org.junit.test;
import org.junit.runner.runwith;
import org.springframework.boot.test.context.springboottest;
import org.springframework.test.context.junit4.springrunner;

import java.io.ioexception;
import java.util.concurrent.timeunit;

@runwith(springrunner.class)
@springboottest
@slf4j
public class distributezklockapplicationtests {

    @test
    public void contextloads() {
    }


    @test
    public void testzklock() throws exception {
        zklock zklock = new zklock();
        boolean lock = zklock.getlock("order");
        log.info("获得锁的结果:"+lock);

        zklock.close();
    }

    @test
    public void testcuratorlock(){
        retrypolicy retrypolicy = new exponentialbackoffretry(1000, 3);
        curatorframework client = curatorframeworkfactory.newclient("localhost:2181", retrypolicy);
        client.start();
        interprocessmutex lock = new interprocessmutex(client, "/order");
        try {
            if ( lock.acquire(30, timeunit.seconds) ) {
                try  {
                    log.info("我获得了锁!!!");
                }
                finally  {
                    lock.release();
                }
            }
        } catch (exception e) {
            e.printstacktrace();
        }
        client.close();
    }
}

注:比较推荐curator的分布式锁实现方法,实现更简单。

9、基于redisson实现分布式锁

maven依赖引入:

<dependency>
   <groupid>org.redisson</groupid>
   <artifactid>redisson-spring-boot-starter</artifactid>
   <version>3.17.6</version>
</dependency>

修改application.properties配置文件,添加redis配置:

创建redissonlockcontroller:

import lombok.extern.slf4j.slf4j;
import org.redisson.redisson;
import org.redisson.api.rlock;
import org.redisson.api.redissonclient;
import org.redisson.config.config;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.web.bind.annotation.requestmapping;
import org.springframework.web.bind.annotation.restcontroller;

import java.util.concurrent.timeunit;

@restcontroller
@slf4j
public class redissonlockcontroller {
    @autowired
    private redissonclient redisson;

    @requestmapping("redissonlock")
    public string redissonlock() {
        rlock rlock = redisson.getlock("order");
        log.info("我进入了方法!!");
        try {
            rlock.lock(30, timeunit.seconds);
            log.info("我获得了锁!!!");
            thread.sleep(10000);
        } catch (interruptedexception e) {
            e.printstacktrace();
        }finally {
            log.info("我释放了锁!!");
            rlock.unlock();
        }
        log.info("方法执行完成!!");
        return "方法执行完成!!";
    }
}

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com