betacode

Руководство Java BlockingQueue

Следуйте за нами на нашей фан-странице, чтобы получать уведомления каждый раз, когда появляются новые статьи. Facebook

1- BlockingQueue

BlockingQueue - это подинтерфейс of Queue, который предоставляет дополнительные операции и полезен в ситуациях, когда очередь пуста или заполнена элементами.

public interface BlockingQueue<E> extends Queue<E>
Разница между Queue и BlockingQueue проявляется в методах, которые они предоставляют:
Interface Queue<E>  
BlockingQueue<E>
Action Throws exception Special value Blocks Times out
Insert boolean add(e) boolean offer(e) void put(e) boolean offer(e, time, unit)
Remove E remove() E poll() E take() E poll(time, unit)
Examine E element() E peek()    
take()/poll(time,unit)
Как мы знаем, методы remove(), element(), poll() и peek() интерфейса Queue возвращают элемент во главе очереди, который либо немедленно вызовет исключение, либо вернет значение null, если очередь не содержит никаких элементов. Такие операции недостаточно хороши в многопоточной среде, поэтому интерфейс BlockingQueue предоставляет новые методы take() и poll(time,unit).
  • take(): Возвращает верхний элемент и удаляет его из очереди. Если очередь пуста, метод будет ждать, пока элемент не станет доступен в очереди.
  • poll(timeout,unit): Возвращает головной элемент и удаляет его из очереди. Если очередь пуста, метод будет ждать, пока элемент будет доступен в течение указанного промежутка времени. Если время ожидания заканчивается без доступных элементов, метод вернет значение null.
put(e)/offer(e,time,unit)
Методы add(e) и offer(e) интерфейса Queue используются для добавления элемента в очередь. Они либо немедленно выдадут исключение, либо вернут значение false, если очередь заполнена. Интерфейс BlockingQueue предоставляет методы put(e) и offer(e,timeout,unit) для той же цели, но у них есть более специальные функции.
  • put(e): Вставить элемент в очередь. Если очередь заполнена, этот метод будет ждать, пока не появится свободное место для вставки.
  • offer(e,timeout,unit):  Вставить элемент в очередь. Если очередь заполнена, метод будет ждать, пока освободится место для вставки в течение указанного промежутка времени. Если тайм-аут заканчивается без свободного места, никаких действий предпринято не будет, и метод вернет значение false.
Иерархия классов и интерфейсов, связанная с интерфейсом BlockingQueue:
Характеристики BlockingQueue:
  • BlockingQueue не принимает элементы null, если вы намеренно добавите элемент null в эту очередь, будет выдано исключение NullPointerException.
  • BlockingQueue может быть ограничена по емкости. Метод remainingCapacity() возвращает оставшуюся емкость этой очереди или Integer.MAX_VALUE, если емкость очереди не ограничена.
  • BlockingQueue обычно используется в приложениях типа Producer & Consumer (производитель и потребитель). BlockingQueue является потомком интерфейса Collection, поэтому также поддерживается метод remove(e). Однако такие методы работают неэффективно и только для случайного использования. Например, удалите дефектный товар из очереди.
  • BlockingQueue - это потокобезопасная очередь (thread-safe). Все методы очереди являются атомарными операциями (Atomic Operations). Однако методы, унаследованные от интерфейса Collection, такие как addAll, containsAll, retainAll и removeAll , не обязательно являются атомарными операциями, это зависит от класса, реализующего интерфейс BockingQueue. Таким образом, возможно, например, вызов addAll(aCollection) может вызвать исключение, если другой поток одновременно добавляет элемент aCollection.
  • BlockingQueue не поддерживает такие методы, как "close" (закрыть) или  "shutdown" (завершение работы), например, когда Producer (производитель) хочет отправить сигнал о том, что в очередь больше не будут добавлены "продукты". Необходимость и использование этих функций, как правило, зависят от реализации. Решение может быть следующим: Конечный и специальный "продукт" добавляется в очередь в качестве сигнала, сообщающего Consumer (потребителю), что это последний продукт, добавляемый в очередь.
