betacode

Руководство Java TransferQueue

Следуйте за нами на нашей фан-странице, чтобы получать уведомления каждый раз, когда появляются новые статьи. Facebook

1- TransferQueue

 Как подинтерфейс of BlockingQueue, TransferQueue обладает всеми функциями родительского интерфейса, кроме того, он предоставляет возможность Producer (производителю) подождать, пока Consumer (Потребитель) не получит "продукт" (элемент). TransferQueue полезна в некоторых типах приложений, таких как приложения для обмена сообщениями.

public interface TransferQueue<E> extends BlockingQueue<E>
По сравнению с BlockingQueue<E>, TransferQueue<E> предоставляет еще несколько методов, в том числе:

void transfer(E e) throws InterruptedException;
boolean tryTransfer(E e);
boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException;
boolean hasWaitingConsumer();
int getWaitingConsumerCount();
transfer(e):
Добавьте элемент в эту TransferQueue и подождите, пока он не будет получен ожидающим потребителем с помощью метода BlockingQueue.take() или BlockingQueue.poll(timeout,unit).
tryTransfer(e):
Метод tryTransfer(e) добавляет элемент в эту TransferQueue только в том случае, если есть потребитель, ожидающий получения элемента с помощью метода BlockingQueue.take() или BlockingQueue.poll(timeout,unit), и убедитесь, что потребитель получит этот элемент немедленно. В противном случае метод возвращает значение false, и никакие другие действия не предпринимаются.
tryTransfer(e, timeout, unit):
Метод tryTransfer(e,timeout,unit) добавляет элемент в эту TransferQueue только в том случае, если в течение указанного периода ожидания потребитель ожидает получения элемента с помощью BlockingQueue.take() или BlockingQueue.poll(timeout,unit) и убедитесь, что потребитель получает этот элемент. В противном случае метод возвращает значение false, и никакие другие действия не предпринимаются.

2- TransferQueue methods

Методы определены в интерфейсе TransferQueue<E>:

void transfer(E e) throws InterruptedException;
boolean tryTransfer(E e);
boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException;
boolean hasWaitingConsumer();
int getWaitingConsumerCount();
Методы, унаследованные от интерфейса BlockingQueue<E>:

void put(E e) throws InterruptedException;
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
E take() throws InterruptedException;
E poll(long timeout, TimeUnit unit) throws InterruptedException;
int remainingCapacity();
int drainTo(Collection<? super E> c);  
int drainTo(Collection<? super E> c, int maxElements);
Методы, унаследованные от интерфейса Queue<E>:

boolean add(E e);
boolean offer(E e);
E remove();
E poll();
E element();
E peek();
Методы, унаследованные от интерфейса Collection<E>:

int size();
boolean isEmpty();
boolean contains(Object o);
Iterator<E> iterator();
Object[] toArray();
<T> T[] toArray(T[] a);
boolean add(E e);
boolean remove(Object o);
boolean containsAll(Collection<?> c);  
boolean addAll(Collection<? extends E> c);
boolean removeAll(Collection<?> c);
boolean retainAll(Collection<?> c);
void clear();  
boolean equals(Object o);
int hashCode();

default <T> T[] toArray(IntFunction<T[]> generator)  
default boolean removeIf(Predicate<? super E> filter)
default Spliterator<E> spliterator()  
default Stream<E> stream()  
default Stream<E> parallelStream()

3- Например

В приведенном ниже примере Producer отправляет сообщения Consumer через метод TransferQueue.transfer(e).
Глядя на вывод этого примера, вы увидите, что: Если все Consumer заняты получением сообщений (что означает, что ни один Consumer не находится в состоянии ожидания), то метод  TransferQueue.transfer(e) будет заблокирован (перейдет в состояние ожидания).
TransferQueue_transfer_ex1.java

package org.o7planning.transferqueue.aa;

import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TransferQueue;

public class TransferQueue_transfer_ex1 {

