betacode

Руководство Java PriorityBlockingQueue

Следуйте за нами на нашей фан-странице, чтобы получать уведомления каждый раз, когда появляются новые статьи. Facebook

1- PriorityBlockingQueue

PriorityBlockingQueue<E> - это класс, представляющий очередь с элементами, отсортированными по приоритету, аналогично классу PriorityQueue<E>. Разница в том, что он реализует интерфейс BlockingQueue<E>, поэтому имеет дополнительные функции, определенные этим интерфейсом.
Вы должны просмотреть статью об интерфейсе BlockingQueue, чтобы более подробно ознакомиться с функциями этого интерфейса с основными примерами.

public class PriorityBlockingQueue<E> extends AbstractQueue<E>
             implements BlockingQueue<E>, java.io.Serializable
Характеристики of PriorityBlockingQueue:
  • Очередь с неограниченной вместимостью.
  • Повторяющиеся элементы разрешены, но элементы null не разрешены.
  • Элементы сортируются в порядке возрастания в соответствии с поставляемым Comparator (компаратором) или элементы сортируются в их естественном порядке (в зависимости от используемого конструктора).
  • PriorityBlockingQueue и его Iterator поддерживают все дополнительные методы, указанные в интерфейсах Collection и Iterator.
По сути, PriorityBlockingQueue управляет внутренним массивом для хранения элементов, который может быть заменен новым массивом, если количество элементов очереди увеличивается.
Объект Iterator, полученный из метода iterator() и объект Spliterator, полученный из метода spliterator(), не гарантируют, что элементы этой очереди будут проходить в определенном порядке. Если вы хотите, рассмотрите возможность использования Arrays.sort(queue.toArray()).
Например:
PriorityBlockingQueue_traverse_ex1.java

// Create a queue that sorts its elements in natural order.
BlockingQueue<String> queue = new PriorityBlockingQueue<>();
queue.add("Rose");
queue.add("Lotus");
queue.add("Jasmine");
queue.add("Sunflower");
queue.add("Daisy");

System.out.println(queue); // [Daisy, Jasmine, Lotus, Sunflower, Rose] (Not ordered)
String[] array = new String[queue.size()];
queue.toArray(array);
Arrays.sort(array);
for(String flower: array)  {
    System.out.println(flower);
}
Output:

[Daisy, Jasmine, Lotus, Sunflower, Rose]
Daisy
Jasmine
Lotus
Rose
Sunflower

2- Constructors


public PriorityBlockingQueue()
public PriorityBlockingQueue(int initialCapacity)  
public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator)  
public PriorityBlockingQueue(Collection<? extends E> c) 
Смотрите подробное объяснение каждого конструктора ниже.

3- Например:

Основные примеры см. также в статье о BockingQueue.
Как известно, модель Producer/Consumer (Производитель/потребитель) является хорошим примером использования интерфейса BlockingQueue. Продукты, созданные производителями, добавляются в очередь до того, как они будут приняты потребителями. В некоторых случаях необходимо использовать очередь приоритетов. Например, продукты с коротким сроком годности должны иметь более высокий приоритет, чтобы быть потребленными потребителями в ближайшее время.
Давайте рассмотрим простой пример:
Класс Product представляет продукты с указанием названия и срока годности:
Product.java

package org.o7planning.priorityblockingqueue.ex;

public class Product {
    private String name;
    private int shelfLife;

    public Product(String name, int shelfLife) {
        this.name = name;
        this.shelfLife = shelfLife;
    }  
    public int getShelfLife() {
        return shelfLife;
    }
    @Override
    public String toString() {
        return this.name + ":" + this.shelfLife;
    }
}
Класс ProductComparator используется для сравнения объектов Product. Продукт с большим сроком годности считается большим. Продукт с наименьшим сроком годности будет стоять во главе очереди.
ProductComparator.java

package org.o7planning.priorityblockingqueue.ex;

import java.util.Comparator;

public class ProductComparator implements Comparator<Product> {

    @Override
    public int compare(Product o1, Product o2) {
        return o1.getShelfLife() - o2.getShelfLife();
    }
}
  • TODO Link?
Producer.java

package org.o7planning.priorityblockingqueue.ex;

import java.util.Random;
import java.util.concurrent.BlockingQueue;

public class Producer implements Runnable {  
    private final String producerName;
    private final BlockingQueue<Product> queue;
    private final int delay; // Seconds

    public Producer(String producerName, int delay, BlockingQueue<Product> q) {
        this.producerName = producerName;
        this.delay = delay;
        this.queue = q;
    }
    @Override
    public void run() {
        try {
            while (true) {
                Thread.sleep(this.delay * 1000); // 'delay' seconds.
                Product newProduct = this.produce();
                System.out.println("\n#" + this.producerName + " >> New Product: " + newProduct);
                this.queue.put(newProduct);
                // Printed results may not be sorted (***):
                System.out.println("  Current products in queue: " + this.queue + " (***)");
            }
        } catch (InterruptedException ex) {
        }
    }
    private Product produce() {
        int shelfLife = new Random().nextInt(3) + 3;
        return new Product("Apple", shelfLife);
    }
}
Consumer.java

package org.o7planning.priorityblockingqueue.ex;

import java.util.concurrent.BlockingQueue;

public class Consumer implements Runnable {
    private String consumerName;
    private final BlockingQueue<Product> queue;