Смотрите также:

2- BlockingQueue methods

Список методов интерфейса BlockingQueue<E>:

void put(E e) throws InterruptedException;
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
E take() throws InterruptedException;
E poll(long timeout, TimeUnit unit) throws InterruptedException;
int remainingCapacity();
int drainTo(Collection<? super E> c);  
int drainTo(Collection<? super E> c, int maxElements);
Методы, унаследованные от интерфейса Queue<E>:

boolean add(E e);
boolean offer(E e);
E remove();
E poll();
E element();
E peek();
Методы, унаследованные от интерфейса Collection<E>:

int size();
boolean isEmpty();
boolean contains(Object o);
Iterator<E> iterator();
Object[] toArray();
<T> T[] toArray(T[] a);
boolean add(E e);
boolean remove(Object o);
boolean containsAll(Collection<?> c);  
boolean addAll(Collection<? extends E> c);
boolean removeAll(Collection<?> c);
boolean retainAll(Collection<?> c);
void clear();  
boolean equals(Object o);
int hashCode();

default <T> T[] toArray(IntFunction<T[]> generator)  
default boolean removeIf(Predicate<? super E> filter)
default Spliterator<E> spliterator()  
default Stream<E> stream()  
default Stream<E> parallelStream() 

3- Например

Модель Producer/Consumer (производитель и потребитель) является хорошим примером использования интерфейса BlockingQueue. Продукты, созданные производителями, добавляются в очередь до того, как их заберут потребители.
  • thread Producer вызывают метод BlockingQueue.put(e), чтобы добавить продукты в BlockingQueue. Если очередь заполнена, метод put(e) будет ждать, пока не освободится свободное место.
  • thread Consumer  вызывают метод BlockingQueue.take() для извлечения продуктов из очереди. Если очередь пуста, этот метод будет ждать, пока продукт не будет доступен.
Смотрите полный код примера:
Класс  Product имитирует продукт.
Product.java

package org.o7planning.blockingqueue.ex;

public class Product {
    private String name;
    private int serial;

    public Product(String name, int serial) {
        this.name = name;
        this.serial = serial;
    }
    public String getInfo() {
        return "Product: " + this.name + ". Serial: " + this.serial;
    }
}
Класс Consumer имитирует потребителя.
Consumer.java

package org.o7planning.blockingqueue.ex;

import java.util.concurrent.BlockingQueue;

public class Consumer implements Runnable {
    private String consumerName;
    private final BlockingQueue<Product> queue;

    public Consumer(String consumerName, BlockingQueue<Product> q) {
        this.consumerName = consumerName;
        this.queue = q;
    }
    @Override
    public void run() {
        try {
            while (true) {
                this.consume(queue.take());
            }
        } catch (InterruptedException ex) {
        }
    }  
    private void consume(Product x) {
        System.out.println(" --> " + this.consumerName + " >> Consume: " + x.getInfo());
    }
}
Класс Producer имитирует производителя.
Producer.java

package org.o7planning.blockingqueue.ex;

import java.util.concurrent.BlockingQueue;

public class Producer implements Runnable {
    private static int serial = 1;

    private final String producerName;
    private final BlockingQueue<Product> queue;
    private final int delay; // Seconds

    public Producer(String producerName, int delay, BlockingQueue<Product> q) {
        this.producerName = producerName;
        this.delay = delay;
        this.queue = q;
    }
    @Override
    public void run() {
        try {
            while (true) {
                Thread.sleep(this.delay * 1000); // 'delay' second.
                this.queue.put(this.produce());
            }
        } catch (InterruptedException ex) {
        }
    }
    private Product produce() {
        System.out.println("#" + this.producerName + " >> Create a new product!");
        return new Product("IPhone", serial++);
    }
}
Класс Setup используется для управления системой Producer/Consumer:
Setup.java

