JDK源码分析—— ArrayBlockingQueue 和 LinkedBlockingQu

2019-10-11 05:43 来源:未知

清明节和朋友去被抖音带火的一个餐厅,下午两点钟取晚上的号,前面已经有十几桌了,四点半餐厅开始正式营业,等轮到我们已经近八点了。餐厅分为几个区域,只有最火的区域需要排号,其他区域基本上是随到随吃的,最冷清的区域几乎都没什么人。菜的价格异常的贵,味道也并不好。最后送出两张图:

第八章 ArrayBlockingQueue源码解析,arraydeque源码解析

注意:在阅读本文之前或在阅读的过程中,需要用到ReentrantLock,内容见《第五章 ReentrantLock源码解析1--获得非公平锁与公平锁lock()》《第六章 ReentrantLock源码解析2--释放锁unlock()》《第七章 ReentrantLock总结》

1、对于ArrayBlockingQueue需要掌握以下几点

  • 创建
  • 入队(添加元素)
  • 出队(删除元素)

2、创建

  • public ArrayBlockingQueue(int capacity, boolean fair)
  • public ArrayBlockingQueue(int capacity)

使用方法:

  • Queue<String> abq = new ArrayBlockingQueue<String>(2);
  • Queue<String> abq = new ArrayBlockingQueue<String>(2,true);

通过使用方法,可以看出ArrayBlockingQueue支持ReentrantLock的公平锁模式与非公平锁模式,对于这两种模式,查看本文开头的文章即可。

源代码如下:

彩民之家论坛9066777 1 private final E[] items;//底层数据结构 private int takeIndex;//用来为下一个take/poll/remove的索引(出队) private int putIndex;//用来为下一个put/offer/add的索引(入队) private int count;//队列中元素的个数 /* * Concurrency control uses the classic two-condition algorithm found in any * textbook. */ /** Main lock guarding all access */ private final ReentrantLock lock;//锁 /** Condition for waiting takes */ private final Condition notEmpty;//等待出队的条件 /** Condition for waiting puts */ private final Condition notFull;//等待入队的条件 View Code 彩民之家论坛9066777 2 /** * 创造一个队列,指定队列容量,指定模式 * @param fair * true:先来的线程先操作 * false:顺序随机 */ public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = (E[]) new Object[capacity];//初始化类变量数组items lock = new ReentrantLock(fair);//初始化类变量锁lock notEmpty = lock.newCondition();//初始化类变量notEmpty Condition notFull = lock.newCondition();//初始化类变量notFull Condition } /** * 创造一个队列,指定队列容量,默认模式为非公平模式 * @param capacity <1会抛异常 */ public ArrayBlockingQueue(int capacity) { this(capacity, false); } View Code

注意:

  • ArrayBlockingQueue的组成:一个对象数组 1把锁ReentrantLock 2个条件Condition
  • 在查看源码的过程中,也要模仿带条件锁的使用,这个双条件锁模式是很经典的模式

3、入队

3.1、public boolean offer(E e)

原理:

  • 在队尾插入一个元素, 如果队列没满,立即返回true; 如果队列满了,立即返回false

使用方法:

  • abq.offer("hello1");

源代码:

