文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

Java AQS中ReentrantLock条件锁的使用

2023-02-02 12:02

关注

一.什么是AQS

1.定义

java.util.concurrent包中的大多数同步器实现都是围绕着共同的基础行为,比如等待队列、条件队列、独占获取、共享获取等,而这些行为的抽象就是基于AbstractQueuedSynchronizer(简称AQS)实现的,AQS是一个抽象同步框架,可以用来实现一个依赖状态的同步器。

JDK中提供的大多数的同步器如Lock, Latch, Barrier等,都是基于AQS框架来实现的。

2.特性

3.属性

内部维护属性volatile int state,表示资源的可用状态

4.资源共享方式

5.两种队列

6.队列节点状态

7.实现方法

自定义同步器实现时主要实现以下几种方法:

二.等待队列

1.同步等待队列

AQS当中的同步等待队列也称CLH队列,CLH队列是Craig、Landin、Hagersten三人发明的一种基于双向链表数据结构的队列,是FIFO先进先出线程等待队列,Java中的CLH队列是原CLH队列的一个变种,线程由原自旋机制改为阻塞机制。

AQS 依赖CLH同步队列来完成同步状态的管理:

2.条件等待队列

AQS中条件队列是使用单向列表保存的,用nextWaiter来连接:

三.condition接口

    public static void main(String[] args) {
        Lock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        new Thread(() -> {
            lock.lock();
            try {
                log.debug(Thread.currentThread().getName() + " 开始处理任务");
                //会释放当前持有的锁,然后阻塞当前线程
                condition.await();
                log.debug(Thread.currentThread().getName() + " 结束处理任务");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }).start();
        new Thread(() -> {
            lock.lock();
            try {
                log.debug(Thread.currentThread().getName() + " 开始处理任务");
                Thread.sleep(2000);
                //唤醒因调用Condition#await方法而阻塞的线程
                condition.signal();
                log.debug(Thread.currentThread().getName() + " 结束处理任务");
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }).start();
    }

Thread-0 开始处理任务

Thread-1 开始处理任务

Thread-1 结束处理任务

Thread-0 结束处理任务

四.ReentrantLock

1.ReentrantLock是什么

ReentrantLock是一种基于AQS框架的应用实现,是JDK中的一种线程并发访问的同步手段,它的功能类似于synchronized是一种互斥锁,可以保证线程安全。

2.特点

3. ReentrantLock和synchronized的区别

4. ReentrantLock的使用

伪代码:

ReentrantLock lock = new ReentrantLock(); //参数默认false,不公平锁  
ReentrantLock lock = new ReentrantLock(true); //公平锁  
//加锁    
lock.lock(); 
try {  
    //临界区 
} finally { 
    // 解锁 
    lock.unlock();  

例子:基本使用

	private static int sum = 0;
    private static Lock lock = new ReentrantLock();
    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 3; i++) {
            Thread thread = new Thread(()->{
                //加锁 一般写在try前面
                lock.lock();
                try {
                    // 临界区代码 业务逻辑
                    for (int j = 0; j < 10000; j++) {
                        sum++;
                    }
                } finally {
                    // 解锁
                    lock.unlock();
                }
            });
            thread.start();
        }
        Thread.sleep(2000);
        System.out.println(sum);
    }

30000

可重入

public static ReentrantLock lock = new ReentrantLock();
    public static void main(String[] args) {
        method1();
    }
    public static void method1() {
        lock.lock();
        try {
            log.debug("execute method1");
            method2();
        } finally {
            lock.unlock();
        }
    }
    public static void method2() {
        lock.lock();
        try {
            log.debug("execute method2");
            method3();
        } finally {
            lock.unlock();
        }
    }
    public static void method3() {
        lock.lock();
        try {
            log.debug("execute method3");
        } finally {
            lock.unlock();
        }
    }

execute method1

execute method2

execute method3

可中断

    public static void main(String[] args) throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        Thread t1 = new Thread(() -> {
            log.debug("t1启动...");
            try {
                lock.lockInterruptibly();
                try {
                    log.debug("t1获得了锁");
                } finally {
                    lock.unlock();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
                log.debug("t1等锁的过程中被中断");
            }
        }, "t1");
        lock.lock();
        try {
            log.debug("main线程获得了锁");
            t1.start();
            //先让线程t1执行
            Thread.sleep(1000);
            t1.interrupt();
            log.debug("线程t1执行中断");
        } finally {
            lock.unlock();
        }
    }

main线程获得了锁

t1启动…

线程t1执行中断

t1等锁的过程中被中断

锁超时

    public static void main(String[] args) throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        Thread t1 = new Thread(() -> {
            log.debug("t1启动...");
            try {
                //if (!lock.tryLock()) {
                //  log.debug("t1获取锁失败,立即返回false");
                //  return;
                //}
 				if (!lock.tryLock(1, TimeUnit.SECONDS)) {
                    log.debug("等待 1s 后获取锁失败,返回");
                    return;
                }
            } catch (Exception e) {
                e.printStackTrace();
                return;
            }
            try {
                log.debug("t1获得了锁");
            } finally {
                lock.unlock();
            }
        }, "t1");
        lock.lock();
        try {
            log.debug("main线程获得了锁");
            t1.start();
            //先让线程t1执行
            Thread.sleep(2000);
        } finally {
            lock.unlock();
        }
    }

