betacode

Руководство Java TransferQueue

  1. TransferQueue
  2. TransferQueue methods
  3. Например
  4. getWaitingConsumerCount()
  5. hasWaitingConsumer()
  6. transfer(E)
  7. tryTransfer(E)
  8. tryTransfer(E, long, TimeUnit)

1. TransferQueue

Как подинтерфейс of BlockingQueue, TransferQueue обладает всеми функциями родительского интерфейса, кроме того, он предоставляет возможность Producer (производителю) подождать, пока Consumer (Потребитель) не получит "продукт" (элемент). TransferQueue полезна в некоторых типах приложений, таких как приложения для обмена сообщениями.
public interface TransferQueue<E> extends BlockingQueue<E>
По сравнению с BlockingQueue<E>, TransferQueue<E> предоставляет еще несколько методов, в том числе:
void transfer(E e) throws InterruptedException;
boolean tryTransfer(E e);
boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException;
boolean hasWaitingConsumer();
int getWaitingConsumerCount();
transfer(e):
Добавьте элемент в эту TransferQueue и подождите, пока он не будет получен ожидающим потребителем с помощью метода BlockingQueue.take() или BlockingQueue.poll(timeout,unit).
tryTransfer(e):
Метод tryTransfer(e) добавляет элемент в эту TransferQueue только в том случае, если есть потребитель, ожидающий получения элемента с помощью метода BlockingQueue.take() или BlockingQueue.poll(timeout,unit), и убедитесь, что потребитель получит этот элемент немедленно. В противном случае метод возвращает значение false, и никакие другие действия не предпринимаются.
tryTransfer(e, timeout, unit):
Метод tryTransfer(e,timeout,unit) добавляет элемент в эту TransferQueue только в том случае, если в течение указанного периода ожидания потребитель ожидает получения элемента с помощью BlockingQueue.take() или BlockingQueue.poll(timeout,unit) и убедитесь, что потребитель получает этот элемент. В противном случае метод возвращает значение false, и никакие другие действия не предпринимаются.

2. TransferQueue methods

Методы определены в интерфейсе TransferQueue<E>:
void transfer(E e) throws InterruptedException;
boolean tryTransfer(E e);
boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException;
boolean hasWaitingConsumer();
int getWaitingConsumerCount();
Методы, унаследованные от интерфейса BlockingQueue<E>:
void put(E e) throws InterruptedException;
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
E take() throws InterruptedException;
E poll(long timeout, TimeUnit unit) throws InterruptedException;
int remainingCapacity();
int drainTo(Collection<? super E> c);  
int drainTo(Collection<? super E> c, int maxElements);
Методы, унаследованные от интерфейса Queue<E>:
boolean add(E e);
boolean offer(E e);
E remove();
E poll();
E element();
E peek();
Методы, унаследованные от интерфейса Collection<E>:
int size();
boolean isEmpty();
boolean contains(Object o);
Iterator<E> iterator();
Object[] toArray();
<T> T[] toArray(T[] a);
boolean add(E e);
boolean remove(Object o);
boolean containsAll(Collection<?> c);  
boolean addAll(Collection<? extends E> c);
boolean removeAll(Collection<?> c);
boolean retainAll(Collection<?> c);
void clear();  
boolean equals(Object o);
int hashCode();

default <T> T[] toArray(IntFunction<T[]> generator)  
default boolean removeIf(Predicate<? super E> filter)
default Spliterator<E> spliterator()  
default Stream<E> stream()  
default Stream<E> parallelStream()

3. Например

В приведенном ниже примере Producer отправляет сообщения Consumer через метод TransferQueue.transfer(e).
Глядя на вывод этого примера, вы увидите, что: Если все Consumer заняты получением сообщений (что означает, что ни один Consumer не находится в состоянии ожидания), то метод TransferQueue.transfer(e) будет заблокирован (перейдет в состояние ожидания).
TransferQueue_transfer_ex1.java
package org.o7planning.transferqueue.aa;

import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TransferQueue;

public class TransferQueue_transfer_ex1 {

