一、让线程顺序运行的11种方法
1 方法说明
- 使用线程的join方法
- 使用主线程的join方法
- 使用线程的wait方法
- 使用线程的线程池方法
- 使用线程的Condition(条件变量)方法
- 使用CountDownLatch(倒计数)的方法
- 使用线程的CyclicBarrier(回环栅栏)方法
- 使用线程的Semaphore(信号量)方法
- 使用LockSupport的park与unpark方法
- 使用阻塞队列的put与take方法
- 使用CAS思想来完成多线程的顺序执行
AtomicReference
2 实现
2.1 使用线程的join方法
**join()😗*是Theard的方法,作用是调用线程需等待该join()线程执行完成后,才能继续用下运行。
**应用场景:**当一个线程必须等待另一个线程执行完毕才能执行时可以使用join方法。
2.1.1 实现代码
package cn.lyf.leetcode.thread.demo;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.TimeUnit;@Slf4jpublic class ThreadDemo1 { public static void main(String[] args) { Thread threadA = new Thread(() -> { log.info("start..."); // 模拟业务执行时长 sleep(100); log.info("end..."); }, "thread-a"); Thread threadB = new Thread(() -> { try { threadA.join(); } catch (InterruptedException e) { e.printStackTrace(); } log.info("start..."); // 模拟业务执行时长 sleep(100); log.info("end..."); }, "thread-b"); Thread threadC = new Thread(() -> { try { threadB.join(); } catch (InterruptedException e) { e.printStackTrace(); } log.info("start..."); // 模拟业务执行时长 sleep(100); log.info("end..."); }, "thread-c"); threadA.start(); threadB.start(); threadC.start(); } public static void sleep(long timeOut) { sleep(timeOut, TimeUnit.MILLISECONDS); } public static void sleep(long timeOut, TimeUnit unit) { try { TimeUnit.MILLISECONDS.sleep(unit.toMillis(timeOut)); } catch (InterruptedException e) { log.error("", e); } }}
2.1.2 运行结果
2023-04-11 09:48:23 [ thread-a ] INFO ThreadDemo1:39 - start...2023-04-11 09:48:23 [ thread-a ] INFO ThreadDemo1:42 - end...2023-04-11 09:48:23 [ thread-b ] INFO ThreadDemo1:51 - start...2023-04-11 09:48:23 [ thread-b ] INFO ThreadDemo1:54 - end...2023-04-11 09:48:23 [ thread-c ] INFO ThreadDemo1:63 - start...2023-04-11 09:48:23 [ thread-c ] INFO ThreadDemo1:66 - end...
2.2 使用主线程的join方法
在主线程中调用线程的join方法
2.2.1 实现代码
package cn.lyf.leetcode.thread.demo;import lombok.extern.slf4j.Slf4j;import static cn.lyf.leetcode.thread.demo.ThreadDemo1.sleep;@Slf4jpublic class ThreadDemo2 { public static void main(String[] args) throws Exception { Thread threadA = new Thread(() -> { log.info("start..."); // 模拟业务执行时长 sleep(100); log.info("end..."); }, "thread-a"); Thread threadB = new Thread(() -> { log.info("start..."); // 模拟业务执行时长 sleep(100); log.info("end..."); }, "thread-b"); Thread threadC = new Thread(() -> { log.info("start..."); // 模拟业务执行时长 sleep(100); log.info("end..."); }, "thread-c"); threadA.start(); threadA.join(); threadB.start(); threadB.join(); threadC.start(); }}
2.2.2 运行结果
2023-04-11 09:54:07 [ thread-a ] INFO ThreadDemo2:36 - start...2023-04-11 09:54:07 [ thread-a ] INFO ThreadDemo2:39 - end...2023-04-11 09:54:07 [ thread-b ] INFO ThreadDemo2:43 - start...2023-04-11 09:54:07 [ thread-b ] INFO ThreadDemo2:46 - end...2023-04-11 09:54:07 [ thread-c ] INFO ThreadDemo2:50 - start...2023-04-11 09:54:07 [ thread-c ] INFO ThreadDemo2:53 - end...
2.3 使用线程的wait方法
2.3.1 实现代码
package cn.lyf.leetcode.thread.demo;import lombok.extern.slf4j.Slf4j;import static cn.lyf.leetcode.thread.demo.ThreadDemo1.sleep;@Slf4jpublic class ThreadDemo3 { static final Object LOCK1 = new Object(); static final Object LOCK2 = new Object(); static boolean isRun1 = false; static boolean isRun2 = false; public static void main(String[] args) { new Thread(() -> { synchronized (LOCK1) { log.info("start..."); sleep(100); log.info("end..."); isRun1 = true; LOCK1.notify(); } }, "thread-a").start(); new Thread(() -> { synchronized (LOCK1) { while (!isRun1) { try { LOCK1.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } synchronized (LOCK2) { log.info("start..."); sleep(100); log.info("end..."); isRun2 = true; LOCK2.notify(); } } }, "thread-b").start(); new Thread(() -> { synchronized (LOCK2) { while (!isRun2) { try { LOCK2.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } log.info("start..."); sleep(100); log.info("end..."); } }, "thread-c").start(); }}
2.3.2 运行结果
2023-04-11 10:09:54 [ thread-a ] INFO ThreadDemo3:47 - start...2023-04-11 10:09:54 [ thread-a ] INFO ThreadDemo3:49 - end...2023-04-11 10:09:54 [ thread-b ] INFO ThreadDemo3:65 - start...2023-04-11 10:09:54 [ thread-b ] INFO ThreadDemo3:67 - end...2023-04-11 10:09:54 [ thread-c ] INFO ThreadDemo3:83 - start...2023-04-11 10:09:54 [ thread-c ] INFO ThreadDemo3:85 - end...
2.4 使用线程的线程池方法
JAVA通过Executors提供了四种线程池
- 单线程化线程池(newSingleThreadExecutor);
- 可控最大并发数线程池(newFixedThreadPool);
- 可回收缓存线程池(newCachedThreadPool);
- 支持定时与周期性任务的线程池(newScheduledThreadPool)。
**单线程化线程池(newSingleThreadExecutor)😗*优点,串行执行所有任务。
**submit():**提交任务。
**shutdown():**方法用来关闭线程池,拒绝新任务。
**应用场景:**串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。
2.4.1 实现代码
package cn.lyf.leetcode.thread.demo;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import static cn.lyf.leetcode.thread.demo.ThreadDemo1.sleep;@Slf4jpublic class ThreadDemo4 { public static void main(String[] args) { ExecutorService poolExecutor = Executors.newSingleThreadExecutor(); poolExecutor.submit(() -> { log.info("threadA start..."); sleep(100); log.info("threadA end..."); }); poolExecutor.submit(() -> { log.info("threadB start..."); sleep(100); log.info("threadB end..."); }); poolExecutor.submit(() -> { log.info("threadC start..."); sleep(100); log.info("threadC end..."); }); poolExecutor.shutdown(); }}
2.4.2 运行结果
2023-04-11 10:27:11 [ pool-1-thread-1 ] INFO ThreadDemo4:41 - threadA start...2023-04-11 10:27:11 [ pool-1-thread-1 ] INFO ThreadDemo4:43 - threadA end...2023-04-11 10:27:11 [ pool-1-thread-1 ] INFO ThreadDemo4:47 - threadB start...2023-04-11 10:27:11 [ pool-1-thread-1 ] INFO ThreadDemo4:49 - threadB end...2023-04-11 10:27:11 [ pool-1-thread-1 ] INFO ThreadDemo4:53 - threadC start...2023-04-11 10:27:11 [ pool-1-thread-1 ] INFO ThreadDemo4:55 - threadC end...
2.5 使用线程的Condition(条件变量)方法
**Condition(条件变量)😗*通常与一个锁关联。需要在多个Contidion中共享一个锁时,可以传递一个Lock/RLock实例给构造方法,否则它将自己生成一个RLock实例。
- Condition中await()方法类似于Object类中的wait()方法。
- Condition中await(long time,TimeUnit unit)方法类似于Object类中的wait(long time)方法。
- Condition中signal()方法类似于Object类中的notify()方法。
- Condition中signalAll()方法类似于Object类中的notifyAll()方法。
**应用场景:**Condition是一个多线程间协调通信的工具类,使得某个,或者某些线程一起等待某个条件(Condition),只有当该条件具备( signal 或者 signalAll方法被调用)时 ,这些等待线程才会被唤醒,从而重新争夺锁。
2.5.1 实现代码
package cn.lyf.leetcode.thread.demo;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReentrantLock;import static cn.lyf.leetcode.thread.demo.ThreadDemo1.sleep;@Slf4jpublic class ThreadDemo5 { static final Lock LOCK = new ReentrantLock(); static final Condition CONDITION_A = LOCK.newCondition(); static final Condition CONDITION_B = LOCK.newCondition(); static boolean isRunA = false; static boolean isRunB = false; public static void main(String[] args) { new Thread(() -> { // 加锁 LOCK.lock(); log.info("start..."); try { sleep(100); } catch (Exception e) { log.error("", e); } finally { isRunA = true; CONDITION_A.signal(); // 解锁 LOCK.unlock(); log.info("end..."); } }, "thread-a").start(); new Thread(() -> { // 加锁 LOCK.lock(); while (!isRunA) { try { CONDITION_A.await(); } catch (InterruptedException e) { e.printStackTrace(); } } log.info("start..."); try { sleep(100); } catch (Exception e) { log.error("", e); } finally { isRunB = true; CONDITION_B.signal(); // 解锁 LOCK.unlock(); log.info("end..."); } }, "thread-b").start(); new Thread(() -> { // 加锁 LOCK.lock(); while (!isRunB) { // 这里根据状态码,来使thread-c处于一直等待状态 try { CONDITION_B.await(); } catch (InterruptedException e) { e.printStackTrace(); } } log.info("start..."); try { sleep(100); } catch (Exception e) { log.error("", e); } finally { // 解锁 LOCK.unlock(); log.info("end..."); } }, "thread-c").start(); }}
2.5.2 运行结果
2023-04-11 10:34:06 [ thread-a ] INFO ThreadDemo5:48 - start...2023-04-11 10:34:06 [ thread-a ] INFO ThreadDemo5:58 - end...2023-04-11 10:34:06 [ thread-b ] INFO ThreadDemo5:72 - start...2023-04-11 10:34:06 [ thread-b ] INFO ThreadDemo5:82 - end...2023-04-11 10:34:06 [ thread-c ] INFO ThreadDemo5:96 - start...2023-04-11 10:34:06 [ thread-c ] INFO ThreadDemo5:104 - end...
2.6 使用线程的CountDownLatch(倒计数)方法
**CountDownLatch:**位于java.util.concurrent包下,利用它可以实现类似计数器的功能。
**应用场景:**比如有一个任务C,它要等待其他任务A,B执行完毕之后才能执行,此时就可以利用CountDownLatch来实现这种功能了。
2.6.1 实现代码
package cn.lyf.leetcode.thread.demo;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.CountDownLatch;import static cn.lyf.leetcode.thread.demo.ThreadDemo1.sleep;@Slf4jpublic class ThreadDemo6 { public static void main(String[] args) { CountDownLatch countDownLatchA = new CountDownLatch(1); CountDownLatch countDownLatchB = new CountDownLatch(1); new Thread(() -> { log.info("start..."); sleep(100); log.info("end..."); countDownLatchA.countDown(); }, "thread-a").start(); new Thread(() -> { try { countDownLatchA.await(); } catch (InterruptedException e) { e.printStackTrace(); } log.info("start..."); sleep(100); log.info("end..."); countDownLatchB.countDown(); }, "thread-b").start(); new Thread(() -> { try { countDownLatchB.await(); } catch (InterruptedException e) { e.printStackTrace(); } log.info("start..."); sleep(100); log.info("end..."); }, "thread-c").start(); }}
2.6.2 运行结果
2023-04-11 10:37:24 [ thread-a ] INFO ThreadDemo6:40 - start...2023-04-11 10:37:24 [ thread-a ] INFO ThreadDemo6:42 - end...2023-04-11 10:37:24 [ thread-b ] INFO ThreadDemo6:52 - start...2023-04-11 10:37:25 [ thread-b ] INFO ThreadDemo6:54 - end...2023-04-11 10:37:25 [ thread-c ] INFO ThreadDemo6:64 - start...2023-04-11 10:37:25 [ thread-c ] INFO ThreadDemo6:66 - end...
2.7 使用线程的CyclicBarrier(回环栅栏)方法
**CyclicBarrier(回环栅栏)😗*通过它可以实现让一组线程等待至某个状态之后再全部同时执行。叫做回环是因为当所有等待线程都被释放以后,CyclicBarrier可以被重用。我们暂且把这个状态就叫做barrier,当调用await()方法之后,线程就处于barrier了。
**应用场景:**公司组织春游,等待所有的员工到达集合地点才能出发,每个人到达后进入barrier状态。都到达后,唤起大家一起出发去旅行。
2.7.1 实现代码
package cn.lyf.leetcode.thread.demo;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.BrokenBarrierException;import java.util.concurrent.CyclicBarrier;import static cn.lyf.leetcode.thread.demo.ThreadDemo1.sleep;@Slf4jpublic class ThreadDemo7 { public static void main(String[] args) { CyclicBarrier cyclicBarrier1 = new CyclicBarrier(2); CyclicBarrier cyclicBarrier2 = new CyclicBarrier(2); new Thread(() -> { log.info("start..."); sleep(100); log.info("end..."); try { cyclicBarrier1.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } }, "thread-a").start(); new Thread(() -> { try { cyclicBarrier1.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } log.info("start..."); sleep(100); log.info("end..."); try { cyclicBarrier2.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } }, "thread-b").start(); new Thread(() -> { try { cyclicBarrier2.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } log.info("start..."); sleep(100); log.info("end..."); }, "thread-c").start(); }}
2.7.2 运行结果
2023-04-11 10:47:37 [ thread-a ] INFO ThreadDemo7:41 - start...2023-04-11 10:47:37 [ thread-a ] INFO ThreadDemo7:43 - end...2023-04-11 10:47:37 [ thread-b ] INFO ThreadDemo7:57 - start...2023-04-11 10:47:38 [ thread-b ] INFO ThreadDemo7:59 - end...2023-04-11 10:47:38 [ thread-c ] INFO ThreadDemo7:75 - start...2023-04-11 10:47:38 [ thread-c ] INFO ThreadDemo7:77 - end...
2.8 使用Sephmore(信号量)实现线程按顺序运行
**Sephmore(信号量)😗*Semaphore是一个计数信号量,从概念上将,Semaphore包含一组许可证,如果有需要的话,每个acquire()方法都会阻塞,直到获取一个可用的许可证,每个release()方法都会释放持有许可证的线程,并且归还Semaphore一个可用的许可证。然而,实际上并没有真实的许可证对象供线程使用,Semaphore只是对可用的数量进行管理维护。
**acquire()😗*当前线程尝试去阻塞的获取1个许可证,此过程是阻塞的,当前线程获取了1个可用的许可证,则会停止等待,继续执行。
**release()😗*当前线程释放1个可用的许可证。
**应用场景:**Semaphore可以用来做流量分流,特别是对公共资源有限的场景,比如数据库连接。假设有这个的需求,读取几万个文件的数据到数据库中,由于文件读取是IO密集型任务,可以启动几十个线程并发读取,但是数据库连接数只有10个,这时就必须控制最多只有10个线程能够拿到数据库连接进行操作。这个时候,就可以使用Semaphore做流量控制。
2.8.1 实现代码
package cn.lyf.leetcode.thread.demo;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.Semaphore;import static cn.lyf.leetcode.thread.demo.ThreadDemo1.sleep;@Slf4jpublic class ThreadDemo8 { public static void main(String[] args) { // 这里一定要将permits设置为0,只有设置为0 thread-a才会先执行,thread-b才会后执行 Semaphore semaphore1 = new Semaphore(0); // 同上 Semaphore semaphore2 = new Semaphore(0); new Thread(() -> { log.info("start..."); sleep(100); log.info("end..."); semaphore1.release(); }, "thread-a").start(); new Thread(() -> { try { semaphore1.acquire(); } catch (InterruptedException e) { e.printStackTrace(); } log.info("start..."); sleep(100); log.info("end..."); semaphore1.release(); semaphore2.release(); }, "thread-b").start(); new Thread(() -> { try { semaphore2.acquire(); } catch (InterruptedException e) { e.printStackTrace(); } log.info("start..."); sleep(100); log.info("end..."); semaphore2.release(); }, "thread-c").start(); }}
2.8.2 运行结果
2023-04-11 11:00:58 [ thread-a ] INFO ThreadDemo8:42 - start...2023-04-11 11:00:58 [ thread-a ] INFO ThreadDemo8:44 - end...2023-04-11 11:00:58 [ thread-b ] INFO ThreadDemo8:54 - start...2023-04-11 11:00:58 [ thread-b ] INFO ThreadDemo8:56 - end...2023-04-11 11:00:58 [ thread-c ] INFO ThreadDemo8:67 - start...2023-04-11 11:00:58 [ thread-c ] INFO ThreadDemo8:69 - end...
2.9 使用LockSupport的park与unpark方法
java.util.concurrent.locks.LockSupport#park()
java.util.concurrent.locks.LockSupport#unpark(Thread thread)
- park:方法是使当前线程暂停运行
- unpark:方法是使指定线程恢复运行
2.9.1 实现代码
package cn.lyf.leetcode.thread.demo;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.locks.LockSupport;import static cn.lyf.leetcode.thread.demo.ThreadDemo1.sleep;@Slf4jpublic class ThreadDemo9 { static volatile boolean isRunA = false; static volatile boolean isRunB = false; public static void main(String[] args) { Thread threadC = new Thread(() -> { while (!isRunB) { LockSupport.park(); } log.info("start..."); // 模拟业务执行时长 sleep(100); log.info("end..."); }, "thread-c"); Thread threadB = new Thread(() -> { while (!isRunA) { LockSupport.park(); } log.info("start..."); // 模拟业务执行时长 sleep(100); log.info("end..."); isRunB = true; LockSupport.unpark(threadC); }, "thread-b"); Thread threadA = new Thread(() -> { log.info("start..."); // 模拟业务执行时长 sleep(100); log.info("end..."); isRunA = true; LockSupport.unpark(threadB); }, "thread-a"); threadC.start(); threadB.start(); threadA.start(); }}
2.9.2 运行结果
2023-04-11 11:25:20 [ thread-a ] INFO ThreadDemo9:56 - start...2023-04-11 11:25:20 [ thread-a ] INFO ThreadDemo9:59 - end...2023-04-11 11:25:20 [ thread-b ] INFO ThreadDemo9:46 - start...2023-04-11 11:25:20 [ thread-b ] INFO ThreadDemo9:49 - end...2023-04-11 11:25:20 [ thread-c ] INFO ThreadDemo9:36 - start...2023-04-11 11:25:20 [ thread-c ] INFO ThreadDemo9:39 - end...
2.10 使用阻塞队列的put与take方法
阻塞队列的常用API
方法类型 | 抛出异常 | 特殊值 | 阻塞 | 超时 |
---|---|---|---|---|
插入 | add(e) | offer(e) | put(e) | offer(e, timeOut, unit) |
移除 | remove() | poll() | take() | poll(timeOut, unit) |
检查 | element() | peek() | 不可用 | 不可用 |
本例主要是使用take操作的阻塞作用
2.10.1 实现代码
package cn.lyf.leetcode.thread.demo;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.BlockingQueue;import java.util.concurrent.LinkedBlockingQueue;import static cn.lyf.leetcode.thread.demo.ThreadDemo1.sleep;@Slf4jpublic class ThreadDemo10 { static volatile boolean isRunA = false; static volatile boolean isRunB = false; static BlockingQueue<Object> queue1 = new LinkedBlockingQueue<>(); static BlockingQueue<Object> queue2 = new LinkedBlockingQueue<>(); public static void main(String[] args) { final Object obj = new Object(); Thread threadA = new Thread(() -> { log.info("start..."); // 模拟业务执行时长 sleep(100); log.info("end..."); isRunA = true; try { queue1.put(obj); } catch (InterruptedException e) { e.printStackTrace(); } }, "thread-a"); Thread threadB = new Thread(() -> { while (!isRunA) { try { queue1.take(); } catch (InterruptedException e) { e.printStackTrace(); } } log.info("start..."); // 模拟业务执行时长 sleep(100); log.info("end..."); isRunB = true; try { queue2.put(obj); } catch (InterruptedException e) { e.printStackTrace(); } }, "thread-b"); Thread threadC = new Thread(() -> { while (!isRunB) { try { queue2.take(); } catch (InterruptedException e) { e.printStackTrace(); } } log.info("start..."); // 模拟业务执行时长 sleep(100); log.info("end..."); }, "thread-c"); threadC.start(); threadB.start(); threadA.start(); }}
2.10.2 运行结果
2023-04-11 12:19:10 [ thread-a ] INFO ThreadDemo10:32 - start...2023-04-11 12:19:10 [ thread-a ] INFO ThreadDemo10:35 - end...2023-04-11 12:19:10 [ thread-b ] INFO ThreadDemo10:52 - start...2023-04-11 12:19:10 [ thread-b ] INFO ThreadDemo10:55 - end...2023-04-11 12:19:10 [ thread-c ] INFO ThreadDemo10:73 - start...2023-04-11 12:19:10 [ thread-c ] INFO ThreadDemo10:76 - end...
2.11 使用CAS思想来实现线程的顺序运行(AtomicReference
)
compare and swap的缩写,中文翻译成比较并交换, 实现并发算法时常用到的一种技术。它包含三个操作数——内存位置、预期原值及更新值。
执行CAS操作的时候,将内存位置的值与预期原值比较:
如果相匹配,那么处理器会自动将该位置值更新为新值,
如果不匹配,处理器不做任何操作,多个线程同时执行CAS操作只有一个会成功。
2.11.1 实现代码
package cn.lyf.leetcode.thread.demo;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.atomic.AtomicReference;import static cn.lyf.leetcode.thread.demo.ThreadDemo1.sleep;@Slf4jpublic class ThreadDemo11 { public static void main(String[] args) { AtomicReference<Thread> threadAtomicReference = new AtomicReference<>(); Thread threadA = new Thread(() -> { log.info("start..."); // 模拟业务执行时长 sleep(100); log.info("end..."); threadAtomicReference.set(Thread.currentThread()); }, "thread-a"); Thread threadB = new Thread(() -> { while (!threadAtomicReference.compareAndSet(threadA, null)) { sleep(1); } log.info("start..."); // 模拟业务执行时长 sleep(100); log.info("end..."); threadAtomicReference.set(Thread.currentThread()); }, "thread-b"); Thread threadC = new Thread(() -> { while (!threadAtomicReference.compareAndSet(threadB, null)) { sleep(1); } log.info("start..."); // 模拟业务执行时长 sleep(100); log.info("end..."); }, "thread-c"); threadC.start(); threadB.start(); threadA.start(); }}
2.11.2 运行结果
2023-04-11 14:06:32 [ thread-a ] INFO ThreadDemo11:23 - start...2023-04-11 14:06:32 [ thread-a ] INFO ThreadDemo11:26 - end...2023-04-11 14:06:32 [ thread-b ] INFO ThreadDemo11:34 - start...2023-04-11 14:06:32 [ thread-b ] INFO ThreadDemo11:37 - end...2023-04-11 14:06:32 [ thread-c ] INFO ThreadDemo11:45 - start...2023-04-11 14:06:33 [ thread-c ] INFO ThreadDemo11:48 - end...