    public static void main(String[] args) {
        TransferQueue<String> queue = new LinkedTransferQueue<String>();

        queue.add("NORMAL-MESSAGE 1");
        queue.add("NORMAL-MESSAGE 2");
        queue.add("NORMAL-MESSAGE 3");

        Producer producer1 = new Producer(queue);
        Consumer consumer1 = new Consumer(queue);
        Consumer consumer2 = new Consumer(queue);

        // Start..
        producer1.start();
        
        consumer1.start();
        consumer2.start();
    }
}

class Producer extends Thread {
    final TransferQueue<String> queue;
    private static int count = 1;

    Producer(TransferQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while (true) {
                String message = "IMPORTANT-MESSAGE " + count++;
                System.out.println("[PRODUCER] Transfering: " + message);
                this.queue.transfer(message);
                System.out.println("[PRODUCER] Transfered: " + message + " (**)");
            }
        } catch (InterruptedException ex) {
        }
    }
}
class Consumer extends Thread {
    final TransferQueue<String> queue;

    Consumer(TransferQueue<String> queue) {
        this.queue = queue;
    }
    @Override
    public void run() {
        try {
            while (true) {
                String message = queue.take();
                this.longConsume(message);
            }
        } catch (InterruptedException ex) {
        }
    }
    // Need 2 seconds to consume the message.
    private void longConsume(String message) throws InterruptedException  {
        System.out.println(" [CONSUMER] Consuming: " + message);
        Thread.sleep(2 * 1000); // 2 seconds.
        System.out.println(" [CONSUMER] Consumed: " + message);
    }
}
Output:

[PRODUCER] Transfering: IMPORTANT-MESSAGE 1
 [CONSUMER] Consuming: NORMAL-MESSAGE 1
 [CONSUMER] Consuming: NORMAL-MESSAGE 2
 [CONSUMER] Consumed: NORMAL-MESSAGE 1
 [CONSUMER] Consumed: NORMAL-MESSAGE 2
 [CONSUMER] Consuming: NORMAL-MESSAGE 3
 [CONSUMER] Consuming: IMPORTANT-MESSAGE 1
[PRODUCER] Transfered: IMPORTANT-MESSAGE 1 (**)
[PRODUCER] Transfering: IMPORTANT-MESSAGE 2
 [CONSUMER] Consumed: NORMAL-MESSAGE 3
 [CONSUMER] Consuming: IMPORTANT-MESSAGE 2
 [CONSUMER] Consumed: IMPORTANT-MESSAGE 1
[PRODUCER] Transfered: IMPORTANT-MESSAGE 2 (**)
[PRODUCER] Transfering: IMPORTANT-MESSAGE 3
[PRODUCER] Transfered: IMPORTANT-MESSAGE 3 (**)
[PRODUCER] Transfering: IMPORTANT-MESSAGE 4
 [CONSUMER] Consuming: IMPORTANT-MESSAGE 3
 [CONSUMER] Consumed: IMPORTANT-MESSAGE 2
 [CONSUMER] Consuming: IMPORTANT-MESSAGE 4
[PRODUCER] Transfered: IMPORTANT-MESSAGE 4 (**)
[PRODUCER] Transfering: IMPORTANT-MESSAGE 5
 [CONSUMER] Consumed: IMPORTANT-MESSAGE 3
 [CONSUMER] Consuming: IMPORTANT-MESSAGE 5
[PRODUCER] Transfered: IMPORTANT-MESSAGE 5 (**)
...
В следующем примере показано, как использовать метод TransferQueue.tryTransfer(e). В этом примере Producer создает сообщения и пытается передать их ожидающему Consumer.
Посмотрев на вывод этого примера, вы увидите, что Producer создал множество сообщений, которые будут проигнорированы, потому что во время вызова метода TransferQueue.tryTransfer(e) , нет ожидания Consumer.
TransferQueue_tryTransfer_ex1.java

package org.o7planning.transferqueue.bb;

import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TransferQueue;

public class TransferQueue_tryTransfer_ex1 {