    public static void main(String[] args) {
        TransferQueue<String> queue = new LinkedTransferQueue<String>();

        queue.add("NORMAL-MESSAGE 1");
        queue.add("NORMAL-MESSAGE 2");
        queue.add("NORMAL-MESSAGE 3");

        Producer producer1 = new Producer(queue);
        Consumer consumer1 = new Consumer(queue);
        Consumer consumer2 = new Consumer(queue);

        // Start..
        producer1.start();
        
        consumer1.start();
        consumer2.start();
    }
}

class Producer extends Thread {
    final TransferQueue<String> queue;
    private static int count = 1;

    Producer(TransferQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while (true) {
                String message = "IMPORTANT-MESSAGE " + count++;
                System.out.println("[PRODUCER] Transfering: " + message);
                this.queue.transfer(message);
                System.out.println("[PRODUCER] Transfered: " + message + " (**)");
            }
        } catch (InterruptedException ex) {
        }
    }
}
class Consumer extends Thread {
    final TransferQueue<String> queue;

    Consumer(TransferQueue<String> queue) {
        this.queue = queue;
    }
    @Override
    public void run() {
        try {
            while (true) {
                String message = queue.take();
                this.longConsume(message);
            }
        } catch (InterruptedException ex) {
        }
    }
    // Need 2 seconds to consume the message.
    private void longConsume(String message) throws InterruptedException  {
        System.out.println(" [CONSUMER] Consuming: " + message);
        Thread.sleep(2 * 1000); // 2 seconds.
        System.out.println(" [CONSUMER] Consumed: " + message);
    }
}
Output:
[PRODUCER] Transfering: IMPORTANT-MESSAGE 1
 [CONSUMER] Consuming: NORMAL-MESSAGE 1
 [CONSUMER] Consuming: NORMAL-MESSAGE 2
 [CONSUMER] Consumed: NORMAL-MESSAGE 1
 [CONSUMER] Consumed: NORMAL-MESSAGE 2
 [CONSUMER] Consuming: NORMAL-MESSAGE 3
 [CONSUMER] Consuming: IMPORTANT-MESSAGE 1
[PRODUCER] Transfered: IMPORTANT-MESSAGE 1 (**)
[PRODUCER] Transfering: IMPORTANT-MESSAGE 2
 [CONSUMER] Consumed: NORMAL-MESSAGE 3
 [CONSUMER] Consuming: IMPORTANT-MESSAGE 2
 [CONSUMER] Consumed: IMPORTANT-MESSAGE 1
[PRODUCER] Transfered: IMPORTANT-MESSAGE 2 (**)
[PRODUCER] Transfering: IMPORTANT-MESSAGE 3
[PRODUCER] Transfered: IMPORTANT-MESSAGE 3 (**)
[PRODUCER] Transfering: IMPORTANT-MESSAGE 4
 [CONSUMER] Consuming: IMPORTANT-MESSAGE 3
 [CONSUMER] Consumed: IMPORTANT-MESSAGE 2
 [CONSUMER] Consuming: IMPORTANT-MESSAGE 4
[PRODUCER] Transfered: IMPORTANT-MESSAGE 4 (**)
[PRODUCER] Transfering: IMPORTANT-MESSAGE 5
 [CONSUMER] Consumed: IMPORTANT-MESSAGE 3
 [CONSUMER] Consuming: IMPORTANT-MESSAGE 5
[PRODUCER] Transfered: IMPORTANT-MESSAGE 5 (**)
...
В следующем примере показано, как использовать метод TransferQueue.tryTransfer(e). В этом примере Producer создает сообщения и пытается передать их ожидающему Consumer.
Посмотрев на вывод этого примера, вы увидите, что Producer создал множество сообщений, которые будут проигнорированы, потому что во время вызова метода TransferQueue.tryTransfer(e) , нет ожидания Consumer.
TransferQueue_tryTransfer_ex1.java
package org.o7planning.transferqueue.bb;

import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TransferQueue;

public class TransferQueue_tryTransfer_ex1 {

