Piped IO Stream
Piped Streams are a clean way of opening up a communication channel between
Threads. A Piped Input Stream in one Thread is bound to a Piped Output Stream
in another Thread, and from then on any data written into the Piped Output
Stream is channeled to the Piped Input Stream of the other Thread. Please note
that using both the Piped Input and Output Streams from the same Thread is not
recommended, because it can deadlock that Thread: the pipe has a fixed-size
buffer (1024 bytes by default), so a write blocks once the buffer is full and
a read blocks while it is empty. When no data is available from the Piped
Output Stream, the Thread holding the Piped Input Stream is suspended in
read() until further data arrives or the Piped Output Stream is closed.
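The behaviour described above can be tried out in a few lines. The following is only a minimal sketch, not part of the original example: the class name PipeSketch and the helper sendThroughPipe are made up for illustration, and the 200 ms pause is an arbitrary value chosen to make the blocking read noticeable.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

public class PipeSketch {

    // Hypothetical helper: sends `message` through a pipe from a writer thread
    // and returns whatever the calling thread reads from the other end.
    static String sendThroughPipe(String message) {
        try (PipedOutputStream out = new PipedOutputStream();
             // Passing the output stream to the constructor binds the two ends.
             PipedInputStream in = new PipedInputStream(out)) {

            Thread writer = new Thread(() -> {
                try {
                    Thread.sleep(200); // the reader stays blocked in read() meanwhile
                    out.write(message.getBytes(StandardCharsets.UTF_8));
                    out.close(); // closing the writer's end lets read() return -1
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            writer.start();

            ByteArrayOutputStream received = new ByteArrayOutputStream();
            int b;
            // read() blocks until a byte is available and returns -1 at EOF.
            while ((b = in.read()) != -1) {
                received.write(b);
            }
            writer.join();
            return new String(received.toByteArray(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(sendThroughPipe("hello from the writer thread"));
    }
}
```

Here the reading happens on the calling thread and the writing on a second thread, so the two ends of the pipe are never used from the same thread.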
Below is an example of a Thread containing PipedInputStream.
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

public class PipedInputStreamThread extends Thread {

    // Holds a reference to the Piped Input Stream
    private final PipedInputStream pipedInputStream;

    // Constructor which initializes the Piped Input Stream
    public PipedInputStreamThread() {
        pipedInputStream = new PipedInputStream();
    }

    public PipedInputStream getInputStream() {
        return pipedInputStream;
    }

    // Responsible for BINDING the Piped Input Stream to a Piped Output Stream
    public void Connect(PipedOutputStream pipedOutputStream) throws IOException {
        // Binding IO Streams
        pipedInputStream.connect(pipedOutputStream);
    }

    @Override
    public void run() {
        try {
            // Buffer to hold the input data
            byte[] eachInput = new byte[1000];
            int bytesRead;

            // read() blocks while the Piped Output Stream has published no data
            // and EOF has not been reached. It returns the number of bytes
            // read, or -1 once EOF is reached.
            while ((bytesRead = pipedInputStream.read(eachInput)) != -1) {
                // Decode only the bytes just read; the rest of the buffer may
                // still hold leftovers from an earlier, longer read.
                System.out.println(" ======== PipedInputStreamThread :: Input Received ======== > "
                        + new String(eachInput, 0, bytesRead));
            }

            pipedInputStream.close();
        } catch (IOException ioexcp) {
            ioexcp.printStackTrace();
            System.out.println("PipedInputStream Failed to Read");
        }
    }
}
Below is an example of a Thread holding PipedOutputStream.
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.Random;
import java.util.UUID;

public class PipedOutputStreamThread extends Thread {

    // Holds a reference to the Piped Output Stream
    private final PipedOutputStream pipedOutputStream;

    // Constructor which initializes the Piped Output Stream
    public PipedOutputStreamThread() {
        pipedOutputStream = new PipedOutputStream();
    }

    public PipedOutputStream getOutputStream() {
        return pipedOutputStream;
    }

    // Responsible for BINDING the Piped Output Stream to a Piped Input Stream
    public void Connect(PipedInputStream pipedInputStream) throws IOException {
        // Binding IO Streams
        pipedOutputStream.connect(pipedInputStream);
    }

    @Override
    public void run() {
        Random random = new Random();
        try {
            for (int count = 0; count < 10; count++) {
                // Random pause of 0 to 5 seconds, expressed in milliseconds
                long sleepTime = random.nextInt(6) * 1000L;
                System.out.println(" ++++++++ PipedOutputStreamThread :: sleeping - " + sleepTime + " ++++++++ ");

                // Sleep for 'X' sec before sending the next content. Deliberately
                // put in place to test the suspension of the Piped Input Stream
                // Thread during this period.
                Thread.sleep(sleepTime);
                System.out.println(" ++++++++ PipedOutputStreamThread :: Woke Up :: Sending Info ++++++++ ");

                // Writing content to the Piped Output Stream
                pipedOutputStream.write(UUID.randomUUID().toString().getBytes());
                // flush() notifies any reader waiting for bytes in the pipe
                pipedOutputStream.flush();
            }
            pipedOutputStream.close();
        } catch (IOException | InterruptedException ioexcp) {
            System.out.println("PipedOutputStream Failed to Write");
        }
    }
}
Below is a 'main' Method executing both the Piped Output Stream and the Piped Input Stream.
public static void main(String[] args) throws IOException {
    // Creating the Piped Input Stream Thread
    PipedInputStreamThread inputStreamThread = new PipedInputStreamThread();

    // Creating the Piped Output Stream Thread
    PipedOutputStreamThread outputStreamThread = new PipedOutputStreamThread();

    // Binding the Piped Output Stream to the Input Stream
    outputStreamThread.Connect(inputStreamThread.getInputStream());

    // Executing both the Threads
    inputStreamThread.start();
    outputStreamThread.start();
}
Sample Output:
 ++++++++ PipedOutputStreamThread :: sleeping - 4000 ++++++++
 ======== PipedInputStreamThread :: Input Received ======== > 558da383-bdf9-4f1a-a22d-26a50e2089dfcd4fae57-2e68-4836-b7e7-031a1c76b864
 ++++++++ PipedOutputStreamThread :: Woke Up :: Sending Info ++++++++
 ++++++++ PipedOutputStreamThread :: sleeping - 2000 ++++++++
 ======== PipedInputStreamThread :: Input Received ======== > 22b528d0-5fdb-49e6-a82c-31330e6cf70ccd4fae57-2e68-4836-b7e7-031a1c76b864
.
.
.
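In the sample output above, every received line ends with the same second UUID (cd4fae57-...). A plausible explanation, stated here as an assumption since the full run is not shown, is that the reader decodes its whole 1000-byte buffer, so bytes left behind by an earlier, longer read get printed again. The sketch below shows one way to avoid that by decoding only the number of bytes each read call reports. The names ReadCountSketch, drain and roundTrip are hypothetical, and US-ASCII is assumed because UUID strings contain only ASCII characters.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

public class ReadCountSketch {

    // Reads until EOF, decoding only the bytes that each read() call reported.
    static String drain(InputStream in) throws IOException {
        byte[] buffer = new byte[1000];
        StringBuilder text = new StringBuilder();
        int bytesRead;
        while ((bytesRead = in.read(buffer)) != -1) {
            // new String(buffer) would decode all 1000 bytes, stale ones included.
            text.append(new String(buffer, 0, bytesRead, StandardCharsets.US_ASCII));
        }
        return text.toString();
    }

    // Hypothetical helper: writes each message from a second thread,
    // then drains the pipe on the calling thread.
    static String roundTrip(String... messages) {
        try (PipedOutputStream out = new PipedOutputStream();
             PipedInputStream in = new PipedInputStream(out)) {

            Thread writer = new Thread(() -> {
                try {
                    for (String message : messages) {
                        out.write(message.getBytes(StandardCharsets.US_ASCII));
                        out.flush(); // notifies a reader waiting for data
                    }
                    out.close(); // signals EOF to the reader
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            writer.start();

            String result = drain(in);
            writer.join();
            return result;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("a-long-first-message", "short"));
    }
}
```

Note that a pipe is a plain byte stream: how the written messages are split across individual read calls is not guaranteed, so only the concatenated result is predictable.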