java并发编程实战和并发编程的艺术,java并发编程教程
墨初 知识笔记 97阅读
list new ArrayList<>(); for (int i 0; i < threadCount; i) { final int threadNo i; Thread thread new Thread(() -> { System.out.println(当前线程- Thread.currentThread().getName(): 开始); int j 0; while (j < loopCount) { try { Thread.sleep(100); } catch (InterruptedException e) { } consumer.accept(Thread.currentThread()); } System.out.println(当前线程- Thread.currentThread().getName(): 结束); }, threadName : i); thread.start(); list.add(thread); } return list; } public static void wait(long gap , Runnable runnable) { while (true) { try { Thread.sleep(gap); runnable.run(); } catch (InterruptedException e) { throw new RuntimeException(e); } } }} synchronized关键字
和其他的线程安全技术一样synchronized
关键字的作用也是为了保障数据的原子性、可见性和有序性只是相比于其他技术synchronized
资历更老历史更久而且也更基础基本上我们在学习线程相关内容的时候就会学习这个关键字。
特点

在用法上synchronized
关键字可以修饰变量、方法和代码块修饰不同的对象最终产生的影响范围也有所不同下面我们通过一些简单示例来看下synchronized修饰不同的对象所产生的效果
加在方法上默认的共享锁变量是当前对象实例this
public synchronized void updateSafe(int value) { int sum this.value value; try { Thread.sleep(10); } catch (InterruptedException e) { throw new RuntimeException(e); } this.value sum; }
加载代码块中

