苍穹之边,浩瀚之挚,眰恦之美; 悟心悟性,善始善终,惟善惟道! —— 朝槿《朝槿兮年说》
让客户满意是我们工作的目标,不断超越客户的期望值来自于我们对这个行业的热爱。我们立志把好的技术通过有效、简单的方式提供给客户,将通过不懈努力成为客户在信息化领域值得信任、有价值的长期合作伙伴,公司提供的服务项目有:域名注册、网站空间、营销软件、网站建设、湖北网站维护、网站推广。
在并发编程领域,有两大核心问题:一个是互斥,即同一时刻只允许一个线程访问共享资源;另一个是同步,即线程之间如何通信、协作。
主要原因是,对于多线程实现实现并发,一直以来,多线程都存在2个问题:
因此,在并发编程领域中,一直有一个很重要的设计原则: “ 不要通过内存共享来实现通信,而应该通过通信来实现内存共享。”
简单来说,就是尽可能通过消息通信,而不是内存共享来实现进程或者线程之间的同步。
本文用到的一些关键词语以及常用术语,主要如下:
在Java领域中,我们可以将锁大致分为基于Java语法层面(关键词)实现的锁和基于JDK层面实现的锁。
在Java领域中, 尤其是在并发编程领域,对于多线程并发执行一直有两大核心问题:同步和互斥。其中:
针对对于这两大核心问题,利用管程是能够解决和实现的,因此可以说,管程是并发编程的万能钥匙。
虽然,Java在基于语法层面(synchronized 关键字)实现了对管程技术,但是从使用方式和性能上来说,内置锁(synchronized 关键字)的粒度相对过大,不支持超时和中断等问题。
为了弥补这些问题,从JDK层面对其“重复造轮子”,在JDK内部对其重新设计和定义,甚至实现了新的特性。
在Java领域中,从JDK源码分析来看,基于JDK层面实现的锁大致主要可以分为以下4种方式:
从阅读源码不难发现,在Java SDK 并发包主要通过AbstractQueuedSynchronizer(AQS)实现多线程同步机制的封装与定义,而通过Lock 和 Condition 两个接口来实现管程,其中 Lock 用于解决互斥问题,Condition 用于解决同步问题。
在Java领域中,同步器是专门为多线程并发设计的同步机制,主要是多线程并发执行时线程之间通过某种共享状态来实现同步,只有当状态满足这种条件时线程才往下执行的一种同步机制。
一个标准的AQS同步器主要有同步状态机制,等待队列,条件队列,独占模式,共享模式等五大核心要素组成。
在Java领域中,JDK的JUC(java.util.concurrent.)包中提供了各种并发工具,但是大部分同步工具的实现基于AbstractQueuedSynchronizer类实现,其内部结构主要如下:
我们可以得到一个比较通用的并发同步工具基础模型,大致包含如下几个内容,其中:
综上所述,条件变量和等待队列的作用是解决线程之间的同步问题;共享变量与阻塞队列的作用是解决线程之间的互斥问题。
在并发编程领域,有两大核心问题:一个是互斥,即同一时刻只允许一个线程访问共享资源;另一个是同步,即线程之间如何通信、协作。
综合Java领域中的并发锁的各种实现与应用分析来看,一把锁或者一种锁,基本上都会包含以下几个方面:
综上所述,大致可以根据上述这些方向,我们便可以清楚????️知道Java领域中各种锁实现的基本理论时和实现思想。
在Java领域中,CountDownLatch(闭锁)是针对于Java多线程并发控制中倒计数器的具体数量,主要是采用递减计数方式的倒计数器思想和基于AQS基础同步器来实现的一种同步器工具类。
CountDownLatch(闭锁)是Java多线程并发中最常见的一种同步器,从锁的性质上来看,属于共享锁,其功能相当于一个多线程环境下的倒数门闩。
CountDownLatch通过定义一个倒计数器,在并发环境下由线程进行递减1操作,当计数值变为0之后,被await方法阻塞的线程将会唤醒。
通过CountDownLatch可以实现线程间的计数同步。
一般来说,通过定义一个倒计数器,为了让某个线程或者多个线程在某个运行节点上等待N个条件都满足后,才让所有的线程继续往下执行,其中倒计数器的数量则为N,每满足一个条件,倒计数器就依次逐渐递减1,直到N-1=0的时,所有等待的线程才往下继续执行。
CountDownLatch类最早是在JDK1.5版本提供的,从设计思想上来看,主要包括倒计数器的同步器,控制阻塞等待的方法,倒计数器的递减操作方法等3个核心要素。其中:
简单来说,CountDownLatch主要是让某个线程或者多个线程,等待其他线程完成某件事情或者某个任务结束之后才能继续执行。
在CountDownLatch类的JDK1.8版本中,对于CountDownLatch的基本实现如下:
public class CountDownLatch {
private final Sync sync;
/**
* CountDownLatch锁-构造一个倒计数器
*/
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
/**
* CountDownLatch锁-基于AQS定义支持同步器实现
*/
private static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = - L;
//......其他方法代码
}
/**
* CountDownLatch锁-线程等待方法
*/
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
/**
* CountDownLatch锁-倒计数器递减操作
*/
public void countDown() {
sync.releaseShared(1);
}
//... 其他代码
}
/**
* CountDownLatch锁-基于AQS同步器封装一个内部的同步器
*/
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
/**
* CountDownLatch锁-获取共享锁方法
*/
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
/**
* CountDownLatch锁-释放共享锁方法
*/
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
public class CountDownLatch {
private final Sync sync;
/**
* CountDownLatch锁-基于AQS基础同步器实现一个内部同步器
*/
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c - 1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
/**
* CountDownLatch锁-构造一个倒计数器
*/
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
/**
* CountDownLatch锁-基于AQS定义支持同步器实现
*/
private static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = - L;
//......其他方法代码
}
/**
* CountDownLatch锁-线程等待方法
*/
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
/**
* CountDownLatch锁-返回当前计数器
*/
public long getCount() {
return sync.getCount();
}
/**
* CountDownLatch锁-线程等待方法(支持超时机制)
*/
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
/**
* CountDownLatch锁-倒计数器递减操作
*/
public void countDown() {
sync.releaseShared(1);
}
}
综上所述,从一定意义上讲,CountDownLatch是一种共享锁,属于AQS基础抽象队列同步器中共享模式孵化的产物,没有支持公平模式与非公平模式的实现。
在Java领域中,CyclicBarrier(循环屏障)是针对于Java多线程并发控制中倒计数器的线程数量,主要是采用递减计数方式的倒计数器思想和基于AQS基础同步器实现的ReentrantLock锁来实现的一种同步器工具类。
CyclicBarrier(循环屏障)是Java中通过对线程预定义设置一个屏障,只有当到达屏障的线程数量到达指定的最大屏障时,屏障才会让这些线程通过执行。
从一定意义上来讲,这里的屏障本质上还是一个倒计数器,倒计数器的最大限度支持的数量就是我们为线程设置屏障大小,其工作原理与CountDownLatch(闭锁)类似,都是通过让线程阻塞等待时,倒计数器执行递减1运算。
但是与CountDownLatch不同是,CyclicBarrier(循环屏障)是基于ReentrantLock(可重入锁)来实现的,更准确的说,CyclicBarrier是对ReentrantLock的应用实例。
一般来说,通过定义一个倒计数器,为了让某个线程或者多个线程在某个运行节点上约束N个线程,需要让指定数量的线程共同到达某一个节点之后,这些线程才会一起被执行。
CyclicBarrier(循环屏障)最早是在JDK1.5版本中提供的,从设计思想上来看,主要包括倒计数器的最大屏障,控制阻塞等待的方法,倒计数器的递减操作方法,和触发点线程任务等4个核心要素。其中:
简单来说,CyclicBarrier(循环屏障)是让多个线程互相等待,直到达到一个同步的运行节点。再继续一起执行。
在CyclicBarrier类的JDK1.8版本中,对于CountDownLatch的基本实现如下:
public class CyclicBarrier {
/** CyclicBarrier锁—屏障lock实体 */
private final ReentrantLock lock = new ReentrantLock();
/** CyclicBarrier锁—屏障条件队列 */
private final Condition trip = lock.newCondition();
/** CyclicBarrier锁—屏障最大值 */
private final int parties;
/** CyclicBarrier锁—屏障触发线程任务目标 */
private final Runnable barrierCommand;
/** CyclicBarrier锁—当前计数器的最大值屏障实例 */
private Generation generation = new Generation();
/** CyclicBarrier锁—当前计数器的最大值屏障实例 */
private int count;
/** CyclicBarrier锁—屏障实例 */
private static class Generation {
boolean broken = false;
}
/** CyclicBarrier锁—构造一个屏障实例(不带触发任务的) */
public CyclicBarrier(int parties) {
this(parties, null);
}
/** CyclicBarrier锁—构造一个屏障实例(带触发任务的) */
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
/** CyclicBarrier锁—无参数构造一个等待方法(默认模式) */
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
/** CyclicBarrier锁—有参数构造一个等待方法(支持超时机制) */
public int await(long timeout, TimeUnit unit)
throws InterruptedException,
BrokenBarrierException,
TimeoutException {
return dowait(true, unit.toNanos(timeout));
}
/** CyclicBarrier锁—更新状态变量 */
private void nextGeneration() {
// signal completion of last generation
trip.signalAll();
// set up next generation
count = parties;
generation = new Generation();
}
/** CyclicBarrier锁—阻塞屏障 */
private void breakBarrier() {
generation.broken = true;
count = parties;
trip.signalAll();
}
//...其他代码
}
�一般来说,假设我们允许控制的最大线程数量为N,预定义设置屏障最大值为Parties(N), 当前屏障的线程数量为Current(N) ,当前屏障中的等待线程数量为Waiting(N),那么我们会得到一个计算公式:
private static class Generation {
boolean broken = false;
}
主要是构造了一个静态私有化的Generation类,其中定义了一个broken变量来作为屏障标记,默认初始值为false,表示还没达到屏障最大值。
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
// [1].实例化构建ReentrantLock的对象
final ReentrantLock lock = this.lock;
// [2].通过lock()获取锁或者说加锁操作
lock.lock();
try {
// [3].实例化构建Generation屏障实例对象
final Generation g = generation;
// [4].判断Generation屏障实例标记状态
if (g.broken)
throw new BrokenBarrierException();
// [5].判断Thread是包含中断标志位
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
// [6].对倒计数器的屏障数量递减1运算
int index = --count;
// [7].依据结果index == 0表示当前指定的线程数量到达屏障最大值,需要触发Runnable任务
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
// 进行下一轮屏障设置
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// [7].自旋操作
for (;;) {
try {
// 判断是否超时
if (!timed)
trip.await();
else if (nanos > 0L)
// 进行下一轮屏障设置
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// 是否发生线程中断
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
// 如果等待时间超过指定超时时间,throw new TimeoutException
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
// 最后释放锁操作
lock.unlock();
}
}
在CyclicBarrier类的JDK1.8版本中,对于CyclicBarrier的具体实现如下:
public class CyclicBarrier {
/** CyclicBarrier锁—屏障lock实体 */
private final ReentrantLock lock = new ReentrantLock();
/** CyclicBarrier锁—屏障条件队列 */
private final Condition trip = lock.newCondition();
/** CyclicBarrier锁—屏障最大值 */
private final int parties;
/** CyclicBarrier锁—屏障触发线程任务目标 */
private final Runnable barrierCommand;
/** CyclicBarrier锁—当前计数器的最大值屏障实例 */
private Generation generation = new Generation();
/** CyclicBarrier锁—当前计数器的最大值屏障实例 */
private int count;
/** CyclicBarrier锁—屏障实例 */
private static class Generation {
boolean broken = false;
}
/** CyclicBarrier锁—构造一个屏障实例(不带触发任务的) */
public CyclicBarrier(int parties) {
this(parties, null);
}
/** CyclicBarrier锁—构造一个屏障实例(带触发任务的) */
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
/** CyclicBarrier锁—无参数构造一个等待方法(默认模式) */
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
/** CyclicBarrier锁—有参数构造一个等待方法(支持超时机制) */
public int await(long timeout, TimeUnit unit)
throws InterruptedException,
BrokenBarrierException,
TimeoutException {
return dowait(true, unit.toNanos(timeout));
}
/** CyclicBarrier锁—更新状态变量 */
private void nextGeneration() {
// signal completion of last generation
trip.signalAll();
// set up next generation
count = parties;
generation = new Generation();
}
/** CyclicBarrier锁—阻塞屏障 */
private void breakBarrier() {
generation.broken = true;
count = parties;
trip.signalAll();
}
/** CyclicBarrier锁—阻塞屏障 */
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
// [1].实例化构建ReentrantLock的对象
final ReentrantLock lock = this.lock;
// [2].通过lock()获取锁或者说加锁操作
lock.lock();
try {
// [3].实例化构建Generation屏障实例对象
final Generation g = generation;
// [4].判断Generation屏障实例标记状态是否为true
if (g.broken)
throw new BrokenBarrierException();
// [5].判断Thread是包含中断标志位
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
// [6].对倒计数器的屏障数量递减1运算
int index = --count;
// [7].依据结果index == 0表示当前指定的线程数量到达屏障最大值,需要触发Runnable任务
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
// 进行下一轮屏障设置
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// [7].自旋操作
for (;;) {
try {
// 判断是否超时
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
// 如果等待时间超过指定超时时间,throw new TimeoutException
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
// 最后释放锁操作
lock.unlock();
}
}
/** CyclicBarrier锁—获取当前等屏障等待数量 */
public int getNumberWaiting() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return parties - count;
} finally {
lock.unlock();
}
}
/** CyclicBarrier锁—获取当前等屏障数量 */
public int getParties() {
return parties;
}
/** CyclicBarrier锁—判断当前屏障 */
public boolean isBroken() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return generation.broken;
} finally {
lock.unlock();
}
}
/** CyclicBarrier锁—重置屏障数量 */
public void reset() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
breakBarrier(); // break the current generation
nextGeneration(); // start a new generation
} finally {
lock.unlock();
}
}
}
主要需要注意如下几个方法,都是基于ReentrantLock来实现加锁和解锁操作的,其中:
综上所述,从一定意义上讲,CyclicBarrier是一种可重入锁,属于ReentrantLock的应用实例,其中加锁和解锁操作都是独占模式的。
在Java领域中,Semaphore(信号量)是针对于Java多线程并发控制中实现对公共资源的线程数量进行并发同时访问控制,主要是采用指定一个最大许可数的思想和基于AQS基础同步器来实现的一种同步器工具类。
Semaphore可以用来控制在同一时刻访问共享资源的线程数量,通过协调各个线程以保证共享资源的合理使用。
Semaphore维护了一组虚拟许可,它的数量可以通过构造器的参数指定。
线程在访问共享资源前,必须调用Semaphore的acquire()方法获得许可,如果许可数量为0,该线程就一直阻塞。
线程在访问共享资源后,必须调用Semaphore的release()方法释放许可。
一般来说,通过定义一个倒计数器,为了控制最多N个线程同时访问公共资源,其计数器的最大值Max(N)是被许可的最多N个线程数量,即就是许可的最大值N。
Semaphore类最早是在JDK1.5版本提供的,从设计思想上来看,主要包括倒计数器的最大许可数,同步器工作模式,获取锁方法,释放锁方法等4个核心要素。其中:
在JDK1.8版本中,对于Semaphore的基本实现如下:
public class Semaphore implements java.io.Serializable {
private static final long serialVersionUID = - L;
/**
* Semaphore锁- 封装同步器
*/
private final Sync sync;
/**
* Semaphore锁- 封装同步器
*/
abstract static class Sync extends AbstractQueuedSynchronizer {
//....其他代码
}
/**
* Semaphore锁- 构造一个令牌许可(默认非公模式)
*/
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
/**
* Semaphore锁- 构造一个令牌许可(可选公平/非公模式)
*/
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
/**
* Semaphore锁- 获取锁方法(默认一个且可中断机制)
*/
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
/**
* Semaphore锁- 获取锁方法(可选指定多个且可中断机制)
*/
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}
/**
* Semaphore锁- 获取锁方法(默认多个且不可中断机制)
*/
public void acquireUninterruptibly() {
sync.acquireShared(1);
}
/**
* Semaphore锁- 获取锁方法(指定多个且不可中断机制)
*/
public void acquireUninterruptibly(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireShared(permits);
}
/**
* Semaphore锁-释放锁方法(默认一个)
*/
public void release() {
sync.releaseShared(1);
}
/**
* Semaphore锁-释放锁方法(可选指定多个)
*/
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}
}
/**
* Semaphore锁- 基于AQS基础同步器封装同步器
*/
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = L;
Sync(int permits) {
setState(permits);
}
final int getPermits() {
return getState();
}
/**
* Semaphore锁- 非公平模式获取共享锁
*/
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
/**
* Semaphore锁- 释放共享锁
*/
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
/**
* Semaphore锁- 自旋+compareAndSetState通过CAS操作计算操作令牌许可数
*/
final void reducePermits(int reductions) {
for (;;) {
int current = getState();
int next = current - reductions;
if (next > current) // underflow
throw new Error("Permit count underflow");
if (compareAndSetState(current, next))
return;
}
}
/**
* Semaphore锁- 自旋+compareAndSetState通过CAS操作重置令牌许可数
*/
final int drainPermits() {
for (;;) {
int current = getState();
if (current == 0 || compareAndSetState(current, 0))
return current;
}
}
}
特别指出的是,这里的非公平模式主要描述的是,在令牌许可数量允许的情况下,让所有线程进行自旋操作,其实就是不关心线程到来的顺序,将全部线程放到一起去参与竞争令牌许可。
其中,主要还利用compareAndSetState方法来进行CAS操作,保证修改令牌许可数量的原子性操作。
一般来说,假设我们允许控制的最大线程数量为N,剩余令牌许可数量为Remanent(N), 当前可用令牌许可数量为Current(N) , 消耗令牌许可数量为Reduction(N),那么我们会得到一个计算公式:
�即就意味着,剩余令牌许可数量等于当前可用令牌许可数量与消耗令牌许可数量之差。
由此可见,在公平/非公平模式下,我们对于对于获取锁和释放锁时,对于剩余令牌许可数量Remanent(N)计算都满足以下公式:
当然,需要注意的的一个问题,就是当剩余令牌许可数量Remanent(N) < 0时,表示当前线程会进入阻塞等待状态。
/**
* Semaphore锁- 基于Sync抽象类封装FairSync公平同步器
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = L;
/**
* Semaphore锁- Semaphore锁- 通过构造方法指定令牌许可
*/
FairSync(int permits) {
super(permits);
}
/**
* Semaphore锁- Semaphore锁- 公平模式释放共享锁
*/
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
需要注意的是,在未达到最大的令牌许可数量时,所有线程都不会进入等待队列中。
/**
* Semaphore锁- 基于Sync抽象类封装NonfairSync非公平同步器
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = - L;
/**
* Semaphore锁- Semaphore锁- 通过构造方法指定令牌许可
*/
NonfairSync(int permits) {
super(permits);
}
/**
* Semaphore锁- Semaphore锁- 非公平模式释放共享锁
*/
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
在JDK1.8版本中,对于Semaphore的具体实现如下:
public class Semaphore implements java.io.Serializable {
private static final long serialVersionUID = - L;
/**
* Semaphore锁- 封装同步器
*/
private final Sync sync;
/**
* Semaphore锁- 基于AQS基础同步器封装同步器
*/
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = L;
Sync(int permits) {
setState(permits);
}
final int getPermits() {
return getState();
}
/**
* Semaphore锁- 非公平模式获取共享锁
*/
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
/**
* Semaphore锁- 释放共享锁
*/
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
/**
* Semaphore锁- 自旋+compareAndSetState通过CAS操作计算操作令牌许可数
*/
final void reducePermits(int reductions) {
for (;;) {
int current = getState();
int next = current - reductions;
if (next > current) // underflow
throw new Error("Permit count underflow");
if (compareAndSetState(current, next))
return;
}
}
/**
* Semaphore锁- 自旋+compareAndSetState通过CAS操作重置令牌许可数
*/
final int drainPermits() {
for (;;) {
int current = getState();
if (current == 0 || compareAndSetState(current, 0))
return current;
}
}
}
/**
* Semaphore锁- 基于Sync抽象类封装FairSync公平同步器
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = L;
/**
* Semaphore锁- Semaphore锁- 通过构造方法指定令牌许可
*/
FairSync(int permits) {
super(permits);
}
/**
* Semaphore锁- Semaphore锁- 公平模式释放共享锁
*/
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
/**
* Semaphore锁- 基于Sync抽象类封装NonfairSync非公平同步器
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = - L;
/**
* Semaphore锁- Semaphore锁- 通过构造方法指定令牌许可
*/
NonfairSync(int permits) {
super(permits);
}
/**
* Semaphore锁- Semaphore锁- 非公平模式释放共享锁
*/
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
/**
* Semaphore锁- 构造一个令牌许可(默认非公模式)
*/
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
/**
* Semaphore锁- 构造一个令牌许可(可选公平/非公模式)
*/
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
/**
* Semaphore锁- 获取锁方法(默认一个且可中断机制)
*/
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
/**
* Semaphore锁- 获取锁方法(可选指定多个且可中断机制)
*/
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}
/**
* Semaphore锁- 获取锁方法(默认多个且不可中断机制)
*/
public void acquireUninterruptibly() {
sync.acquireShared(1);
}
/**
* Semaphore锁- 获取锁方法(指定多个且不可中断机制)
*/
public void acquireUninterruptibly(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireShared(permits);
}
/**
* Semaphore锁-释放锁方法(默认一个)
*/
public void release() {
sync.releaseShared(1);
}
/**
* Semaphore锁-释放锁方法(可选指定多个)
*/
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}
/**
* Semaphore锁-尝试获取锁方法(默认一个)
*/
public boolean tryAcquire() {
return sync.nonfairTryAcquireShared(1) >= 0;
}
/**
* Semaphore锁-尝试获取锁方法(可选指定多个)
*/
public boolean tryAcquire(int permits) {
if (permits < 0) throw new IllegalArgumentException();
return sync.nonfairTryAcquireShared(permits) >= 0;
}
/**
* Semaphore锁-尝试获取锁方法(可选指定多个并且支持超时机制)
*/
public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
}
/**
* Semaphore锁-尝试获取锁方法(默认一个并且支持超时机制)
*/
public boolean tryAcquire(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
/**
* Semaphore锁-统计可以令牌许可数
*/
public int availablePermits() {
return sync.getPermits();
}
/**
* Semaphore锁-重置令牌许可数
*/
public int drainPermits() {
return sync.drainPermits();
}
/**
* Semaphore锁-递减计算令牌许可数
*/
protected void reducePermits(int reduction) {
if (reduction < 0) throw new IllegalArgumentException();
sync.reducePermits(reduction);
}
/**
* Semaphore锁-判断是否公平模式
*/
public boolean isFair() {
return sync instanceof FairSync;
}
/**
* Semaphore锁-判断队列中是否存在线程对象
*/
public final boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}
/**
* Semaphore锁-获取队列长度
*/
public final int getQueueLength() {
return sync.getQueueLength();
}
/**
* Semaphore锁-获取队列的线程对象
*/
protected Collection < Thread > getQueuedThreads() {
return sync.getQueuedThreads();
}
}
综上所述,从一定意义上讲,Semaphore是一种共享锁,属于AQS基础抽象队列同步器中共享模式孵化的产物,支持公平模式与非公平模式,默认是使用非公平模式。
通过对Java领域中,JDK内部提供的各种锁的实现来看,一直围绕的核心主要还是基于AQS基础同步器来实现的,但是AQS基础同步器不是一种非它不可的技术标准规范,更多的只是一套技术参考指南。
但是,实际上,Java对于锁的实现与运用远远不止这些,还有相位器(Phaser)和交换器(Exchanger),以及在Java JDK1.8版本之前并发容器ConcurrentHashMap中使用的分段锁(Segment)。
不论是何种实现和应用,在Java并发编程领域来讲,都是围绕线程安全问题的角度去考虑的,只是针对于各种各样的业务场景做的具体的实现。
一定意义上来讲,对线程加锁只是并发编程的实现方式之一,相对于实际应用来说,Java领域中的锁都只是一种单一应用的锁,只是给我们掌握Java并发编程提供一种思想没,三言两语也不可能详尽。
到此为止,这算是对于Java领域中并发锁的最终章,文中表述均为个人看法和个人理解,如有不到之处,忘请谅解也请给予批评指正。
最后,技术研究之路任重而道远,愿我们熬的每一个通宵,都撑得起我们想在这条路上走下去的勇气,未来仍然可期,与各位程序编程君共勉!
版权声明:本文为博主原创文章,遵循相关版权协议,如若转载或者分享请附上原文出处链接和链接来源。