彩民之家论坛9066777 3 /** * 在队尾插入一个元素, * 如果队列没满,立即返回true; * 如果队列满了,立即返回false * 注意:该方法通常优于add(),因为add()失败直接抛异常 */ public boolean offer(E e) { if (e == null) throw new NullPointerException(); final ReentrantLock lock = this.lock; lock.lock(); try { if (count == items.length)//数组满了 return false; else {//数组没满 insert(e);//插入一个元素 return true; } } finally { lock.unlock(); } } View Code 彩民之家论坛9066777 4 private void insert(E x) { items[putIndex] = x;//插入元素 putIndex = inc(putIndex);//putIndex 1 count;//元素数量 1 /** * 唤醒一个线程 * 如果有任意一个线程正在等待这个条件,那么选中其中的一个区唤醒。 * 在从等待状态被唤醒之前,被选中的线程必须重新获得锁 */ notEmpty.signal(); } View Code 彩民之家论坛9066777 5 /** * i 1,数组下标 1 */ final int inc(int i) { return ( i == items.length) ? 0 : i; } View Code

代码非常简单,流程看注释即可,只有一点注意点:

  • 在插入元素结束后,唤醒等待notEmpty条件(即获取元素)的线程,可以发现这类似于生产者-消费者模式

 

3.2、public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException

原理:

  • 在队尾插入一个元素,,如果数组已满,则进入等待,直到出现以下三种情况:
    • 被唤醒
    • 等待时间超时
    • 当前线程被中断

使用方法:

彩民之家论坛9066777 6 try { abq.offer("hello2",1000,TimeUnit.MILLISECONDS); } catch (InterruptedException e) { e.printStackTrace(); } View Code

源代码:

彩民之家论坛9066777 7 /** * 在队尾插入一个元素, * 如果数组已满,则进入等待,直到出现以下三种情况: * 1、被唤醒 * 2、等待时间超时 * 3、当前线程被中断 */ public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { if (e == null) throw new NullPointerException(); long nanos = unit.toNanos(timeout);//将超时时间转换为纳秒 final ReentrantLock lock = this.lock; /* * lockInterruptibly(): * 1、 在当前线程没有被中断的情况下获取锁。 * 2、如果获取成功,方法结束。 * 3、如果锁无法获取,当前线程被阻塞,直到下面情况发生: * 1)当前线程(被唤醒后)成功获取锁 * 2)当前线程被其他线程中断 * * lock() * 获取锁,如果锁无法获取,当前线程被阻塞,直到锁可以获取并获取成功为止。 */ lock.lockInterruptibly();//加可中断的锁 try { for (;;) { if (count != items.length) {//队列未满 insert(e); return true; } if (nanos <= 0)//已超时 return false; try { /* * 进行等待: * 在这个过程中可能发生三件事: * 1、被唤醒-->继续当前这个for(;;)循环 * 2、超时-->继续当前这个for(;;)循环 * 3、被中断-->之后直接执行catch部分的代码 */ nanos = notFull.awaitNanos(nanos);//进行等待(在此过程中,时间会流失,在此过程中,线程也可能被唤醒) } catch (InterruptedException ie) {//在等待的过程中线程被中断 notFull.signal(); // 唤醒其他未被中断的线程 throw ie; } } } finally { lock.unlock(); } } View Code

注意:

  • awaitNanos(nanos)是AQS中的一个方法,这里就不详细说了,有兴趣的自己去查看AQS的源代码。
  • lockInterruptibly()与lock()的区别见注释

 

3.3、public void put(E e) throws InterruptedException

原理:

  • 在队尾插入一个元素,如果队列满了,一直阻塞,直到数组不满了或者线程被中断

使用方法:

彩民之家论坛9066777 8 try { abq.put("hello1"); } catch (InterruptedException e) { e.printStackTrace(); } View Code

源代码:

彩民之家论坛9066777 9 /** * 在队尾插入一个元素 * 如果队列满了,一直阻塞,直到数组不满了或者线程被中断 */ public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); final E[] items = this.items; final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { try { while (count == items.length)//队列满了,一直阻塞在这里 /* * 一直等待条件notFull,即被其他线程唤醒 * (唤醒其实就是,有线程将一个元素出队了,然后调用notFull.signal()唤醒其他等待这个条件的线程,同时队列也不慢了) */ notFull.await(); } catch (InterruptedException ie) {//如果被中断 notFull.signal(); // 唤醒其他等待该条件(notFull,即入队)的线程 throw ie; } insert(e); } finally { lock.unlock(); } } View Code

 

4、出队

4.1、public E poll()

原理:

  • 如果没有元素,直接返回null;如果有元素,将队头元素置null,但是要注意队头是随时变化的,并非一直是items[0]。

使用方法:

abq.poll();

源代码:

彩民之家论坛9066777 10 /** * 出队 */ public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { if (count == 0)//如果没有元素,直接返回null,而非抛出异常 return null; E x = extract(); return x; } finally { lock.unlock(); } } View Code 彩民之家论坛9066777 11 /** * 出队 */ private E extract() { final E[] items = this.items; E x = items[takeIndex];//获取出队元素 items[takeIndex] = null;//将出队元素位置置空 /* * 第一次出队的元素takeIndex==0,第二次出队的元素takeIndex==1 * (注意:这里出队之后,并没有将后面的数组元素向前移) */ takeIndex = inc(takeIndex); --count;//数组元素个数-1 notFull.signal();//数组已经不满了,唤醒其他等待notFull条件的线程 return x;//返回出队的元素 } View Code

 

4.2、public E poll(long timeout, TimeUnit unit) throws InterruptedException

原理:

  • 从对头删除一个元素,如果数组不空,出队;如果数组已空且已经超时,返回null;如果数组已空且时间未超时,则进入等待,直到出现以下三种情况:
    • 被唤醒
    • 等待时间超时
    • 当前线程被中断

使用方法:

彩民之家论坛9066777 12 try { abq.poll(1000, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { e.printStackTrace(); } View Code

源代码:

彩民之家论坛9066777 13 /** * 从对头删除一个元素, * 如果数组不空,出队; * 如果数组已空,判断时间是否超时,如果已经超时,返回null * 如果数组已空且时间未超时,则进入等待,直到出现以下三种情况: * 1、被唤醒 * 2、等待时间超时 * 3、当前线程被中断 */ public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout);//将时间转换为纳秒 final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { if (count != 0) {//数组不空 E x = extract();//出队 return x; } if (nanos <= 0)//时间超时 return null; try { /* * 进行等待: * 在这个过程中可能发生三件事: * 1、被唤醒-->继续当前这个for(;;)循环 * 2、超时-->继续当前这个for(;;)循环 * 3、被中断-->之后直接执行catch部分的代码 */ nanos = notEmpty.awaitNanos(nanos); } catch (InterruptedException ie) { notEmpty.signal(); // propagate to non-interrupted thread throw ie; } } } finally { lock.unlock(); } } View Code

 

4.3、public E take() throws InterruptedException

原理:

  • 将队头元素出队,如果队列空了,一直阻塞,直到数组不为空或者线程被中断

使用方法:

彩民之家论坛9066777 14 try { abq.take(); } catch (InterruptedException e) { e.printStackTrace(); } View Code

源代码:

彩民之家论坛9066777 15 /** * 将队头元素出队 * 如果队列空了,一直阻塞,直到数组不为空或者线程被中断 */ public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { try { while (count == 0)//如果数组为空,一直阻塞在这里 /* * 一直等待条件notEmpty,即被其他线程唤醒 * (唤醒其实就是,有线程将一个元素入队了,然后调用notEmpty.signal()唤醒其他等待这个条件的线程,同时队列也不空了) */ notEmpty.await(); } catch (InterruptedException ie) { notEmpty.signal(); // propagate to non-interrupted thread throw ie; } E x = extract(); return x; } finally { lock.unlock(); } } View Code

  

总结:

1、具体入队与出队的原理图:这里只说一种情况,见下图,途中深色部分表示已有元素,浅色部分没有元素。

彩民之家论坛9066777 16

 

上面这种情况是怎么形成的呢?当队列满了,这时候,队头元素为items[0]出队了,就形成上边的这种情况。

假设现在又要出队了,则现在的队头元素是items[1],出队后就形成下面的情形。

彩民之家论坛9066777 17

 

出队后,对头元素就是items[2]了,假设现在有一个元素将要入队,根据inc方法,我们可以得知,他要插入到items[0]去,入队了形成下图:

彩民之家论坛9066777 18

以上就是整个入队出队的流程,inc方法上边已经给出,这里再贴一遍:

彩民之家论坛9066777 19 /** * i 1,数组下标 1 * 注意:这里这样写的原因。 */ final int inc(int i) { return ( i == items.length) ? 0 : i; } View Code

 

2、三种入队对比:

  • offer(E e):如果队列没满,立即返回true; 如果队列满了,立即返回false-->不阻塞
  • put(E e):如果队列满了,一直阻塞,直到数组不满了或者线程被中断-->阻塞
  • offer(E e, long timeout, TimeUnit unit):在队尾插入一个元素,,如果数组已满,则进入等待,直到出现以下三种情况:-->阻塞
    • 被唤醒
    • 等待时间超时
    • 当前线程被中断

 

3、三种出对对比:

  • poll():如果没有元素,直接返回null;如果有元素,出队
  • take():如果队列空了,一直阻塞,直到数组不为空或者线程被中断-->阻塞
  • poll(long timeout, TimeUnit unit):如果数组不空,出队;如果数组已空且已经超时,返回null;如果数组已空且时间未超时,则进入等待,直到出现以下三种情况:
    • 被唤醒
    • 等待时间超时
    • 当前线程被中断

ArrayBlockingQueue源码解析,arraydeque源码解析 注意:在阅读本文之前或在阅读的过程中,需要用到ReentrantLock,内容见《第五章 Reentra...

 JDK源码分析—— ArrayBlockingQueue 和 LinkedBlockingQueue

 

目的:本文通过分析JDK源码来对比ArrayBlockingQueue 和LinkedBlockingQueue,以便日后灵活使用。

1. 在Java的Concurrent包中,添加了阻塞队列BlockingQueue,用于多线程编程。BlockingQueue的核心方法有:

boolean add(E e) ,把 e 添加到BlockingQueue里。如果BlockingQueue可以容纳,则返回true,否则抛出异常。
boolean offer(E e),表示如果可能的话,将 e 加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则返回false。
void put(E e),把 e 添加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻塞直到BlockingQueue里面有空间再继续。
E poll(long timeout, TimeUnit unit) ,取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,取不到时返回null。
E take() ,取走BlockingQueue里排在首位的对象,若BlockingQueue为空,则调用此方法的线程被阻塞直到BlockingQueue有新的数据被加入。
int drainTo(Collection<? super E> c) 和 int drainTo(Collection<? super E> c, int maxElements) ,一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取 数据的个数),通过该方法,可以提升获取数据效率,不需要多次分批加锁或 释放锁。

注意:BlockingQueue 不接受null 元素。试图add、put 或offer 一个null 元素时,某些实现会抛出NullPointerException。null 被用作指示poll 操作失败的警戒值。

2. BlockingQueue常用的四个实现类

ArrayBlockingQueue:规定大小的BlockingQueue,其构造函数必须带一个int参数来指明其大小.其所含的对象是以FIFO(先入先出)顺序排序的.
2) LinkedBlockingQueue:大小不定的BlockingQueue,若其构造函数带一个规定大小的参数,生成的BlockingQueue 有大小限制,若不带大小参数,所生成的BlockingQueue的大小由Integer.MAX_VALUE来决定.其所含的对象是以FIFO(先入先 出)顺序排序的
3) PriorityBlockingQueue:类似于LinkedBlockQueue,但其所含对象的排序不是FIFO,而是依据对象的自然排序顺序或者是构造函数的Comparator决定的顺序.
4) SynchronousQueue:特殊的BlockingQueue,对其的操作必须是放和取交替完成的.

 本文将从JDK源码层面分析对比ArrayBlockingQueue和LinkedBlockingQueue

3. ArrayBlockingQueue源码分析

    ArrayBlockingQueue是一个由数组支持的有界阻塞队列。此队列按 FIFO(先进先出)原则对元素进行排序。队列的头部 是在队列中存在时间最长的元素,队列的尾部 是在队列中存在时间最短的元素。新元素插入到队列的尾部,队列检索操作则是从队列头部开始获得元素。
     这是一个典型的“有界缓存区”,固定大小的数组在其中保持生产者插入的元素和使用者提取的元素。一旦创建了这样的缓存区,就不能再增加其容量。试图向已满队列中放入元素会导致放入操作受阻塞;试图从空队列中检索元素将导致类似阻塞。

    ArrayBlockingQueue创建的时候需要指定容量capacity(可以存储的最大的元素个数,因为它不会自动扩容)。其中一个构造方法为:

[java] view plaincopyprint?彩民之家论坛9066777 20彩民之家论坛9066777 21

  1. public ArrayBlockingQueue(int capacity, boolean fair) {  
  2.        if (capacity <= 0)  
  3.            throw new IllegalArgumentException();  
  4.        this.items = (E[]) new Object[capacity];  
  5.        lock = new ReentrantLock(fair);  
  6.        notEmpty = lock.newCondition();  
  7.        notFull =  lock.newCondition();  
  8.    }  

    ArrayBlockingQueue类中定义的变量有:

[java] view plaincopyprint?彩民之家论坛9066777 22彩民之家论坛9066777 23

  1. /** The queued items  */  
  2. private final E[] items;  
  3. /** items index for next take, poll or remove */  
  4. private int takeIndex;  
  5. /** items index for next put, offer, or add. */  
  6. private int putIndex;  
  7. /** Number of items in the queue */  
  8. private int count;  
  9.   
  10. /* 
  11.  * Concurrency control uses the classic two-condition algorithm 
  12.  * found in any textbook. 
  13.  */  
  14.   
  15. /** Main lock guarding all access */  
  16. private final ReentrantLock lock;  
  17. /** Condition for waiting takes */  
  18. private final Condition notEmpty;  
  19. /** Condition for waiting puts */  
  20. private final Condition notFull;  

使 用数组items来存储元素,由于是循环队列,使用takeIndex和putIndex来标记put和take的位置。可以看到,该类中只定义了一个锁 ReentrantLock,定义两个Condition对象:notEmputy和notFull,分别用来对take和put操作进行所控制。注:本 文主要讲解put()和take()操作,其他方法类似。

put(E e)方法的源码如下。进行put操作之前,必须获得锁并进行加锁操作,以保证线程安全性。加锁后,若发现队列已满,则调用notFull.await() 方法,如当前线程陷入等待。直到其他线程take走某个元素后,会调用notFull.signal()方法来激活该线程。激活之后,继续下面的插入操 作。

[java] view plaincopyprint?彩民之家论坛9066777 24彩民之家论坛9066777 25

  1. /** 
  2.      * Inserts the specified element at the tail of this queue, waiting 
  3.      * for space to become available if the queue is full. 
  4.      * 
  5.      */  
  6.     public void put(E e) throws InterruptedException {  
  7.         //不能存放 null  元素  
  8.         if (e == null) throw new NullPointerException();  
  9.         final E[] items = this.items;   //数组队列  
  10.         final ReentrantLock lock = this.lock;  
  11.         //加锁  
  12.         lock.lockInterruptibly();  
  13.         try {  
  14.             try {  
  15.                 //当队列满时,调用notFull.await()方法,使该线程阻塞。  
  16.                 //直到take掉某个元素后,调用notFull.signal()方法激活该线程。  
  17.                 while (count == items.length)  
  18.                     notFull.await();  
  19.             } catch (InterruptedException ie) {  
  20.                 notFull.signal(); // propagate to non-interrupted thread  
  21.                 throw ie;  
  22.             }  
  23.             //把元素 e 插入到队尾  
  24.             insert(e);  
  25.         } finally {  
  26.             //解锁  
  27.             lock.unlock();  
  28.         }  
  29.     }  

insert(E e) 方法如下:

[java] view plaincopyprint?彩民之家论坛9066777 26彩民之家论坛9066777 27

  1.   /** 
  2.    * Inserts element at current put position, advances, and signals. 
  3.    * Call only when holding lock. 
  4.    */  
  5.   private void insert(E x) {  
  6.       items[putIndex] = x;    
  7. //下标加1或者等于0  
  8.       putIndex = inc(putIndex);  
  9.        count;  //计数加1  
  10. //若有take()线程陷入阻塞,则该操作激活take()线程,继续进行取元素操作。  
  11. //若没有take()线程陷入阻塞,则该操作无意义。  
  12.       notEmpty.signal();  
  13.   }  
  14.   
  15. **  
  16.    * Circularly increment i.  
  17.    */  
  18.   final int inc(int i) {  
  19. //此处可以看到使用了循环队列  
  20.       return ( i == items.length)? 0 : i;  
  21.   }  

take()方法代码如下。take操作和put操作相反,故不作详细介绍。

[java] view plaincopyprint?彩民之家论坛9066777 28彩民之家论坛9066777 29

  1. public E take() throws InterruptedException {  
  2.         final ReentrantLock lock = this.lock;  
  3.         lock.lockInterruptibly();  //加锁  
  4.         try {  
  5.             try {  
  6.                 //当队列空时,调用notEmpty.await()方法,使该线程阻塞。  
  7.                 //直到take掉某个元素后,调用notEmpty.signal()方法激活该线程。  
  8.                 while (count == 0)  
  9.                     notEmpty.await();  
  10.             } catch (InterruptedException ie) {  
  11.                 notEmpty.signal(); // propagate to non-interrupted thread  
  12.                 throw ie;  
  13.             }  
  14.             //取出队头元素  
  15.             E x = extract();  
  16.             return x;  
  17.         } finally {  
  18.             lock.unlock();  //解锁  
  19.         }  
  20.     }  

extract() 方法如下:

[java] view plaincopyprint?彩民之家论坛9066777 30彩民之家论坛9066777 31

  1. /** 
  2.      * Extracts element at current take position, advances, and signals. 
  3.      * Call only when holding lock. 
  4.      */  
  5.     private E extract() {  
  6.         final E[] items = this.items;  
  7.         E x = items[takeIndex];  
  8.         items[takeIndex] = null;  
  9.         takeIndex = inc(takeIndex);  
  10.         --count;  
  11.         notFull.signal();  
  12.         return x;  
  13.     }  

小结:进行put和take操作,共用同一个锁对象。也即是说,put和take无法并行执行!
4. LinkedBlockingQueue 源码分析

    基于链表的阻塞队列,同ArrayListBlockingQueue类似,其内部也维持着一个数据缓 冲队列(该队列由一个链表构成),当生产者往队列中放入一个数据时,队列会从生产者手中获取数据,并缓存在队列内部,而生产者立即返回;只有当队列缓冲区 达到最大值缓存容量时(LinkedBlockingQueue可以通过构造函数指定该值),才会阻塞生产者队列,直到消费者从队列中消费掉一份数据,生 产者线程会被唤醒,反之对于消费者这端的处理也基于同样的原理。而LinkedBlockingQueue之所以能够高效的处理并发数据,还因为其对于生 产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性 能。
作为开发者,我们需要注意的是,如果构造一个LinkedBlockingQueue对象,而没有指定其容量大 小,LinkedBlockingQueue会默认一个类似无限大小的容量(Integer.MAX_VALUE),这样的话,如果生产者的速度一旦大于 消费者的速度,也许还没有等到队列满阻塞产生,系统内存就有可能已被消耗殆尽了。

 LinkedBlockingQueue 类中定义的变量有:

[java] view plaincopyprint?彩民之家论坛9066777 32彩民之家论坛9066777 33

  1. /** The capacity bound, or Integer.MAX_VALUE if none */  
  2. private final int capacity;  
  3.   
  4. /** Current number of elements */  
  5. private final AtomicInteger count = new AtomicInteger(0);  
  6.   
  7. /** Head of linked list */  
  8. private transient Node<E> head;  
  9.   
  10. /** Tail of linked list */  
  11. private transient Node<E> last;  
  12.   
  13. /** Lock held by take, poll, etc */  
  14. private final ReentrantLock takeLock = new ReentrantLock();  
  15.   
  16. /** Wait queue for waiting takes */  
  17. private final Condition notEmpty = takeLock.newCondition();  
  18.   
  19. /** Lock held by put, offer, etc */  
  20. private final ReentrantLock putLock = new ReentrantLock();  
  21.   
  22. /** Wait queue for waiting puts */  
  23. private final Condition notFull = putLock.newCondition();  

该类中定义了两个ReentrantLock锁:putLock和takeLock,分别用于put端和take端。也就是说,生成端和消费端各自独立拥有一把锁,避免了读(take)写(put)时互相竞争锁的情况。

[java] view plaincopyprint?彩民之家论坛9066777 34彩民之家论坛9066777 35

  1. /** 
  2.      * Inserts the specified element at the tail of this queue, waiting if 
  3.      * necessary for space to become available. 
  4.      */  
  5.     public void put(E e) throws InterruptedException {  
  6.         if (e == null) throw new NullPointerException();  
  7.         // Note: convention in all put/take/etc is to preset local var  
  8.         // holding count negative to indicate failure unless set.  
  9.         int c = -1;  
  10.         final ReentrantLock putLock = this.putLock;  
  11.         final AtomicInteger count = this.count;  
  12.         putLock.lockInterruptibly(); //加 putLock 锁  
  13.         try {  
  14.             /* 
  15.              * Note that count is used in wait guard even though it is 
  16.              * not protected by lock. This works because count can 
  17.              * only decrease at this point (all other puts are shut 
  18.              * out by lock), and we (or some other waiting put) are 
  19.              * signalled if it ever changes from 
  20.              * capacity. Similarly for all other uses of count in 
  21.              * other wait guards. 
  22.              */  
  23.             //当队列满时,调用notFull.await()方法释放锁,陷入等待状态。  
  24.             //有两种情况会激活该线程  
  25.             //第一、 某个put线程添加元素后,发现队列有空余,就调用notFull.signal()方法激活阻塞线程  
  26.             //第二、 take线程取元素时,发现队列已满。则其取出元素后,也会调用notFull.signal()方法激活阻塞线程  
  27.             while (count.get() == capacity) {   
  28.                     notFull.await();  
  29.             }  
  30.             // 把元素 e 添加到队列中(队尾)  
  31.             enqueue(e);  
  32.             c = count.getAndIncrement();  
  33.             //发现队列未满,调用notFull.signal()激活阻塞的put线程(可能存在)  
  34.             if (c   1 < capacity)  
  35.                 notFull.signal();  
  36.         } finally {  
  37.             putLock.unlock();  
  38.         }  
  39.         if (c == 0)  
  40.             //队列空,说明已经有take线程陷入阻塞,故调用signalNotEmpty激活阻塞的take线程  
  41.             signalNotEmpty();  
  42.     }  

enqueue(E e)方法如下:

[java] view plaincopyprint?彩民之家论坛9066777 36彩民之家论坛9066777 37

  1. /** 
  2.  * Creates a node and links it at end of queue. 
  3.  * @param x the item 
  4.  */  
  5. private void enqueue(E x) {  
  6.     // assert putLock.isHeldByCurrentThread();  
  7.     last = last.next = new Node<E>(x);  
  8. }  

take()方法代码如下。take操作和put操作相反,故不作详细介绍。

[java] view plaincopyprint?彩民之家论坛9066777 38彩民之家论坛9066777 39

  1. public E take() throws InterruptedException {  
  2.        E x;  
  3.        int c = -1;  
  4.        final AtomicInteger count = this.count;  
  5.        final ReentrantLock takeLock = this.takeLock;  
  6.        takeLock.lockInterruptibly();  
  7.        try {  
  8.                while (count.get() == 0) {  
  9.                    notEmpty.await();  
  10.                }  
  11.            x = dequeue();  
  12.            c = count.getAndDecrement();  
  13.            if (c > 1)  
  14.                notEmpty.signal();  
  15.        } finally {  
  16.            takeLock.unlock();  
  17.        }  
  18.        if (c == capacity)  
  19.            signalNotFull();  
  20.        return x;  
  21.    }  

dequeue()方法如下:

[java] view plaincopyprint?彩民之家论坛9066777 40彩民之家论坛9066777 41

  1. /** 
  2.  * Removes a node from head of queue. 
  3.  * @return the node 
  4.  */  
  5. private E dequeue() {  
  6.     // assert takeLock.isHeldByCurrentThread();  
  7.     Node<E> h = head;  
  8.     Node<E> first = h.next;  
  9.     h.next = h; // help GC  
  10.     head = first;  
  11.     E x = first.item;  
  12.     first.item = null;  
  13.     return x;  
  14. }  

小结:take和put操作各有一把锁,可并行读取。

参考地址:

1). Java多线程-工具篇-BlockingQueue:

2). Java多线程(五)之BlockingQueue深入分析:

好了,进入今天的正题,今天要讲的是ArrayBlockQueue,ArrayBlockQueue是JUC提供的线程安全的有界的阻塞队列,一看到Array,第一反应:这货肯定和数组有关,既然是数组,那自然是有界的了,我们先来看看ArrayBlockQueue的基本使用方法,然后再看看ArrayBlockQueue的源码。

ArrayBlockingQueue(int capacity, boolean fair,Collection<? extends E> c)
 public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) { //调用第二个构造方法,方法内部就是初始化数组,排他锁,两个条件变量 this(capacity, fair); final ReentrantLock lock = this.lock; lock.lock(); // 开启排他锁 try { int i = 0; try { // 循环传入的集合,把集合中的元素赋值给items数组,其中i会自增 for  { checkNotNull; items[i  ] = e; } } catch (ArrayIndexOutOfBoundsException ex) { throw new IllegalArgumentException(); } count = i;//把i赋值给count //如果i==capacity,也就是到了最大容量,把0赋值给putIndex,否则把i赋值给putIndex putIndex = (i == capacity) ? 0 : i; } finally { lock.unlock();//释放排他锁 } }
  1. 调用第二个构造方法,方法内部就是初始化数组items,排他锁lock,以及两个条件变量。
  2. 开启排他锁。
  3. 循环传入的集合,将集合中的元素赋值给items数组,其中i会自增。
  4. 把i赋值给count。
  5. 如果i==capacity,说明到了最大的容量,就把0赋值给putIndex,否则把i赋值给putIndex。
  6. 在finally中释放排他锁。

看到这里,我们应该明白这个构造方法的作用是什么了,就是把传入的集合作为ArrayBlockingQueuede初始化数据,但是我们又会有一个新的疑问:count,putIndex 是做什么用的。

 public boolean offer { checkNotNull; final ReentrantLock lock = this.lock; lock.lock();//开启排他锁 try { if (count == items.length)//如果count==items.length,返回false return false; else { enqueue;//入队 return true;//返回true } } finally { lock.unlock();//释放锁 } }
  1. 开启排他锁。
  2. 如果count==items.length,也就是到了最大的容量,返回false。
  3. 如果count<items.length,执行入队方法,并且返回true。
  4. 释放排他锁。

看到这里,我们应该可以明白了,ArrayBlockQueue是如何保证线程安全的,还是利用了ReentrantLock排他锁,count就是用来保存数组的当前大小的。我们再来看看enqueue方法。

 private void enqueue { final Object[] items = this.items; items[putIndex] = x; if (  putIndex == items.length) putIndex = 0; count  ; notEmpty.signal(); }

这方法比较简单,在代码里面就不写注释了,做了如下的操作:

  1. 把x赋值给items[putIndex] 。
  2. 将putIndex进行自增,如果自增后的值 == items.length,把0赋值给putIndex 。
  3. 执行count 操作。
  4. 调用条件变量notEmpty的signal方法,说明在某个地方,必定调用了notEmpty的await方法,这里就是唤醒因为调用notEmpty的await方法而被阻塞的线程。

这里就解答了一个疑问:putIndex是做什么的,就是入队元素的下标。

 public boolean add { return super.add; }

 public boolean add { if  return true; else throw new IllegalStateException("Queue full"); }

这个方法内部最终还是调用的offer方法。

 public void put throws InterruptedException { checkNotNull; final ReentrantLock lock = this.lock; lock.lockInterruptibly();//开启响应中断的排他锁 try { while (count == items.length)//如果队列满了,调用notFull的await notFull.await(); enqueue;//入队 } finally { lock.unlock();//释放排他锁 } }
  1. 开启响应中断的排他锁,如果在获取锁的过程中,当前的线程被中断,会抛出异常。
  2. 如果队列满了,调用notFull的await方法,说明在某个地方,必定调用了notFull的signal方法来唤醒当前线程,这里用while循环是为了防止虚假唤醒。
  3. 执行入队操作。
  4. 释放排他锁。

可以看到put方法和 offer/add方法的区别了:

  • offer/add:如果队列满了,直接返回false。
  • put:如果队列满了,当前线程被阻塞,等待唤醒。
 public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { return (count == 0) ? null : dequeue(); } finally { lock.unlock(); } }
  1. 开启排他锁。
  2. 如果count==0,直接返回false,否则执行dequeue出队操作。
  3. 释放排他锁。

我们来看dequeue方法:

 private E dequeue() { final Object[] items = this.items; @SuppressWarnings("unchecked") E x =  items[takeIndex];//获得元素的值 items[takeIndex] = null;//把null赋值给items[takeIndex] if (  takeIndex == items.length)//如果takeIndex自增后的值== items.length,就把0赋值给takeIndex takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); notFull.signal();//唤醒因为调用notFull的await方法而被阻塞的线程 return x; }
  1. 获取元素的值,takeIndex保存的是出队的下标。
  2. 把null赋值给items[takeIndex],也就是清空被弹出的元素。
  3. 如果takeIndex自增后的值== items.length,就把0赋值给takeIndex。
  4. count--。
  5. 唤醒因为调用notFull的await方法而被阻塞的线程。

这里调用了notFull的signal方法来唤醒因为调用notFull的await方法而被阻塞的线程,那到底在哪里调用了notFull的await方法呢,还记不记得在put方法中调用了notFull的await方法,我们再看看:

 while (count == items.length) notFull.await();

当队列满了,就调用 notFull.await()来等待,在出队操作中,又调用了notFull.signal()来唤醒。

 public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } }
  1. 开启排他锁。
  2. 如果count==0,代表队列是空的,则调用notEmpty的await方法,用while循环是为了防止虚假唤醒。
  3. 执行出队操作。
  4. 释放排他锁。

这里调用了notEmpty的await方法,那么哪里调用了notEmpty的signal方法呢?在enqueue入队方法里。

我们可以看到take和poll的区别:

  • take:如果队列为空,会阻塞,直到被唤醒了。
  • poll: 如果队列为空,直接返回null。
 public E peek() { final ReentrantLock lock = this.lock; lock.lock(); try { return itemAt(takeIndex); } finally { lock.unlock(); } }

 final E itemAt { return  items[i]; }
  1. 开启排他锁。
  2. 获得元素。
  3. 释放排他锁。

我们可以看到peek和poll/take的区别:

  • peek,只是获取元素,不会清空元素。
  • poll/take,获取并清空元素。
 public int size() { final ReentrantLock lock = this.lock; lock.lock(); try { return count; } finally { lock.unlock(); } }
  1. 开启排他锁。
  2. 返回count。
  3. 释放排他锁。
ArrayBlockingQueue(int capacity)
 public ArrayBlockingQueue(int capacity) { this(capacity, false); }

这是最常用的构造方法,传入capacity,capacity是容量的意思,也就是ArrayBlockingQueue的最大长度,方法内部直接调用了第二个构造方法,传入的第二个参数为false。

ArrayBlockQueue源码解析

ArrayBlockQueue提供了三个构造方法,如下图所示:

彩民之家论坛9066777 42image.png

ArrayBlockingQueue(int capacity, boolean fair)
 public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock; notEmpty = lock.newCondition(); notFull = lock.newCondition(); }

这个构造方法接受两个参数,分别是capacity和fair,fair是boolean类型的,代表是公平锁,还是非公平锁,可以看出如果我们用第一个构造方法来创建ArrayBlockingQueue的话,采用的是非公平锁,因为公平锁会损失一定的性能,在没有充足的理由的情况下,是没有必要采用公平锁的。

方法内部做了几件事情:

  1. 创建Object类型的数组,容量为capacity,并且赋值给当前类对象的items。
  2. 创建排他锁。
  3. 创建条件变量notEmpty 。
  4. 创建条件变量notFull。

至于排他锁和两个条件变量是做用什么的,看到后面就明白了。

彩民之家论坛9066777 43image.png彩民之家论坛9066777 44image.png

ArrayBlockQueue基本使用

public static void main(String[] args) throws InterruptedException { ArrayBlockingQueue<Integer> arrayBlockingQueue=new ArrayBlockingQueue; arrayBlockingQueue.offer; arrayBlockingQueue.offer; arrayBlockingQueue.add; arrayBlockingQueue.add; System.out.println(arrayBlockingQueue); System.out.println(arrayBlockingQueue.poll; System.out.println(arrayBlockingQueue); System.out.println(arrayBlockingQueue.take; System.out.println(arrayBlockingQueue); System.out.println(arrayBlockingQueue.peek; System.out.println(arrayBlockingQueue); }

运行结果:

彩民之家论坛9066777 45image.png

  1. 创建了一个长度为5的ArrayBlockQueue。
  2. 用offer方法,向ArrayBlockQueue添加了两个元素,分别是10,50。
  3. 用put方法,向ArrayBlockQueue添加了两个元素,分别是20,60。
  4. 打印出ArrayBlockQueue,结果是10,50,20,60。
  5. 用poll方法,弹出ArrayBlockQueue第一个元素,并且打印出来:10。
  6. 打印出ArrayBlockQueue,结果是50,20,60。
  7. 用take方法,弹出ArrayBlockQueue第一个元素,并且打印出来:50。
  8. 打印出ArrayBlockQueue,结果是20,60。
  9. 用peek方法,弹出ArrayBlockQueue第一个元素,并且打印出来:20。
  10. 打印出ArrayBlockQueue,结果是20,60。

代码比较简单,但是你肯定会有疑问

  • offer/add(在上面的代码中没有演示)/put都是往队列里面添加元素,区别是什么?
  • poll/take/peek都是弹出队列的元素,区别是什么?
  • 底层代码是如何保证线程安全的?
  • 数据保存在哪里?

要解决上面几个疑问,最好的办法当然是看下源码,通过亲自阅读源码所产生的印象远远要比看视频,看博客,死记硬背最后的结论要深刻的多。就算真的忘记了,只要再看看源码,瞬间可以回忆起来。

总结

至此,ArrayBlockQueue的核心源码就分析完毕了,我们来做一个总结:

  • ArrayBlockQueue有几个比较重要的字段,分别是items,保存的是队列的数据,putIndex保存的是入队的下标,takeIndex保存的是出队的下标,count用来统计队列元素的个数,lock用来保证线程的安全性,notEmpty和notFull两个条件变量实现唤醒和阻塞。
  • offer和add是一样的,其中add方法内部调用的就是offer方法,如果队列满了,直接返回false。
  • put,如果队列满了,会被阻塞。
  • peek,只是弹出元素,不会清空元素。
  • poll,弹出并清空元素,如果队列为空,直接返回null。
  • take,弹出并清空元素,如果队列为空,会被阻塞。
版权声明:本文由彩民之家高手论坛发布于编程技术,转载请注明出处:JDK源码分析—— ArrayBlockingQueue 和 LinkedBlockingQu