main线程获得了锁

t1启动…

等待 1s 后获取锁失败,返回

公平锁和非公平锁

public static void main(String[] args) throws InterruptedException {
//        ReentrantLock lock = new ReentrantLock(true); //公平锁
        ReentrantLock lock = new ReentrantLock(); //非公平锁
        for (int i = 0; i < 500; i++) {
            new Thread(() -> {
                lock.lock();
                try {
                    try {
                        Thread.sleep(10);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    log.debug(Thread.currentThread().getName() + " running...");
                } finally {
                    lock.unlock();
                }
            }, "t" + i).start();
        }
        // 1s 之后去争抢锁
        Thread.sleep(1000);
        for (int i = 0; i < 500; i++) {
            new Thread(() -> {
                lock.lock();
                try {
                    log.debug(Thread.currentThread().getName() + " running...");
                } finally {
                    lock.unlock();
                }
            }, "强行插入" + i).start();
        }
    }

条件变量

private static ReentrantLock lock = new ReentrantLock();
    private static Condition cigCon = lock.newCondition();
    private static Condition takeCon = lock.newCondition();
    private static boolean hashcig = false;
    private static boolean hastakeout = false;
    //送烟
    public void cigratee(){
        lock.lock();
        try {
            while(!hashcig){
                try {
                    log.debug("没有烟,歇一会");
                    cigCon.await();
                }catch (Exception e){
                    e.printStackTrace();
                }
            }
            log.debug("有烟了,干活");
        }finally {
            lock.unlock();
        }
    }
    //送外卖
    public void takeout(){
        lock.lock();
        try {
            while(!hastakeout){
                try {
                    log.debug("没有饭,歇一会");
                    takeCon.await();

                }catch (Exception e){
                    e.printStackTrace();
                }
            }
            log.debug("有饭了,干活");
        }finally {
            lock.unlock();
        }
    }
    public static void main(String[] args) {
        ReentrantLockDemo6 test = new ReentrantLockDemo6();
        new Thread(() ->{
            test.cigratee();
        }).start();
        new Thread(() -> {
            test.takeout();
        }).start();
        new Thread(() ->{
            lock.lock();
            try {
                hashcig = true;
                log.debug("唤醒送烟的等待线程");
                cigCon.signal();
            }finally {
                lock.unlock();
            }
        },"t1").start();
        new Thread(() ->{
            lock.lock();
            try {
                hastakeout = true;
                log.debug("唤醒送饭的等待线程");
                takeCon.signal();
            }finally {
                lock.unlock();
            }
        },"t2").start();
    }

没有烟,歇一会
没有饭,歇一会
唤醒送烟的等待线程
唤醒送饭的等待线程
有烟了,干活
有饭了,干活

五.源码解析

首先会调用lock方法

public void lock() {
        sync.lock();
    }

lock会调用公平方法或者非公平的方法,默认是非公平锁方法,非公平锁则会cas尝试加锁,state是不是0,是0的话就把它改为1,并设置当前线程为独占线程,加锁成功,待下个线程进来时已经变成1,则失败阻塞。

加锁

	final void lock() {
		// 看状态是不是0,如果是0 则改为1,加锁成功
        if (compareAndSetState(0, 1))
         // 并设置当前线程为独占线程
           setExclusiveOwnerThread(Thread.currentThread());
        else
        	//不是0则失败阻塞
           acquire(1);
    }
protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }

