betacode

Руководство Java Pipe.SourceChannel

  1. Pipe.SourceChannel
  2. Examples

1. Pipe.SourceChannel

Предположим, вы разрабатываете приложение Multithreading (многопоточное), и у вас есть 2 независимых Thread, Thread-A и Thread-B. Вопрос в том:
  • Что делать какждый раз, когда данные появляются в Thread-A, они автоматически переносятся в Thread-B?
Pipe.SinkChannel и Pipe.SourceChannel - это два класса, созданные для обработки ситуации, упомянутой выше. Каждый раз, когда данные записываются в Pipe.SinkChannel, они автоматически отображаются в Pipe.SourceChannel. Это называется эффектом трубы (pipe).
Класс Pipe.SinkChannel - это абстрактный класс, определенный внутри класса Pipe и реализующий interface WritableByteChannel и GatheringByteChannel. Он действует как канал записи данных.
public abstract static class SinkChannel extends AbstractSelectableChannel
                        implements WritableByteChannel, GatheringByteChannel
Класс Pipe.SourceChannel - это абстрактный класс, определенный внутри класса Pipe и реализующий interface ReadableByteChannel и ScatteringByteChannel. Он действует как канал чтения данных.
public abstract static class SourceChannel extends AbstractSelectableChannel
                        implements ReadableByteChannel, ScatteringByteChannel

2. Examples

В этом примере мы будем записывать сообщения (messages) в Pipe.SinkChannel (управляемый ThreadA). Они автоматически появятся на Pipe.SourceChannel (контролируется ThreadB).
Pipe_ex1.java
package org.o7planning.pipe.sourcechannel.ex;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;

public class Pipe_ex1 {

    public static void main(String[] args) throws IOException, InterruptedException {
        Pipe pipe = Pipe.open();

        ThreadA threadA = new ThreadA(pipe);
        ThreadB threadB = new ThreadB(pipe);

        threadA.start();
        threadB.start();
        threadA.join(); // Waits for this thread to die.
        threadB.join(); // Waits for this thread to die.
        System.out.println();
        System.out.println("Done!");
    }
}

//
class ThreadA extends Thread {
    private Pipe pipe;

    public ThreadA(Pipe pipe) {
        this.pipe = pipe;
    }

    @Override
    public void run() {
        try (Pipe.SinkChannel skChannel = this.pipe.sink();) { // try
            String[] messages = new String[] { "Hello\n", "Hi\n", "Bye\n" };

            ByteBuffer buffer = ByteBuffer.allocate(512);

            for (String msg : messages) {
                // Set position = 0; limit = capacity;
                buffer.clear();
                buffer.put(msg.getBytes("UTF-8"));
                buffer.flip();
                while (buffer.hasRemaining()) {
                    skChannel.write(buffer);
                }
                Thread.sleep(1000);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

//
class ThreadB extends Thread {
    private Pipe pipe;

    public ThreadB(Pipe pipe) {
        this.pipe = pipe;
    }

    @Override
    public void run() {
        try (Pipe.SourceChannel srcChannel = this.pipe.source();) { // try
            ByteBuffer buffer = ByteBuffer.allocate(512);

            while (srcChannel.read(buffer) != -1) {
                buffer.flip(); // Set limit = current position; position = 0;
                ByteArrayOutputStream baos = new ByteArrayOutputStream();

                while (buffer.hasRemaining()) {
                    byte b = buffer.get();
                    if (b != '\n') {
                        baos.write(b);
                    } else {
                        String s = baos.toString("UTF-8");
                        System.out.println(s);
                    }
                }
                buffer.clear(); // Set position =0; limit = capacity;
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Output: