文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

Java多线程怎么使用同步工具类CyclicBarrier

2023-06-25 11:56

关注

本篇内容介绍了“Java多线程怎么使用同步工具类CyclicBarrier”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

1 CyclicBarrier方法说明

CyclicBarrier提供的方法有:

屏障线程的运行时机:

等待的线程数量=parties之后,CyclicBarrier打开屏障之前。
举例:在分组计算中,每个线程负责一部分计算,最终这些线程计算结束之后,交由屏障线程进行汇总计算。

int getParties():获取CyclicBarrier打开屏障的线程数量,也成为方数。

int getNumberWaiting():获取正在CyclicBarrier上等待的线程数量。

int await():CyclicBarrier上进行阻塞等待,直到发生以下情形之一:

int await(timeout,TimeUnit):CyclicBarrier上进行限时的阻塞等待,直到发生以下情形之一:

boolean isBroken():获取是否破损标志位broken的值,此值有以下几种情况:

void reset():使得CyclicBarrier回归初始状态,直观来看它做了两件事:

2 CyclicBarrier实例

假若有若干个线程都要进行写数据操作,并且只有所有线程都完成写数据操作之后,这些线程才能继续做后面的事情,此时就可以利用CyclicBarrier了:

 public static void main(String[] args) {        int N = 4;        CyclicBarrier barrier  = new CyclicBarrier(N);        for(int i=0;i<N;i++)            new Writer(barrier).start();    }    static class Writer extends Thread{        private CyclicBarrier cyclicBarrier;        public Writer(CyclicBarrier cyclicBarrier) {            this.cyclicBarrier = cyclicBarrier;        }         @Override        public void run() {            System.out.println("线程"+Thread.currentThread().getName()+"正在写入数据...");            try {                Thread.sleep(5000);      //以睡眠来模拟写入数据操作                System.out.println("线程"+Thread.currentThread().getName()+"写入数据完毕,等待其他线程写入完毕");                cyclicBarrier.await();            } catch (InterruptedException e) {                e.printStackTrace();            }catch(BrokenBarrierException e){                e.printStackTrace();            }            System.out.println("所有线程写入完毕,继续处理其他任务...");        }    }

线程Thread-0正在写入数据...
线程Thread-3正在写入数据...
线程Thread-1正在写入数据...
线程Thread-2正在写入数据...
线程Thread-1写入数据完毕,等待其他线程写入完毕
线程Thread-3写入数据完毕,等待其他线程写入完毕
线程Thread-2写入数据完毕,等待其他线程写入完毕
线程Thread-0写入数据完毕,等待其他线程写入完毕
所有线程写入完毕,继续处理其他任务...
所有线程写入完毕,继续处理其他任务...
所有线程写入完毕,继续处理其他任务...
所有线程写入完毕,继续处理其他任务...

从上面输出结果可以看出,每个写入线程执行完写数据操作之后,就在等待其他线程写入操作完毕。

当所有线程线程写入操作完毕之后,所有线程就继续进行后续的操作了。

如果想在所有线程写入操作完之后,进行额外的其他操作可以为CyclicBarrier提供Runnable参数:

public class CyclicBarrierTest {    public static void main(String[] args) {        int N = 4;        CyclicBarrier barrier  = new CyclicBarrier(N,new Runnable() {            @Override            public void run() {                System.out.println("当前线程"+Thread.currentThread().getName());            }        });        for(int i=0;i<N;i++)            new Writer(barrier).start();    }    static class Writer extends Thread{        private CyclicBarrier cyclicBarrier;        public Writer(CyclicBarrier cyclicBarrier) {            this.cyclicBarrier = cyclicBarrier;        }        @Override        public void run() {            System.out.println("线程"+Thread.currentThread().getName()+"正在写入数据...");            try {                Thread.sleep(3000);      //以睡眠来模拟写入数据操作                System.out.println("线程"+Thread.currentThread().getName()+"写入数据完毕,等待其他线程写入完毕");                cyclicBarrier.await();            } catch (InterruptedException e) {                e.printStackTrace();            }catch(BrokenBarrierException e){                e.printStackTrace();            }            System.out.println("所有线程写入完毕,继续处理其他任务...");        }    }}

线程Thread-0正在写入数据...
线程Thread-3正在写入数据...
线程Thread-2正在写入数据...
线程Thread-1正在写入数据...
线程Thread-1写入数据完毕,等待其他线程写入完毕
线程Thread-3写入数据完毕,等待其他线程写入完毕
线程Thread-0写入数据完毕,等待其他线程写入完毕
线程Thread-2写入数据完毕,等待其他线程写入完毕
当前线程Thread-2
所有线程写入完毕,继续处理其他任务...
所有线程写入完毕,继续处理其他任务...
所有线程写入完毕,继续处理其他任务...
所有线程写入完毕,继续处理其他任务...

从结果可以看出,当四个线程都到达barrier状态后,会从四个线程中选择一个线程去执行Runnable

await指定时间的效果:

public class CyclicBarrierTest {    public static void main(String[] args) {        int N = 4;        CyclicBarrier barrier = new CyclicBarrier(N);        for (int i = 0; i < N; i++) {            if (i < N - 1)                new Writer(barrier).start();            else {                try {                    //运行时间远小于2000(cyclicBarrier.await 指定时间) 就不会抛出TimeoutException                    Thread.sleep(3000);                } catch (InterruptedException e) {                    e.printStackTrace();                }                new Writer(barrier).start();            }                    }    }    static class Writer extends Thread {        private CyclicBarrier cyclicBarrier;        public Writer(CyclicBarrier cyclicBarrier) {            this.cyclicBarrier = cyclicBarrier;        }        @Override        public void run() {            System.out.println("线程" + Thread.currentThread().getName() + "正在写入数据...");            try {                Thread.sleep(3000);      //以睡眠来模拟写入数据操作                System.out.println("线程" + Thread.currentThread().getName() + "写入数据完毕,等待其他线程写入完毕");                try {                    cyclicBarrier.await(2000, TimeUnit.MILLISECONDS);                } catch (TimeoutException e) {                    e.printStackTrace();                }            } catch (InterruptedException e) {                e.printStackTrace();            } catch (BrokenBarrierException e) {                e.printStackTrace();            }            System.out.println(Thread.currentThread().getName() + "所有线程写入完毕,继续处理其他任务...");        }    }}

线程Thread-0正在写入数据...
线程Thread-2正在写入数据...
线程Thread-1正在写入数据...
线程Thread-0写入数据完毕,等待其他线程写入完毕
线程Thread-2写入数据完毕,等待其他线程写入完毕
线程Thread-1写入数据完毕,等待其他线程写入完毕
线程Thread-3正在写入数据...
java.util.concurrent.TimeoutException
    at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:257)
    at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:435)
    at CyclicBarrierTest$Writer.run(CyclicBarrierTest.java:43)
Thread-0所有线程写入完毕,继续处理其他任务...
java.util.concurrent.BrokenBarrierException
    at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:250)
    at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:435)
    at CyclicBarrierTest$Writer.run(CyclicBarrierTest.java:43)
Thread-1所有线程写入完毕,继续处理其他任务...
java.util.concurrent.BrokenBarrierException
    at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:250)
    at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:435)
    at CyclicBarrierTest$Writer.run(CyclicBarrierTest.java:43)
Thread-2所有线程写入完毕,继续处理其他任务...
线程Thread-3写入数据完毕,等待其他线程写入完毕
java.util.concurrent.BrokenBarrierException
    at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:207)
    at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:435)
    at CyclicBarrierTest$Writer.run(CyclicBarrierTest.java:43)
Thread-3所有线程写入完毕,继续处理其他任务...

上面的代码在main方法的for循环中,故意让最后一个线程启动延迟,因为在前面三个线程都达到barrier之后,等待了指定的时间发现第四个线程还没有达到barrier,就抛出异常并继续执行后面的任务。

另外CyclicBarrier是可以重用的,看下面这个例子:

public class CyclicBarrierTest {    public static void main(String[] args) {        int N = 4;        CyclicBarrier barrier  = new CyclicBarrier(N);        for(int i=0;i<N;i++) {            new Writer(barrier).start();        }        try {            Thread.sleep(5000);        } catch (InterruptedException e) {            e.printStackTrace();        }        System.out.println("CyclicBarrier重用");        for(int i=0;i<N;i++) {            new Writer(barrier).start();        }    }    static class Writer extends Thread{        private CyclicBarrier cyclicBarrier;        public Writer(CyclicBarrier cyclicBarrier) {            this.cyclicBarrier = cyclicBarrier;        }        @Override        public void run() {            System.out.println("线程"+Thread.currentThread().getName()+"正在写入数据...");            try {                Thread.sleep(3000);      //以睡眠来模拟写入数据操作                System.out.println("线程"+Thread.currentThread().getName()+"写入数据完毕,等待其他线程写入完毕");                cyclicBarrier.await();            } catch (InterruptedException e) {                e.printStackTrace();            }catch(BrokenBarrierException e){                e.printStackTrace();            }            System.out.println(Thread.currentThread().getName()+"所有线程写入完毕,继续处理其他任务...");        }    }}

线程Thread-0正在写入数据...
线程Thread-3正在写入数据...
线程Thread-2正在写入数据...
线程Thread-1正在写入数据...
线程Thread-1写入数据完毕,等待其他线程写入完毕
线程Thread-0写入数据完毕,等待其他线程写入完毕
线程Thread-3写入数据完毕,等待其他线程写入完毕
线程Thread-2写入数据完毕,等待其他线程写入完毕
Thread-2所有线程写入完毕,继续处理其他任务...
Thread-1所有线程写入完毕,继续处理其他任务...
Thread-3所有线程写入完毕,继续处理其他任务...
Thread-0所有线程写入完毕,继续处理其他任务...
CyclicBarrier重用
线程Thread-4正在写入数据...
线程Thread-5正在写入数据...
线程Thread-6正在写入数据...
线程Thread-7正在写入数据...
线程Thread-5写入数据完毕,等待其他线程写入完毕
线程Thread-4写入数据完毕,等待其他线程写入完毕
线程Thread-7写入数据完毕,等待其他线程写入完毕
线程Thread-6写入数据完毕,等待其他线程写入完毕
Thread-6所有线程写入完毕,继续处理其他任务...
Thread-5所有线程写入完毕,继续处理其他任务...
Thread-4所有线程写入完毕,继续处理其他任务...
Thread-7所有线程写入完毕,继续处理其他任务...

从执行结果可以看出,在初次的4个线程越过barrier状态后,又可以用来进行新一轮的使用。而CountDownLatch无法进行重复使用。

3 CyclicBarrier源码解析

先看一下CyclicBarrier中成员变量的组成:

    private final ReentrantLock lock = new ReentrantLock();        private final Condition trip = lock.newCondition();        private final int parties;//拦截的线程数量        private final Runnable barrierCommand; //当屏障撤销时,需要执行的屏障操作    //当前的Generation。每当屏障失效或者开闸之后都会自动替换掉。从而实现重置的功能。    private Generation generation = new Generation();        private int count;

可以看出,CyclicBarrier是由ReentrantLockCondition来实现的。具体每个变量都有什么意义,我们在分析源码的时候具体说。
我们主要从CyclicBarrier的构造方法和它的await方法分析说起。

CyclicBarrier构造函数

CyclicBarrier有两个构造函数:

//带Runnable参数的函数 public CyclicBarrier(int parties, Runnable barrierAction) {        if (parties <= 0) throw new IllegalArgumentException();        this.parties = parties;//有几个运动员要参赛        this.count = parties;//目前还需要几个运动员准备好        //你要在所有线程都继续执行下去之前要执行什么操作,可以为空        this.barrierCommand = barrierAction;    }//不带Runnable参数的函数 public CyclicBarrier(int parties) {     this(parties, null); }

其中,第二个构造函数调用的是第一个构造函数,这个 Runnable barrierAction 参数是什么呢?其实在上面的小示例中我们就用到了这个Runnable参数,它就是在所有线程都准备好之后,满足Barrier条件时,并且在所有线程继续执行之前,我们可以执行这个Runnable。但是值得注意的是,这不是新起了一个线程,而是通过最后一个准备好的(也就是最后一个到达Barrier的)线程承担启动的。这一点我们在上面示例中打印的运行结果中也可以看出来:Thread-2线程是最后一个准备好的,就是它执行的这个barrierAction
这里partiescount不要混淆,parties是表示必须有几个线程要到达Barrier,而count是表示目前还有几个线程未到达Barrier。也就是说,只有当count参数为0时,Barrier条件即满足,所有线程可以继续执行。
count变量是怎么减少到0的呢?是通过Barrier执行的await方法。下面我们就看一下await方法。

await方法

public int await() throws InterruptedException, BrokenBarrierException {        try {            return dowait(false, 0L);        } catch (TimeoutException toe) {            throw new Error(toe); // cannot happen        }

await方法调用的dowait方法:

   private int dowait(boolean timed, long nanos)        throws InterruptedException, BrokenBarrierException,               TimeoutException {        final ReentrantLock lock = this.lock;        lock.lock();//获取ReentrantLock互斥锁        try {            final Generation g = generation;//获取generation对象            if (g.broken)//如果generation损坏,抛出异常                throw new BrokenBarrierException();            if (Thread.interrupted()) {                //如果当前线程被中断,则调用breakBarrier方法,停止CyclicBarrier,并唤醒所有线程                breakBarrier();                throw new InterruptedException();            }            int index = --count;// 看到这里了吧,count减1             //index=0,也就是说,有0个线程未满足CyclicBarrier条件,也就是条件满足,            //可以唤醒所有的线程了            if (index == 0) {  // tripped                boolean ranAction = false;                try {                   //这就是构造器的第二个参数,如果不为空的话,就执行这个Runnable的run方法,                   //你看,这里是执行的是run方法,也就是说,并没有新起一个另外的线程,                   //而是最后一个执行await操作的线程执行的这个run方法。                    final Runnable command = barrierCommand;                    if (command != null)                        command.run(); //同步执行barrierCommand                    ranAction = true;                    nextGeneration(); //执行成功设置下一个nextGeneration                    return 0;                } finally {                    if (!ranAction) . //如果barrierCommand执行失败,进行屏障破坏处理                        breakBarrier();                }            }            //如果当前线程不是最后一个到达的线程            // loop until tripped, broken, interrupted, or timed out            for (;;) {                try {                    if (!timed)                        trip.await(); //调用Condition的await()方法阻塞                    else if (nanos > 0L)                        nanos = trip.awaitNanos(nanos); //调用Condition的awaitNanos()方法阻塞                } catch (InterruptedException ie) {                //如果当前线程被中断,则判断是否有其他线程已经使屏障破坏。若没有则进行屏障破坏处理,并抛出异常;否则再次中断当前线程                    if (g == generation && ! g.broken) {                        breakBarrier();//执行breakBarrier,唤醒所有线程                        throw ie;                    } else {                        // We're about to finish waiting even if we had not                        // been interrupted, so this interrupt is deemed to                        // "belong" to subsequent execution.                        Thread.currentThread().interrupt();                    }                }                if (g.broken)//如果当前generation已经损坏,抛出异常                    throw new BrokenBarrierException();                if (g != generation)//如果generation已经更新换代,则返回index                    return index;                //如果是参数是超时等待,并且已经超时,则执行breakBarrier()方法                //唤醒所有等待线程。                if (timed && nanos <= 0L) {                    breakBarrier();                    throw new TimeoutException();                }            }        } finally {            lock.unlock();        }    }

简单来说,如果不发生异常,线程不被中断,那么dowait方法会调用Conditionawait方法(具体Condition的原理请看前面的文章),直到所有线程都准备好,即都执行了dowait方法,(做count的减操作,直到count=0),即CyclicBarrier条件已满足,就会执行唤醒线程操作,也就是上面的nextGeneration()方法。可能大家会有疑惑,这个Generation是什么东西呢?其实这个Generation定义的很简单,就一个布尔值的成员变量:

private Generation generation = new Generation();private static class Generation {    boolean broken = false;}

Generation 可以理解成“代”,我们要知道,CyclicBarrier是可以重复使用的,CyclicBarrier中的同一批线程属于同一“代”,当所有线程都满足了CyclicBarrier条件,执行唤醒操作nextGeneration()方法时,会新new 出一个Generation,代表一下“代”。

nextGeneration的源码

private void nextGeneration() {        // signal completion of last generation        trip.signalAll();//调用Condition的signalAll方法,唤醒所有await的线程        // set up next generation        count = parties;//重置count值        //生成新的Generation,表示上一代的所有线程已经唤醒,进行更新换代        generation = new Generation();     }

breakBarrier源码

再来看一下breakBarrier的代码,breakBarrier方法是在当前线程被中断时执行的,用来唤醒所有的等待线程:

private void breakBarrier() {        generation.broken = true;//表示当代因为线程被中断,已经发成损坏了        count = parties;//重置count值        trip.signalAll();//调用Condition的signalAll方法,唤醒所有await的线程    }

isBroken方法

public boolean isBroken() {        final ReentrantLock lock = this.lock;        lock.lock();        try {            return generation.broken;        } finally {            lock.unlock();        }    }

判断此屏障是否处于中断状态。如果因为构造或最后一次重置而导致中断或超时,从而使一个或多个参与者摆脱此屏障点,或者因为异常而导致某个屏障操作失败,则返回true;否则返回false

reset方法

//将屏障重置为其初始状态。    public void reset() {        final ReentrantLock lock = this.lock;        lock.lock();        try {            //唤醒所有等待的线程继续执行,并设置屏障中断状态为true            breakBarrier();   // break the current generation            //唤醒所有等待的线程继续执行,并设置屏障中断状态为false            nextGeneration(); // start a new generation        } finally {            lock.unlock();        }    }

getNumberWaiting方法

//返回当前在屏障处等待的参与者数目,此方法主要用于调试和断言。    public int getNumberWaiting() {        final ReentrantLock lock = this.lock;        lock.lock();        try {            return parties - count;        } finally {            lock.unlock();        }    }

“Java多线程怎么使用同步工具类CyclicBarrier”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注编程网网站,小编将为大家输出更多高质量的实用文章!

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