public void updateSafeBlock(int value) { synchronized (this) { int sum this.value value; try { Thread.sleep(10); } catch (InterruptedException e) { throw new RuntimeException(e); } this.value sum; } }
ReentrantLock 相对于synchronized
它具备如下特点
ReentrantLock是可重入的互斥锁虽然具有与synchronized相同功能但是会比synchronized更加灵活
基本语法 // 获取ReentrantLock对象 ReentrantLock lock new ReentrantLock(); public void saveSafe(int money) { // 加锁 获取不到锁一直等待直到获取锁 lock.lock(); try { // 临界区 save(money); // 需要执行的代码 } finally { // 释放锁 如果不释放其他线程就获取不到锁 lock.unlock(); } }
可中断 lockInterruptibly()
synchronized
和 reentrantlock.lock()
的锁, 是不可被打断的; 也就是说别的线程已经获得了锁, 线程就需要一直等待下去不能中断直到获得到锁才运行。
一个线程等待锁的时候是可以被中断。通过中断异常报出。处理中断使其继续执行逻辑
Test public void testLockInterupted() { ReentrantLock lock new ReentrantLock(); // 主线程获取锁 lock.lock(); AtomicReference<Thread> t1 new AtomicReference<>(); SimpleThreadUtils.newLoopThread(2, Kx, 1000, (t) -> { if (t.getName().contains(0)) { if (t1.get() null){ t1.set(t); } try { System.out.println(获取锁 线程 t.getName() :::); // 获取锁 阻塞 科中断 lock.lockInterruptibly(); // 不可中断 lock.lock(); System.out.println(获取锁成功 线程 t.getName() :::); }catch (InterruptedException e){ System.out.println( 被打断 线程 t.getName() :::); } } else { if (t1.get() ! null) { t1.get().interrupt(); System.out.println(中断 线程 t.getName() :::); }else { System.out.println(中断 线程 没有准备 t.getName() :::); } } }); SimpleThreadUtils.wait(10000, () -> { System.out.println(监控 reentrantLockDemo.getMoney()); }); }
设置超时时间 tryLock() 使用 lock.tryLock() 方法会返回获取锁是否成功。如果成功则返回true反之则返回false。
并且tryLock方法可以设置指定等待时间参数为tryLock(long timeout, TimeUnit unit) , 其中timeout为最长等待时间TimeUnit为时间单位
获取锁的过程中, 如果超过等待时间, 或者被打断, 就直接从阻塞队列移除, 此时获取锁就失败了, 不会一直阻塞着 ! (可以用来实现死锁问题)
tryLock适合我们再预期判断是否可以获得锁的前提下进行业务处理。而不需要一直等待占用线程资源。
当我们tryLock的时候表示我们试着获取锁如果已经被其他线程占用那么就可以直接跳过处理提示用户资源被处理中。
Test public void testLockTryLock() { ReentrantLock lock new ReentrantLock();// SimpleThreadUtils. SimpleThreadUtils.newLoopThread(10, threadname, 1000, (t) -> { boolean b lock.tryLock(); try { try { if(b) { System.out.println(t.getName() 获取到锁处理一分钟); Thread.sleep(1000); }else { System.out.println(t.getName() 获取到锁失败); Thread.sleep(1000); } } catch (InterruptedException e) { } } finally { if (b) { lock.unlock(); } } }); SimpleThreadUtils.wait(10000, () -> { System.out.println(监控 reentrantLockDemo.getMoney()); }); }}
结果
当前线程-t:1: 开始当前线程-t:0: 开始当前线程-t:2: 开始t:1获取到锁失败t:2获取到锁处理一分钟t:0获取到锁失败t:0获取到锁处理一分钟t:2获取到锁失败t:1获取到锁失败t:2获取到锁失败t:0获取到锁失败t:1获取到锁处理一分钟
公平锁 new ReentrantLock(true) ReentrantLoc
k默认是非公平锁
, 可以指定为公平锁new ReentrantLock(true)
。
在线程获取锁失败进入阻塞队列时先进入的会在锁被释放后先获得锁。这样的获取方式就是公平的。
一般不设置ReentrantLock为公平的, 会降低并发度.
Synchronized底层的Monitor锁就是不公平的, 和谁先进入阻塞队列是没有关系的。
传统对象等待集合只有一个 waitSet Lock可以通过newCondition()方法 生成多个等待集合Condition对象。 Lock和Condition 是一对多的关系
配合方法 await()
和signal()
使用流程
await
前需要 获得锁
await 执行后会释放锁进入 conditionObject (条件变量)中等待await 的线程被唤醒或打断、或超时取重新竞争 lock 锁竞争 lock 锁成功后从 await 后继续执行signal 方法用来唤醒条件变量(等待室)汇总的某一个等待的线程signalAll方法, 唤醒条件变量(休息室)中的所有线程 Test public void testCondition() { ReentrantLock lock new ReentrantLock(); // 等待 A条件变量 Condition ac lock.newCondition(); // 等外 B条件变量 Condition bc lock.newCondition(); SimpleThreadUtils.newLoopThread(2, t, 1000, (t) -> { try { print(Lock..................); lock.lock(); if (t.getNo() 0){ print(等待...); ac.await(); print(等待结束..); }else if(t.getNo() 1){ print(通知取消等待...); ac.signal(); print(通知取消等待执行...); } } catch (InterruptedException e) { throw new RuntimeException(e); } finally { lock.unlock(); } }); SimpleThreadUtils.wait(10000, () -> { }); }
ReadWriteLock ReadWriteLock也是一个接口提供了readLock和writeLock两种锁的操作机制一个资源可以被多个线程同时读或者被一个线程写但是不能同时存在读和写线程。
读锁共享锁 readLock
写锁 独占锁 writeLock
读写锁一定要注意几点。
一定要记得读锁和写锁之间的竞争关系。只有读锁与读锁之间是不存在竞争其他都会产生锁竞争锁阻塞。对于读锁而言由于同一时刻可以允许多个线程访问共享资源进行读操作因此称它为共享锁而对于写锁而言同一时刻只允许一个线程访问共享资源进行写操作因此称它为排他锁。记住锁的通用范式一定要释放锁public class ReadWriteLockDemo { private ReadWriteLock readWriteLock new ReentrantReadWriteLock(); private int value; public int getValue() { System.out.println(读: value); return value; } public void update(int value) { int sum this.value value; try { Thread.sleep(10); } catch (InterruptedException e) { } this.value sum; } public int read() { Lock lock readWriteLock.readLock(); try { lock.lock(); Thread.sleep(10); System.out.println(读锁after); return getValue(); } catch (InterruptedException e) { throw new IllegalArgumentException(); } finally { lock.unlock(); } } public void updateSafe(int value) { Lock lock readWriteLock.writeLock(); try { System.out.println(writeLockbefore); lock.lock(); Thread.sleep(1000); System.out.println(writeLockafter); update(value); } catch (InterruptedException e) { throw new RuntimeException(e); } finally { lock.unlock(); } }}
CountDownLatch CountDownLatch允许一个或多个线程等待其他线程完成操作。这就是一个计数器一样。
CountDownLatch定义了一个计数器和一个阻塞队列 当计数器的值递减为0之前阻塞队列里面的线程处于挂起状态当计数器递减到0时会唤醒阻塞队列所有线程这里的计数器是一个标志可以表示一个任务一个线程也可以表示一个倒计时器。
例子 游戏加载资源
只有5个资源线程同时加载完成游戏才能开始
public class CountDownLatchTest { Test public void test() throws InterruptedException { CountDownLatch countDownLatch new CountDownLatch(5); SimpleThreadUtils.newLoopThread(5, cd, 1, (t) -> { // 资源加载逻辑 int i t.getNo() * 1000; try { // 模拟加载时间 Thread.sleep(i); } catch (InterruptedException e) { } System.out.println(t.getName(): 花费了 i s); // 计数器-1 countDownLatch.countDown(); });// 阻塞等待 计数器为0. 所有资源加载完成 countDownLatch.await(); System.out.println(开始游戏); }}
CyclicBarrier CyclicBarrier可以理解为一个循环栅栏其实和CountDownLatch有一定的相同点。就是都是需要进行计数CyclicBarrier是等待所有人都准备就绪了才会进行下一步。不同点在于CyclicBarrier结束了一次计数之后会自动开始下一次计数。而CountDownLatch只能完成一次。
CyclicBarrier可以使一定数量的线程反复地在栅栏位置处汇集。当线程到达栅栏位置时将调用await方法这个方法将阻塞直到所有线程都到达栅栏位置。如果所有线程都到达栅栏位置那么栅栏将打开此时所有的线程都将被释放而栅栏将被重置以便下次使用。
CyclicBarrier内部使用了ReentrantLock和Condition两个类。它有两个构造函数
public CyclicBarrier(int parties) { this(parties, null);} public CyclicBarrier(int parties, Runnable barrierAction) { if (parties < 0) throw new IllegalArgumentException(); this.parties parties; this.count parties; this.barrierCommand barrierAction;}
CyclicBarrier默认的构造方法是CyclicBarrier(int parties)其参数表示屏障拦截的线程数量每个线程使用await()方法告诉CyclicBarrier我已经到达了屏障然后当前线程被阻塞。
CyclicBarrier的另一个构造函数CyclicBarrier(int parties, Runnable barrierAction)用于线程到达屏障时优先执行barrierAction方便处理更复杂的业务场景。
await方法调用await方法的线程告诉CyclicBarrier自己已经到达同步点然后当前线程被阻塞。直到parties个参与线程调用了await方法CyclicBarrier同样提供带超时时间的await和不带超时时间的await方法
Test public void test() throws InterruptedException { CyclicBarrier cyclicBarrier new CyclicBarrier(5); SimpleThreadUtils.newLoopThread(5, cd, 1, (t) -> { while (true) { int i t.getNo() * 1000; try { Thread.sleep(i); } catch (InterruptedException e) { } System.out.println( t.getName() : 准备好了); try { cyclicBarrier.await(); System.out.println( 冲。。。 t.getName()); } catch (InterruptedException e) { throw new RuntimeException(e); } catch (BrokenBarrierException e) { throw new RuntimeException(e); } } }); SimpleThreadUtils.wait(10000,()->{}); }
Semaphore Semaphore主要是用来控制资源数量的工具可以理解为信号量。
初始化时给定信号量的个数其他线程可以来尝试获得许可当完成任务之后需要释放响应的许可。
这样同时并行/并发处理的数量最大为我们指定的数量。约束住同时访问资源的数量限制。
Test public void test() throws InterruptedException { Semaphore semaphore new Semaphore(5); SimpleThreadUtils.newLoopThread(10, cd, 10, (t) -> { try { semaphore.acquire(); int i t.getNo() * 1000; try { Thread.sleep(i); } catch (InterruptedException e) { } System.out.println(加载资源线程t.getName(): 花费了 i s); } catch (InterruptedException e) { throw new RuntimeException(e); }finally { semaphore.release(); } }); SimpleThreadUtils.wait(10000,()->{}); }
Exchanger Exchanger是一个用于线程间协作的工具类。它提供了一个交换的同步点在这个交换点两个线程能够交换数据。具体交换数据的方式是通过exchange函数实现的如果一个线程先执行exchange函数那么它会等待另一个线程也执行exchange方法。当两个线程都到达了同步交换点两个线程就可以交换数据。
两个人同时到达指定的地点然后留下信息exchanger把信息交换传输。
Test public void test() { Exchanger<String> exchanger new Exchanger<>(); SimpleThreadUtils.newLoopThread(2,ex,2,(t)->{ if (t.getNo() 0){ try { String exchange exchanger.exchange(我先到了你死定了); System.out.println(t.getName():exchange); } catch (InterruptedException e) { } }else { try { Thread.sleep(5000); String exchange exchanger.exchange(先到了吧去死吧); System.out.println(t.getName():exchange); } catch (InterruptedException e) { throw new RuntimeException(e); } } }); SimpleThreadUtils.wait(1000,()->{}); }}
Phaser Phaser
是Java7
新引入的并发API
我们可以将Phaser的概念看成是一个个的阶段 每个阶段都需要执行的线程任务任务执行完毕后就进入下一阶段。这里和CyclicBarrier 和CountDownLatch的概念类似 实际上也确实可以用Phaser代替CyclicBarrier和CountDownLatch。
Phaser也是通过计数器来控制 在Phaser中叫parties 我们在指定了parties之后 Phaser可以根据需要动态增加或减少parties的值
register()//添加一个新的注册者bulkRegister(int parties)//添加指定数量的多个注册者arrive()// 到达栅栏点直接执行无须等待其他的线程arriveAndAwaitAdvance()//到达栅栏点必须等待其他所有注册者到达arriveAndDeregister()//到达栅栏点注销自己无须等待其他的注册者到达onAdvance(int phase, int registeredParties)//多个线程达到注册点之后会调用该方法。
结语 并发针对单核 CPU 而言它指的是 CPU 交替执行不同任务的能力并行针对多核 CPU 而言它指的是多个核心同时执行多个任务的能力。
单核 CPU 只能并发无法并行换句话说并行只可能发生在多核 CPU 中。
在多核 CPU 中并发和并行一般都会同时存在它们都是提高 CPU 处理任务能力的重要手段。