加锁失败(入队 阻塞)

 public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            //恢复中断标识位
            selfInterrupt();
    }

首先tryAcquire 又进行了一次判断,看是否能获取锁,

final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            //其他线程进来,状态值是1
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
            	// 重入,将状态值+1
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

添加进队列

private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        //第一次tail为空
        if (pred != null) {
        	//尾插法
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        //tail为空则在这里创建队列
        enq(node);
        return node;
    }

创建队列并且入队

private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
            	//创建队列
                if (compareAndSetHead(new Node()))
                	// 将头节点指向前一节点的尾节点,这时候tail不为空了
                    tail = head;
            } else {
            	//双向接口,前一节点的尾节点也指向当前节点的头节点
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

阻塞

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) { //保证一定获取锁
            	//获取head节点
                final Node p = node.predecessor();
                //是头节点则尝试获取锁
                if (p == head && tryAcquire(arg)) {
                	//设置头节点
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                //获取锁失败的情况,阻塞,在for循环里,第一次shouldParkAfterFailedAcquire为false,会将其设置为-1,第二次就可以阻塞
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

是否需要阻塞,把状态设置为SIGNAL,可以被唤醒了

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        //是-1了就可以去阻塞
        if (ws == Node.SIGNAL)
            
            return true;
        if (ws > 0) {
            
            do { //把节点去掉
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            
             //把状态设置为SIGNAL,可以被唤醒了
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

真正的阻塞方法

 private final boolean parkAndCheckInterrupt() {
 		//阻塞
        LockSupport.park(this);
        //清除中断标识位,在加锁失败方法的后面恢复中断标识位,可能其他地方还用到这个锁标识位
        return Thread.interrupted();
    }

唤醒 unlock()

 public void unlock() {
        sync.release(1);
    }
  public final boolean release(int arg) {
  	// 尝试唤醒
     if (tryRelease(arg)) {
         Node h = head;
         if (h != null && h.waitStatus != 0)
         	//唤醒阻塞的线程
             unparkSuccessor(h);
         return true;
     }
     return false;
 }
protected final boolean tryRelease(int releases) {
			//当前状态-1
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            //设置状态
            setState(c);
            return free;
        }

在这里唤醒

private void unparkSuccessor(Node node) {
        
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        //后面一个节点不为空 则直接唤醒当前线程
        if (s != null)
            LockSupport.unpark(s.thread);
    }

线程取消获取锁

private void cancelAcquire(Node node) {
        // Ignore if node doesn't exist
        if (node == null)
            return;
        node.thread = null;
        // Skip cancelled predecessors
        Node pred = node.prev;
        while (pred.waitStatus > 0)
        	//将前一个节点干掉
            node.prev = pred = pred.prev;
        // predNext is the apparent node to unsplice. CASes below will
        // fail if not, in which case, we lost race vs another cancel
        // or signal, so no further action is necessary.
        Node predNext = pred.next;
        // Can use unconditional write instead of CAS here.
        // After this atomic step, other Nodes can skip past us.
        // Before, we are free of interference from other threads.
        node.waitStatus = Node.CANCELLED;
        // If we are the tail, remove ourselves.
        if (node == tail && compareAndSetTail(node, pred)) {
            compareAndSetNext(pred, predNext, null);
        } else {
            // If successor needs signal, try to set pred's next-link
            // so it will get one. Otherwise wake it up to propagate.
            int ws;
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
            } else {
                unparkSuccessor(node);
            }
            node.next = node; // help GC
        }
    }

至此加锁、解锁、阻塞、唤醒的底层源码都梳理完了。

到此这篇关于Java AQS中ReentrantLock条件锁的使用的文章就介绍到这了,更多相关Java ReentrantLock条件锁内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