    public Consumer(String consumerName, BlockingQueue<Product> q) {
        this.consumerName = consumerName;
        this.queue = q;
    }
    @Override
    public void run() {
        try {
            while (true) {
                this.consume(queue.take());
            }
        } catch (InterruptedException ex) {
        }
    }
    // Need 1 seconds to consume a product.
    private void consume(Product x) throws InterruptedException {
        System.out.println(" --> " + this.consumerName + " >> Consuming: " + x);
        Thread.sleep(1 * 1000); // 1 seconds.
    }
}
Setup.java

package org.o7planning.priorityblockingqueue.ex;

import java.util.Comparator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;

public class Setup {
    public static void main(String[] args) {
        // Comparator
        Comparator<Product> comparator = new ProductComparator();
        // Create a PriorityBlockingQueue with a capacity of 100.
        BlockingQueue<Product> queue = new PriorityBlockingQueue<Product>(100, comparator);
        queue.add(new Product("Banana", 5));
        queue.add(new Product("Banana", 2));
        queue.add(new Product("Banana", 7));
        queue.add(new Product("Banana", 3));
        queue.add(new Product("Banana", 1));  
        
        Producer producer1 = new Producer("Producer 01", 2, queue);
        Producer producer2 = new Producer("Producer 02", 3, queue);
        
        Consumer consumer1 = new Consumer("Consumer 01", queue);
        Consumer consumer2 = new Consumer("Consumer 02", queue);
        Consumer consumer3 = new Consumer("Consumer 03", queue);

        // Starting Producer threads
        new Thread(producer1).start();
        new Thread(producer2).start();
        // Starting Consumer threads
        new Thread(consumer1).start();
    }
}
(***) Примечание: В этом примере мы используем метод System.out.println(queue) для распечатки всех элементов очереди. Однако этот метод не гарантирует порядок элементов.
Output:

--> Consumer 01 >> Consuming: Banana:1
--> Consumer 01 >> Consuming: Banana:2

#Producer 01 >> New Product: Apple:4
Current products in queue: [Banana:3, Apple:4, Banana:7, Banana:5] (***)
--> Consumer 01 >> Consuming: Banana:3

#Producer 02 >> New Product: Apple:3
Current products in queue: [Apple:3, Apple:4, Banana:7, Banana:5] (***)
--> Consumer 01 >> Consuming: Apple:3

#Producer 01 >> New Product: Apple:5
Current products in queue: [Apple:4, Banana:5, Banana:7, Apple:5] (***)
--> Consumer 01 >> Consuming: Apple:4
--> Consumer 01 >> Consuming: Apple:5

#Producer 02 >> New Product: Apple:4
Current products in queue: [Apple:4, Banana:7, Banana:5] (***)

#Producer 01 >> New Product: Apple:5
Current products in queue: [Apple:4, Apple:5, Banana:5, Banana:7] (***)
--> Consumer 01 >> Consuming: Apple:4
--> Consumer 01 >> Consuming: Apple:5
...

4- PriorityBlockingQueue()


public PriorityBlockingQueue()
Создайте PriorityBlockingQueue с начальной емкостью по умолчанию (11). Элементы этой очереди будут отсортированы в их естественном порядке, что требует, чтобы все элементы были объектами интерфейса Comparable.
  • TODO Link?

5- PriorityBlockingQueue(int)


public PriorityBlockingQueue(int initialCapacity)  
Создает PriorityBlockingQueue с указанной начальной емкостью. Элементы этой очереди будут отсортированы в их естественном порядке, что требует, чтобы все элементы были объектами интерфейса Comparable.
  • TODO Link?

6- PriorityBlockingQueue(int, Comparator)


public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator) 
Создайте PriorityBlockingQueue с заданным начальным значением, и для упорядочения ее элементов предусмотрен Comparator. Если comparator равен null, будет использоваться естественный порядок элементов, для этого требуется, чтобы все элементы были объектами интерфейса Comparable.

7- PriorityBlockingQueue(Collection)


public PriorityBlockingQueue(Collection<? extends E> c)
Создает PriorityBlockingQueue, содержащую все элементы указанной Collection.
Если указанная Collection является SortedSet или PriorityQueue, PriorityBlockingQueue будет использоваться тот же Comparator. В противном случае будет использоваться естественный порядок элементов, который требует, чтобы все элементы были объектами интерфейса Comparable.

8- Methods

Методы, унаследованные от интерфейса BockingQueue<E>:

void put(E e) throws InterruptedException;
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
E take() throws InterruptedException;
E poll(long timeout, TimeUnit unit) throws InterruptedException;
int remainingCapacity();
int drainTo(Collection<? super E> c);  
int drainTo(Collection<? super E> c, int maxElements);
Смотрите статью о BlockingQueue, чтобы узнать, как использовать описанные выше методы.
Методы, унаследованные от интерфейса Queue<E>:

boolean add(E e);
boolean offer(E e);
E remove();
E poll();
E element();
E peek();
Методы, унаследованные от интерфейса Collection<E>:

int size();
boolean isEmpty();
boolean contains(Object o);
Iterator<E> iterator();
Object[] toArray();
<T> T[] toArray(T[] a);
boolean add(E e);
boolean remove(Object o);
boolean containsAll(Collection<?> c);  
boolean addAll(Collection<? extends E> c);
boolean removeAll(Collection<?> c);
boolean retainAll(Collection<?> c);
void clear();  
boolean equals(Object o);
int hashCode();

default <T> T[] toArray(IntFunction<T[]> generator)  
default boolean removeIf(Predicate<? super E> filter)
default Spliterator<E> spliterator()  
default Stream<E> stream()  
default Stream<E> parallelStream()
  • TODO Link?