package org.o7planning.blockingqueue.ex;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class Setup {
    public static void main(String[] args) {
        // Create a BlockingQueue with a capacity of 5.
        BlockingQueue<Product> q = new ArrayBlockingQueue<Product>(5);
        Producer producer1 = new Producer("Producer 01", 2, q);
        Producer producer2 = new Producer("Producer 02", 1, q);
        Consumer consumer1 = new Consumer("Consumer 01", q);
        Consumer consumer2 = new Consumer("Consumer 02", q);
        Consumer consumer3 = new Consumer("Consumer 03", q);

        // Starting the threads
        new Thread(producer1).start();
        new Thread(producer2).start();
        new Thread(consumer1).start();
        new Thread(consumer2).start();
        new Thread(consumer3).start();
    }
}
Запустите приведенный выше пример, и вы получите следующий результат:

#Producer 02 >> Create a new product!
 --> Consumer 03 >> Consume: Product: IPhone. Serial: 1
#Producer 01 >> Create a new product!
 --> Consumer 01 >> Consume: Product: IPhone. Serial: 2
#Producer 02 >> Create a new product!
 --> Consumer 02 >> Consume: Product: IPhone. Serial: 3
#Producer 02 >> Create a new product!
 --> Consumer 03 >> Consume: Product: IPhone. Serial: 4
#Producer 01 >> Create a new product!
 --> Consumer 01 >> Consume: Product: IPhone. Serial: 5
#Producer 02 >> Create a new product!
 --> Consumer 02 >> Consume: Product: IPhone. Serial: 6
#Producer 02 >> Create a new product!
 --> Consumer 03 >> Consume: Product: IPhone. Serial: 7
...

4- drainTo(Collection<? super E>)


int drainTo(Collection<? super E> c); 
Удаляет все элементы из этой BlockingQueue и добавляет их в указанную Collection. Использование этого метода более эффективно, чем многократный вызов функции poll() или remove().
Метод drainTo(Collection) гарантирует, что либо все элементы будут успешно перемещены в Collection, либо никакие элементы не будут перемещены в Collection в случае возникновения ошибки.

5- drainTo(Collection<? super E>, int)


int drainTo(Collection<? super E> c, int maxElements);
Удаляет элемент maxElements из этой BlockingQueue и добавляет их в указанную Collection. Использование этого метода более эффективно, чем многократный вызов функции poll() или remove().
Если произойдет ошибка, ни один элемент не будет удален из этой BlockingQueue и ни один элемент не будет добавлен в Collection.

6- offer(E, long, TimeUnit)


boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
Вставляет указанный элемент в очередь. Если очередь заполнена, метод будет ждать, пока освободится место для вставки, в течение указанного промежутка времени. Если тайм-аут заканчивается без свободного места, никаких действий предпринято не будет, и метод вернет значение false.
Например:

queue.offer(e, 5, TimeUnit.HOURS); // 5 hours.

7- poll(long, TimeUnit)


E poll(long timeout, TimeUnit unit) throws InterruptedException;
Возвращает верхний элемент и удаляет его из очереди. Если очередь пуста, метод будет ждать, пока элемент будет доступен в течение указанного промежутка времени. Если время ожидания заканчивается без доступных элементов, метод вернет значение null.
Например:

E e = queue.offer(2, TimeUnit.HOURS); // 2 hours

8- put(E e)


void put(E e) throws InterruptedException;
Вставьте элемент в очередь. Если очередь заполнена, этот метод будет ждать, пока не появится свободное место для вставки.

9- remainingCapacity()


int remainingCapacity();
Возвращает оставшуюся емкость этой очереди или Integer.MAX_VALUE, если вместимость очереди не ограничена.
Класс ArrayBlockingQueue позволяет создавать BlockingQueue с указанием максимального количества элементов.

10- take()


E take() throws InterruptedException;
Возвращает  верхний элемент и удаляет его из очереди. Если очередь пуста, метод будет ждать, пока элемент не станет доступен в очереди.