    public static void main(String[] args) {
        TransferQueue<String> queue = new LinkedTransferQueue<String>();
        queue.add("NORMAL-MESSAGE 1");
        queue.add("NORMAL-MESSAGE 2");
        queue.add("NORMAL-MESSAGE 3");

        Producer producer1 = new Producer(queue);
        Consumer consumer1 = new Consumer(queue);
        Consumer consumer2 = new Consumer(queue);

        // Start..
        producer1.start();
        consumer1.start();
        consumer2.start();
    }
}
class Producer extends Thread {
    final TransferQueue<String> queue;
    private static int count = 1;

    public Producer(TransferQueue<String> queue) {
        this.queue = queue;
    }
    @Override
    public void run() {
        try {
            while (true) {
                String message = "IMPORTANT-MESSAGE " + count++;
                this.queue.tryTransfer(message); // Calling tryTransfer method.
                Thread.sleep(1 * 1000); // 1 seconds.
            }
        } catch (InterruptedException ex) {
        }
    }
}
class Consumer extends Thread {
    final TransferQueue<String> queue;

    Consumer(TransferQueue<String> queue) {
        this.queue = queue;
    }
    @Override
    public void run() {
        try {
            while (true) {
                String message = queue.take();
                System.out.println(">> " + message);
                Thread.sleep(3 * 1000); // 3 seconds
            }
        } catch (InterruptedException ex) {
        }
    }
}
Output:

>> NORMAL-MESSAGE 1
>> NORMAL-MESSAGE 2
>> NORMAL-MESSAGE 3
>> IMPORTANT-MESSAGE 4
>> IMPORTANT-MESSAGE 7
>> IMPORTANT-MESSAGE 8
>> IMPORTANT-MESSAGE 10
>> IMPORTANT-MESSAGE 11
>> IMPORTANT-MESSAGE 13
>> IMPORTANT-MESSAGE 14
>> IMPORTANT-MESSAGE 16
>> IMPORTANT-MESSAGE 17
Сообщения, созданные Producer, были проигнорированы:
  • IMPORTANT-MESSAGE 1
  • IMPORTANT-MESSAGE 2
  • IMPORTANT-MESSAGE 3
  • IMPORTANT-MESSAGE 5
  • IMPORTANT-MESSAGE 6
  • ...

4- getWaitingConsumerCount()


int getWaitingConsumerCount();
Возвращает предполагаемое количество потребителей, ожидающих получения элемента из этой TransferQueue с помощью метода BlockingQueue.take() или BlockingQueue.poll(timeout,unit).
Возвращаемое значение является приближением к текущему состоянию дел, которое может быть неточным, если потребители завершили или отказались от ожидания. Значение может быть полезно для мониторинга и эвристики, но не для управления синхронизацией. Реализации этого метода, вероятно, будут заметно медленнее, чем для hasWaitingConsumer().

5- hasWaitingConsumer()


boolean hasWaitingConsumer();
Возвращает значение true, если хотя бы один потребитель ожидает получения элемента с помощью метода BlockingQueue.take() или BlockingQueue.poll(timeout,unit). Возвращаемое значение представляет собой временное состояние дел.

6- transfer(E)


void transfer(E e) throws InterruptedException;
Добавьте элемент в эту TransferQueue и подождите, пока он не будет получен ожидающим потребителем с помощью метода BlockingQueue.take() или BlockingQueue.poll(timeout,unit).

7- tryTransfer(E)


boolean tryTransfer(E e);
Метод tryTransfer(e) добавляет элемент в эту TransferQueue только в том случае, если есть потребитель, ожидающий получения элемента с помощью метода BlockingQueue.take() или BlockingQueue.poll(timeout,unit), и убедитесь, что потребитель получит этот элемент немедленно. В противном случае метод возвращает значение false, и никакие другие действия не предпринимаются.

8- tryTransfer(E, long, TimeUnit)


boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException;
Метод tryTransfer(e,timeout,unit) добавляет элемент в эту TransferQueue только в том случае, если в течение указанного периода ожидания потребитель ожидает получения элемента с помощью BlockingQueue.take() или BlockingQueue.poll(timeout,unit) и убедитесь, что потребитель получает этот элемент. В противном случае метод возвращает значение false, и никакие другие действия не предпринимаются.