Руководство Java Pipe.SourceChannel
1. Pipe.SourceChannel
Предположим, вы разрабатываете приложение Multithreading (многопоточное), и у вас есть 2 независимых Thread, Thread-A и Thread-B. Вопрос в том:
- Что делать какждый раз, когда данные появляются в Thread-A, они автоматически переносятся в Thread-B?
Pipe.SinkChannel и Pipe.SourceChannel - это два класса, созданные для обработки ситуации, упомянутой выше. Каждый раз, когда данные записываются в Pipe.SinkChannel, они автоматически отображаются в Pipe.SourceChannel. Это называется эффектом трубы (pipe).
Класс Pipe.SinkChannel - это абстрактный класс, определенный внутри класса Pipe и реализующий interface WritableByteChannel и GatheringByteChannel. Он действует как канал записи данных.
public abstract static class SinkChannel extends AbstractSelectableChannel
implements WritableByteChannel, GatheringByteChannel
Класс Pipe.SourceChannel - это абстрактный класс, определенный внутри класса Pipe и реализующий interface ReadableByteChannel и ScatteringByteChannel. Он действует как канал чтения данных.
public abstract static class SourceChannel extends AbstractSelectableChannel
implements ReadableByteChannel, ScatteringByteChannel
- ReadableByteChannel
- InterruptibleChannel
- WritableByteChannel
- ScatteringByteChannel
- SelectableChannel
- GatheringByteChannel
- Pipe.SinkChannel
2. Examples
В этом примере мы будем записывать сообщения (messages) в Pipe.SinkChannel (управляемый ThreadA). Они автоматически появятся на Pipe.SourceChannel (контролируется ThreadB).
Pipe_ex1.java
package org.o7planning.pipe.sourcechannel.ex;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
public class Pipe_ex1 {
public static void main(String[] args) throws IOException, InterruptedException {
Pipe pipe = Pipe.open();
ThreadA threadA = new ThreadA(pipe);
ThreadB threadB = new ThreadB(pipe);
threadA.start();
threadB.start();
threadA.join(); // Waits for this thread to die.
threadB.join(); // Waits for this thread to die.
System.out.println();
System.out.println("Done!");
}
}
//
class ThreadA extends Thread {
private Pipe pipe;
public ThreadA(Pipe pipe) {
this.pipe = pipe;
}
@Override
public void run() {
try (Pipe.SinkChannel skChannel = this.pipe.sink();) { // try
String[] messages = new String[] { "Hello\n", "Hi\n", "Bye\n" };
ByteBuffer buffer = ByteBuffer.allocate(512);
for (String msg : messages) {
// Set position = 0; limit = capacity;
buffer.clear();
buffer.put(msg.getBytes("UTF-8"));
buffer.flip();
while (buffer.hasRemaining()) {
skChannel.write(buffer);
}
Thread.sleep(1000);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
//
class ThreadB extends Thread {
private Pipe pipe;
public ThreadB(Pipe pipe) {
this.pipe = pipe;
}
@Override
public void run() {
try (Pipe.SourceChannel srcChannel = this.pipe.source();) { // try
ByteBuffer buffer = ByteBuffer.allocate(512);
while (srcChannel.read(buffer) != -1) {
buffer.flip(); // Set limit = current position; position = 0;
ByteArrayOutputStream baos = new ByteArrayOutputStream();
while (buffer.hasRemaining()) {
byte b = buffer.get();
if (b != '\n') {
baos.write(b);
} else {
String s = baos.toString("UTF-8");
System.out.println(s);
}
}
buffer.clear(); // Set position =0; limit = capacity;
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
Output:
Руководства Java New IO
- Руководство Java ReadableByteChannel
- Руководство Java WritableByteChannel
- Руководство Java Pipe.SinkChannel
- Руководство Java Pipe.SourceChannel
- Руководство Java ScatteringByteChannel
- Руководство Java GatheringByteChannel
- Руководство Java Buffer
- Руководство Java DatagramChannel
- Руководство Java Channel
Show More