Piped IO Stream


Piped IO Stream

Piped IO Stream:
Piped Streams are a clean way of opening up a communication channel between Threads. A Piped Input Stream in one Thread is bound to a Piped Output Stream of another Thread. And from there on any data written into the Piped Output Stream is channeled to Piped Input Stream of the other Thread. Please note one might experience a Deadlock or Race condition if both the Piped Input and Output Streams are part of the same Thread. In case of no available data from Piped Output Stream, the Thread holding the Piped Input Stream is suspended at ‘READ’ until further data is available from Piped Output Stream.



Below is an example of a Thread containing PipedInputStream.



public class PipedInputStreamThread extends Thread
{
  // A private variable to hold a reference of the Piped Input Stream
     private PipedInputStream pipedInputStream;
 
  // Constructor which initializes the Piped Input Stream
     public PipedInputStreamThread()
     {
         pipedInputStream = new PipedInputStream();
     }
 
     public PipedInputStream getInputStream()
     {
         return pipedInputStream;
     }
 
  // A public method responsible for BINDING the Piped Input Stream to a Piped Output Stream 
     public void Connect(PipedOutputStream pipedOutputStream) throws IOException
     {
        // Binding IO Streams
        pipedInputStream.connect(pipedOutputStream);
     }
 
     @Override
     public void run() {
     try
     {
         // Array to hold the Input Data
         byte[] eachInput = new byte[1000];

         // Piped Input Stream READ's the content published by the Piped Output Stream.
         // The Thread is suspended if there is no published Data from Piped Output Stream and EOF is not encountered.
         // A call to READ return's -1 when EOF is reached.
         while( pipedInputStream.read(eachInput) != -1 )
         {
             System.out.println(" ======== PipedInputStreamThread :: Input Received ======== > "+new String(eachInput));
         }
         pipedInputStream.close();
      }catch(IOException oiexcp)
      {
          oiexcp.printStackTrace();
          System.out.println("PipedInputStream Failed to Read");
      }
    }
}

Below is an example of a Thread holding PipedOutputStream.


public class PipedOutputStreamThread extends Thread
{
 // A private variable to hold a reference of the Piped Output Stream 
 private PipedOutputStream pipedOutputStream;

 // Constructor which initializes the Piped Output Stream 
 public PipedOutputStreamThread()
 {
  pipedOutputStream = new PipedOutputStream();
 }
 
 public PipedOutputStream getOutputStream()
 {
  return pipedOutputStream;
 }

 // A public method responsible for BINDING the Piped Output Stream to a Piped Input Stream  
 public void Connect(PipedInputStream pipedInputStream) throws IOException
 {
  // Binding IO Streams
  pipedOutputStream.connect(pipedInputStream);
 }
 
 @Override
 public void run() {
  Random random = new Random();
  long sleepTime;
  try{
   for(int count = 0; count < 10; count++)
   {
    sleepTime = (new Integer(random.nextInt(6)*1000)).longValue();
    System.out.println(" ++++++++ PipedOutputStreamThread :: sleeping - "+sleepTime+" ++++++++ ");
    
    // Piped Output Stream Thread is sleepng for 'X' sec before sending the next content
    // Deliberately put in place to test the Suspension of Piped Input Stream Thread during this period.
    Thread.sleep(sleepTime);
    System.out.println(" ++++++++ PipedOutputStreamThread :: Woke Up :: Sending Info ++++++++ ");
    
    // Writing content to the Piped Output Stream
    pipedOutputStream.write(UUID.randomUUID().toString().getBytes());
   }
   pipedOutputStream.close();
  }catch(IOException | InterruptedException ioexcp)
  {
   System.out.println("PipedOutputStream Failed to Write");
  }
  
 }
}

Below is a 'main' Method executing both the Piped Output Stream and the Piped Input Stream.


public static void main(String[] args) throws IOException 
{
 // Creating the Piped Input Stream Thread
 PipedInputStreamThread inputStreamThread = new PipedInputStreamThread();

 // Creating the Piped Output Stream Thread
 PipedOutputStreamThread outputStreamThread = new PipedOutputStreamThread();
 
 // Binding the Piped Output Stream to the Input Stream 
 outputStreamThread.Connect(inputStreamThread.getInputStream());
  
 // Executing both the Threads
 inputStreamThread.start();
 outputStreamThread.start();
}

Sample Output:


++++++++ PipedOutputStreamThread :: sleeping - 4000 ++++++++ 
======== PipedInputStreamThread :: Input Received ======== > 558da383-bdf9-4f1a-a22d-26a50e2089dfcd4fae57-2e68-4836-b7e7-031a1c76b864
++++++++ PipedOutputStreamThread :: Woke Up :: Sending Info ++++++++ 
++++++++ PipedOutputStreamThread :: sleeping - 2000 ++++++++ 
======== PipedInputStreamThread :: Input Received ======== > 22b528d0-5fdb-49e6-a82c-31330e6cf70ccd4fae57-2e68-4836-b7e7-031a1c76b864
.
.
.


<< Object IO Stream Sequence IO Stream >>

No comments:

Post a Comment