    public static void main(String[] args) {
        TransferQueue<String> queue = new LinkedTransferQueue<String>();
        queue.add("NORMAL-MESSAGE 1");
        queue.add("NORMAL-MESSAGE 2");
        queue.add("NORMAL-MESSAGE 3");

        Producer producer1 = new Producer(queue);
        Consumer consumer1 = new Consumer(queue);
        Consumer consumer2 = new Consumer(queue);

        // Start..
        producer1.start();
        consumer1.start();
        consumer2.start();
    }
}
class Producer extends Thread {
    final TransferQueue<String> queue;
    private static int count = 1;

    public Producer(TransferQueue<String> queue) {
        this.queue = queue;
    }
    @Override
    public void run() {
        try {
            while (true) {
                String message = "IMPORTANT-MESSAGE " + count++;
                this.queue.tryTransfer(message); // Calling tryTransfer method.
                Thread.sleep(1 * 1000); // 1 seconds.
            }
        } catch (InterruptedException ex) {
        }
    }
}
class Consumer extends Thread {
    final TransferQueue<String> queue;

    Consumer(TransferQueue<String> queue) {
        this.queue = queue;
    }
    @Override
    public void run() {
        try {
            while (true) {
                String message = queue.take();
                System.out.println(">> " + message);
                Thread.sleep(3 * 1000); // 3 seconds
            }
        } catch (InterruptedException ex) {
        }
    }
}
Output:
>> NORMAL-MESSAGE 1
>> NORMAL-MESSAGE 2
>> NORMAL-MESSAGE 3
>> IMPORTANT-MESSAGE 4
>> IMPORTANT-MESSAGE 7
>> IMPORTANT-MESSAGE 8
>> IMPORTANT-MESSAGE 10
>> IMPORTANT-MESSAGE 11
>> IMPORTANT-MESSAGE 13
>> IMPORTANT-MESSAGE 14
>> IMPORTANT-MESSAGE 16
>> IMPORTANT-MESSAGE 17
Сообщения, созданные Producer, были проигнорированы:
  • IMPORTANT-MESSAGE 1
  • IMPORTANT-MESSAGE 2
  • IMPORTANT-MESSAGE 3
  • IMPORTANT-MESSAGE 5
  • IMPORTANT-MESSAGE 6
  • ...

4. getWaitingConsumerCount()

int getWaitingConsumerCount();
Возвращает предполагаемое количество потребителей, ожидающих получения элемента из этой TransferQueue с помощью метода BlockingQueue.take() или BlockingQueue.poll(timeout,unit).
Возвращаемое значение является приближением к текущему состоянию дел, которое может быть неточным, если потребители завершили или отказались от ожидания. Значение может быть полезно для мониторинга и эвристики, но не для управления синхронизацией. Реализации этого метода, вероятно, будут заметно медленнее, чем для hasWaitingConsumer().

5. hasWaitingConsumer()

boolean hasWaitingConsumer();
Возвращает значение true, если хотя бы один потребитель ожидает получения элемента с помощью метода BlockingQueue.take() или BlockingQueue.poll(timeout,unit). Возвращаемое значение представляет собой временное состояние дел.

6. transfer(E)

void transfer(E e) throws InterruptedException;
Добавьте элемент в эту TransferQueue и подождите, пока он не будет получен ожидающим потребителем с помощью метода BlockingQueue.take() или BlockingQueue.poll(timeout,unit).

7. tryTransfer(E)

boolean tryTransfer(E e);
Метод tryTransfer(e) добавляет элемент в эту TransferQueue только в том случае, если есть потребитель, ожидающий получения элемента с помощью метода BlockingQueue.take() или BlockingQueue.poll(timeout,unit), и убедитесь, что потребитель получит этот элемент немедленно. В противном случае метод возвращает значение false, и никакие другие действия не предпринимаются.

8. tryTransfer(E, long, TimeUnit)

boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException;
Метод tryTransfer(e,timeout,unit) добавляет элемент в эту TransferQueue только в том случае, если в течение указанного периода ожидания потребитель ожидает получения элемента с помощью BlockingQueue.take() или BlockingQueue.poll(timeout,unit) и убедитесь, что потребитель получает этот элемент. В противном случае метод возвращает значение false, и никакие другие действия не предпринимаются.