Руководство Java BlockingQueue
1. BlockingQueue
BlockingQueue - это подинтерфейс of Queue, который предоставляет дополнительные операции и полезен в ситуациях, когда очередь пуста или заполнена элементами.
public interface BlockingQueue<E> extends Queue<E>
Разница между Queue и BlockingQueue проявляется в методах, которые они предоставляют:
Interface | Queue<E> | |||
Interface | BlockingQueue<E> | |||
Action | Throws exception | Special value | Blocks | Times out |
Insert | boolean add(e) | boolean offer(e) | void put(e) | boolean offer(e, time, unit) |
Remove | E remove() | E poll() | E take() | E poll(time, unit) |
Examine | E element() | E peek() |
take()/poll(time,unit)
Как мы знаем, методы remove(), element(), poll() и peek() интерфейса Queue возвращают элемент во главе очереди, который либо немедленно вызовет исключение, либо вернет значение null, если очередь не содержит никаких элементов. Такие операции недостаточно хороши в многопоточной среде, поэтому интерфейс BlockingQueue предоставляет новые методы take() и poll(time,unit).
- take(): Возвращает верхний элемент и удаляет его из очереди. Если очередь пуста, метод будет ждать, пока элемент не станет доступен в очереди.
- poll(timeout,unit): Возвращает головной элемент и удаляет его из очереди. Если очередь пуста, метод будет ждать, пока элемент будет доступен в течение указанного промежутка времени. Если время ожидания заканчивается без доступных элементов, метод вернет значение null.
put(e)/offer(e,time,unit)
Методы add(e) и offer(e) интерфейса Queue используются для добавления элемента в очередь. Они либо немедленно выдадут исключение, либо вернут значение false, если очередь заполнена. Интерфейс BlockingQueue предоставляет методы put(e) и offer(e,timeout,unit) для той же цели, но у них есть более специальные функции.
- put(e): Вставить элемент в очередь. Если очередь заполнена, этот метод будет ждать, пока не появится свободное место для вставки.
- offer(e,timeout,unit): Вставить элемент в очередь. Если очередь заполнена, метод будет ждать, пока освободится место для вставки в течение указанного промежутка времени. Если тайм-аут заканчивается без свободного места, никаких действий предпринято не будет, и метод вернет значение false.
Иерархия классов и интерфейсов, связанная с интерфейсом BlockingQueue:
- Queue
- Deque
- BlockingDeque
- TransferQueue
- SynchronousQueue
- PriorityBlockingQueue
- LinkedBlockingQueue
- ArrayBlockingQueue
- DelayQueue
Характеристики BlockingQueue:
- BlockingQueue не принимает элементы null, если вы намеренно добавите элемент null в эту очередь, будет выдано исключение NullPointerException.
- BlockingQueue может быть ограничена по емкости. Метод remainingCapacity() возвращает оставшуюся емкость этой очереди или Integer.MAX_VALUE, если емкость очереди не ограничена.
- BlockingQueue обычно используется в приложениях типа Producer & Consumer (производитель и потребитель). BlockingQueue является потомком интерфейса Collection, поэтому также поддерживается метод remove(e). Однако такие методы работают неэффективно и только для случайного использования. Например, удалите дефектный товар из очереди.
- BlockingQueue - это потокобезопасная очередь (thread-safe). Все методы очереди являются атомарными операциями (Atomic Operations). Однако методы, унаследованные от интерфейса Collection, такие как addAll, containsAll, retainAll и removeAll , не обязательно являются атомарными операциями, это зависит от класса, реализующего интерфейс BockingQueue. Таким образом, возможно, например, вызов addAll(aCollection) может вызвать исключение, если другой поток одновременно добавляет элемент aCollection.
- BlockingQueue не поддерживает такие методы, как "close" (закрыть) или "shutdown" (завершение работы), например, когда Producer (производитель) хочет отправить сигнал о том, что в очередь больше не будут добавлены "продукты". Необходимость и использование этих функций, как правило, зависят от реализации. Решение может быть следующим: Конечный и специальный "продукт" добавляется в очередь в качестве сигнала, сообщающего Consumer (потребителю), что это последний продукт, добавляемый в очередь.
Смотрите также:
- The concept of Atomic operations in computer science
2. BlockingQueue methods
Список методов интерфейса BlockingQueue<E>:
void put(E e) throws InterruptedException;
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
E take() throws InterruptedException;
E poll(long timeout, TimeUnit unit) throws InterruptedException;
int remainingCapacity();
int drainTo(Collection<? super E> c);
int drainTo(Collection<? super E> c, int maxElements);
Методы, унаследованные от интерфейса Queue<E>:
boolean add(E e);
boolean offer(E e);
E remove();
E poll();
E element();
E peek();
Методы, унаследованные от интерфейса Collection<E>:
int size();
boolean isEmpty();
boolean contains(Object o);
Iterator<E> iterator();
Object[] toArray();
<T> T[] toArray(T[] a);
boolean add(E e);
boolean remove(Object o);
boolean containsAll(Collection<?> c);
boolean addAll(Collection<? extends E> c);
boolean removeAll(Collection<?> c);
boolean retainAll(Collection<?> c);
void clear();
boolean equals(Object o);
int hashCode();
default <T> T[] toArray(IntFunction<T[]> generator)
default boolean removeIf(Predicate<? super E> filter)
default Spliterator<E> spliterator()
default Stream<E> stream()
default Stream<E> parallelStream()
- Queue
- Collection
3. Например
Модель Producer/Consumer (производитель и потребитель) является хорошим примером использования интерфейса BlockingQueue. Продукты, созданные производителями, добавляются в очередь до того, как их заберут потребители.
- thread Producer вызывают метод BlockingQueue.put(e), чтобы добавить продукты в BlockingQueue. Если очередь заполнена, метод put(e) будет ждать, пока не освободится свободное место.
- thread Consumer вызывают метод BlockingQueue.take() для извлечения продуктов из очереди. Если очередь пуста, этот метод будет ждать, пока продукт не будет доступен.
Смотрите полный код примера:
Класс Product имитирует продукт.
Product.java
package org.o7planning.blockingqueue.ex;
public class Product {
private String name;
private int serial;
public Product(String name, int serial) {
this.name = name;
this.serial = serial;
}
public String getInfo() {
return "Product: " + this.name + ". Serial: " + this.serial;
}
}
Класс Consumer имитирует потребителя.
Consumer.java
package org.o7planning.blockingqueue.ex;
import java.util.concurrent.BlockingQueue;
public class Consumer implements Runnable {
private String consumerName;
private final BlockingQueue<Product> queue;
public Consumer(String consumerName, BlockingQueue<Product> q) {
this.consumerName = consumerName;
this.queue = q;
}
@Override
public void run() {
try {
while (true) {
this.consume(queue.take());
}
} catch (InterruptedException ex) {
}
}
private void consume(Product x) {
System.out.println(" --> " + this.consumerName + " >> Consume: " + x.getInfo());
}
}
Класс Producer имитирует производителя.
Producer.java
package org.o7planning.blockingqueue.ex;
import java.util.concurrent.BlockingQueue;
public class Producer implements Runnable {
private static int serial = 1;
private final String producerName;
private final BlockingQueue<Product> queue;
private final int delay; // Seconds
public Producer(String producerName, int delay, BlockingQueue<Product> q) {
this.producerName = producerName;
this.delay = delay;
this.queue = q;
}
@Override
public void run() {
try {
while (true) {
Thread.sleep(this.delay * 1000); // 'delay' second.
this.queue.put(this.produce());
}
} catch (InterruptedException ex) {
}
}
private Product produce() {
System.out.println("#" + this.producerName + " >> Create a new product!");
return new Product("IPhone", serial++);
}
}
Класс Setup используется для управления системой Producer/Consumer:
Setup.java
package org.o7planning.blockingqueue.ex;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class Setup {
public static void main(String[] args) {
// Create a BlockingQueue with a capacity of 5.
BlockingQueue<Product> q = new ArrayBlockingQueue<Product>(5);
Producer producer1 = new Producer("Producer 01", 2, q);
Producer producer2 = new Producer("Producer 02", 1, q);
Consumer consumer1 = new Consumer("Consumer 01", q);
Consumer consumer2 = new Consumer("Consumer 02", q);
Consumer consumer3 = new Consumer("Consumer 03", q);
// Starting the threads
new Thread(producer1).start();
new Thread(producer2).start();
new Thread(consumer1).start();
new Thread(consumer2).start();
new Thread(consumer3).start();
}
}
Запустите приведенный выше пример, и вы получите следующий результат:
#Producer 02 >> Create a new product!
--> Consumer 03 >> Consume: Product: IPhone. Serial: 1
#Producer 01 >> Create a new product!
--> Consumer 01 >> Consume: Product: IPhone. Serial: 2
#Producer 02 >> Create a new product!
--> Consumer 02 >> Consume: Product: IPhone. Serial: 3
#Producer 02 >> Create a new product!
--> Consumer 03 >> Consume: Product: IPhone. Serial: 4
#Producer 01 >> Create a new product!
--> Consumer 01 >> Consume: Product: IPhone. Serial: 5
#Producer 02 >> Create a new product!
--> Consumer 02 >> Consume: Product: IPhone. Serial: 6
#Producer 02 >> Create a new product!
--> Consumer 03 >> Consume: Product: IPhone. Serial: 7
...
4. drainTo(Collection<? super E>)
int drainTo(Collection<? super E> c);
Удаляет все элементы из этой BlockingQueue и добавляет их в указанную Collection. Использование этого метода более эффективно, чем многократный вызов функции poll() или remove().
Метод drainTo(Collection) гарантирует, что либо все элементы будут успешно перемещены в Collection, либо никакие элементы не будут перемещены в Collection в случае возникновения ошибки.
5. drainTo(Collection<? super E>, int)
int drainTo(Collection<? super E> c, int maxElements);
Удаляет элемент maxElements из этой BlockingQueue и добавляет их в указанную Collection. Использование этого метода более эффективно, чем многократный вызов функции poll() или remove().
Если произойдет ошибка, ни один элемент не будет удален из этой BlockingQueue и ни один элемент не будет добавлен в Collection.
6. offer(E, long, TimeUnit)
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
Вставляет указанный элемент в очередь. Если очередь заполнена, метод будет ждать, пока освободится место для вставки, в течение указанного промежутка времени. Если тайм-аут заканчивается без свободного места, никаких действий предпринято не будет, и метод вернет значение false.
Например:
queue.offer(e, 5, TimeUnit.HOURS); // 5 hours.
7. poll(long, TimeUnit)
E poll(long timeout, TimeUnit unit) throws InterruptedException;
Возвращает верхний элемент и удаляет его из очереди. Если очередь пуста, метод будет ждать, пока элемент будет доступен в течение указанного промежутка времени. Если время ожидания заканчивается без доступных элементов, метод вернет значение null.
Например:
E e = queue.offer(2, TimeUnit.HOURS); // 2 hours
8. put(E e)
void put(E e) throws InterruptedException;
Вставьте элемент в очередь. Если очередь заполнена, этот метод будет ждать, пока не появится свободное место для вставки.
Руководства Java Collections Framework
- Руководство Java PriorityBlockingQueue
- Руководство Java Collections Framework
- Руководство Java SortedSet
- Руководство Java List
- Руководство Java Iterator
- Руководство Java NavigableSet
- Руководство Java ListIterator
- Руководство Java ArrayList
- Руководство Java CopyOnWriteArrayList
- Руководство Java LinkedList
- Руководство Java Set
- Руководство Java TreeSet
- Руководство Java CopyOnWriteArraySet
- Руководство Java Queue
- Руководство Java Deque
- Руководство Java IdentityHashMap
- Руководство Java WeakHashMap
- Руководство Java Map
- Руководство Java SortedMap
- Руководство Java NavigableMap
- Руководство Java HashMap
- Руководство Java TreeMap
- Руководство Java PriorityQueue
- Руководство Java BlockingQueue
- Руководство Java ArrayBlockingQueue
- Руководство Java TransferQueue
Show More