Class ReadAheadInputStream

java.lang.Object
java.io.InputStream
org.apache.commons.io.input.ReadAheadInputStream
All Implemented Interfaces:
Closeable, AutoCloseable

public class ReadAheadInputStream extends InputStream
An InputStream implementation that asynchronously reads ahead from an underlying input stream once a specified amount of data has been read from the current buffer. It maintains two buffers: an active buffer and a read-ahead buffer. The active buffer holds the data returned when a read() call is issued. The read-ahead buffer is filled asynchronously from the underlying input stream. When the active buffer is exhausted, the two buffers are flipped so that reading can continue from the read-ahead buffer without blocking on disk I/O.

This class was ported and adapted from Apache Spark commit 933dc6cb7b3de1d8ccaf73d124d6eb95b947ed19.
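
The double-buffering idea described above can be illustrated with a minimal, JDK-only sketch. This is not the Commons IO implementation: the class name DoubleBufferSketch, its roundTrip helper, and the use of a Future to hand over the filled buffer are all assumptions made for illustration (the real class coordinates through stateChangeLock and asyncReadComplete, and also supports bulk reads, skip and close semantics that are omitted here).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical sketch of double buffering; NOT the Commons IO implementation. */
final class DoubleBufferSketch extends InputStream {
    private final InputStream in;
    private final ExecutorService executor;
    private ByteBuffer active;       // data handed out by read()
    private ByteBuffer readAhead;    // filled in the background
    private Future<Integer> pending; // null once end of stream has been seen

    DoubleBufferSketch(InputStream in, int bufferSize, ExecutorService executor) {
        if (bufferSize <= 0) {
            throw new IllegalArgumentException("bufferSize must be positive");
        }
        this.in = in;
        this.executor = executor;
        this.active = ByteBuffer.allocate(bufferSize);
        this.readAhead = ByteBuffer.allocate(bufferSize);
        this.active.limit(0); // the active buffer starts out empty
        startReadAhead();
    }

    /** Submits a background task that fills the read-ahead buffer. */
    private void startReadAhead() {
        final ByteBuffer target = readAhead;
        Callable<Integer> fill = () -> {
            target.clear();
            int n = in.read(target.array(), 0, target.capacity());
            target.limit(Math.max(n, 0));
            return n;
        };
        pending = executor.submit(fill);
    }

    /** Waits for the background fill, then swaps buffers; false means end of stream. */
    private boolean swapBuffers() throws IOException {
        if (pending == null) {
            return false;
        }
        int n;
        try {
            n = pending.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new InterruptedIOException("interrupted while waiting for read-ahead");
        } catch (ExecutionException e) {
            throw new IOException(e.getCause());
        }
        if (n < 0) {
            pending = null;
            return false;
        }
        ByteBuffer tmp = active;
        active = readAhead;
        readAhead = tmp;
        startReadAhead(); // refill the old active buffer while the caller consumes the new one
        return true;
    }

    @Override
    public int read() throws IOException {
        while (!active.hasRemaining()) {
            if (!swapBuffers()) {
                return -1;
            }
        }
        return active.get() & 0xFF;
    }

    /** Copies data through the sketch and returns what came out. */
    static byte[] roundTrip(byte[] data, int bufferSize) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try (InputStream s = new DoubleBufferSketch(new ByteArrayInputStream(data), bufferSize, executor)) {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            for (int b; (b = s.read()) != -1; ) {
                out.write(b);
            }
            return out.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        byte[] result = roundTrip("hello read-ahead".getBytes(StandardCharsets.UTF_8), 4);
        System.out.println(new String(result, StandardCharsets.UTF_8));
    }
}
```

In this sketch the caller only blocks in swapBuffers() if the background fill has not finished by the time the active buffer runs out, which is the situation the two-buffer design is meant to make rare.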

Since:
2.9.0
  • Field Details

    • oneByte

      private static final ThreadLocal<byte[]> oneByte
    • stateChangeLock

      private final ReentrantLock stateChangeLock
    • activeBuffer

      private ByteBuffer activeBuffer
    • readAheadBuffer

      private ByteBuffer readAheadBuffer
    • endOfStream

      private boolean endOfStream
    • readInProgress

      private boolean readInProgress
    • readAborted

      private boolean readAborted
    • readException

      private Throwable readException
    • isClosed

      private boolean isClosed
    • isUnderlyingInputStreamBeingClosed

      private boolean isUnderlyingInputStreamBeingClosed
    • isReading

      private boolean isReading
    • isWaiting

      private final AtomicBoolean isWaiting
    • underlyingInputStream

      private final InputStream underlyingInputStream
    • executorService

      private final ExecutorService executorService
    • shutdownExecutorService

      private final boolean shutdownExecutorService
    • asyncReadComplete

      private final Condition asyncReadComplete
  • Constructor Details

    • ReadAheadInputStream

      public ReadAheadInputStream(InputStream inputStream, int bufferSizeInBytes)
      Creates an instance with the specified buffer size.
      Parameters:
      inputStream - The underlying input stream.
      bufferSizeInBytes - The buffer size.
    • ReadAheadInputStream

      public ReadAheadInputStream(InputStream inputStream, int bufferSizeInBytes, ExecutorService executorService)
      Creates an instance with the specified buffer size and executor service.
      Parameters:
      inputStream - The underlying input stream.
      bufferSizeInBytes - The buffer size.
      executorService - An executor service for the read-ahead thread.
    • ReadAheadInputStream

      private ReadAheadInputStream(InputStream inputStream, int bufferSizeInBytes, ExecutorService executorService, boolean shutdownExecutorService)
      Creates an instance with the specified buffer size and executor service.
      Parameters:
      inputStream - The underlying input stream.
      bufferSizeInBytes - The buffer size.
      executorService - An executor service for the read-ahead thread.
      shutdownExecutorService - Whether or not to shut down the given ExecutorService on close.
  • Method Details

    • newExecutorService

      private static ExecutorService newExecutorService()
      Creates a new daemon executor service.
      Returns:
      a new daemon executor service.
    • newThread

      private static Thread newThread(Runnable r)
      Creates a new daemon thread.
      Parameters:
      r - the thread's runnable.
      Returns:
      a new daemon thread.
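
One way such a daemon executor could be built with the JDK alone is sketched below. The class name DaemonExecutorSketch, the thread name, and the choice of a single-thread executor are assumptions for illustration; the page does not say how newExecutorService() and newThread(Runnable) are actually implemented.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical sketch of a daemon executor; NOT the Commons IO implementation. */
final class DaemonExecutorSketch {

    /** Creates a daemon thread, so a pending read-ahead cannot keep the JVM alive. */
    static Thread newDaemonThread(Runnable r) {
        Thread thread = new Thread(r, "read-ahead-sketch"); // thread name is an assumption
        thread.setDaemon(true);
        return thread;
    }

    /** Creates an executor whose worker thread comes from newDaemonThread. */
    static ExecutorService newDaemonExecutor() {
        // The method reference works because its signature matches ThreadFactory.newThread(Runnable).
        return Executors.newSingleThreadExecutor(DaemonExecutorSketch::newDaemonThread);
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = newDaemonExecutor();
        Callable<Boolean> probe = () -> Thread.currentThread().isDaemon();
        System.out.println("worker is daemon: " + executor.submit(probe).get());
        executor.shutdown();
    }
}
```

Daemon threads do not prevent JVM exit, which is a plausible reason to use them for an executor that the stream creates on the caller's behalf.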
    • available

      public int available() throws IOException
      Overrides:
      available in class InputStream
      Throws:
      IOException
    • checkReadException

      private void checkReadException() throws IOException
      Throws:
      IOException
    • close

      public void close() throws IOException
      Specified by:
      close in interface AutoCloseable
      Specified by:
      close in interface Closeable
      Overrides:
      close in class InputStream
      Throws:
      IOException
    • closeUnderlyingInputStreamIfNecessary

      private void closeUnderlyingInputStreamIfNecessary()
    • isEndOfStream

      private boolean isEndOfStream()
    • read

      public int read() throws IOException
      Specified by:
      read in class InputStream
      Throws:
      IOException
    • read

      public int read(byte[] b, int offset, int len) throws IOException
      Overrides:
      read in class InputStream
      Throws:
      IOException
    • readAsync

      private void readAsync() throws IOException
      Reads data from underlyingInputStream into readAheadBuffer asynchronously.
      Throws:
      IOException
    • signalAsyncReadComplete

      private void signalAsyncReadComplete()
    • skip

      public long skip(long n) throws IOException
      Overrides:
      skip in class InputStream
      Throws:
      IOException
    • skipInternal

      private long skipInternal(long n) throws IOException
      Internal skip method that should be called only from skip(). It assumes that the caller has already acquired stateChangeLock before calling this method.
      Parameters:
      n - the number of bytes to be skipped.
      Returns:
      the actual number of bytes skipped.
      Throws:
      IOException
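
The precondition that the caller already holds the lock can be illustrated with a small JDK-only sketch. The class LockedSkipSketch and its remaining counter are hypothetical stand-ins for the real buffers, and the explicit isHeldByCurrentThread() check is an assumption added for illustration; the page does not say whether the actual method verifies the lock.

```java
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical sketch of a lock-guarded skip; NOT the Commons IO implementation. */
final class LockedSkipSketch {
    private final ReentrantLock stateChangeLock = new ReentrantLock();
    private long remaining; // bytes still available; stands in for the real buffers

    LockedSkipSketch(long remaining) {
        this.remaining = remaining;
    }

    /** Public entry point: acquires the lock, then delegates to skipInternal. */
    public long skip(long n) {
        if (n <= 0L) {
            return 0L;
        }
        stateChangeLock.lock();
        try {
            return skipInternal(n);
        } finally {
            stateChangeLock.unlock();
        }
    }

    /** Must only be called while stateChangeLock is held by the current thread. */
    private long skipInternal(long n) {
        if (!stateChangeLock.isHeldByCurrentThread()) {
            throw new IllegalStateException("caller must hold stateChangeLock");
        }
        long skipped = Math.min(n, remaining);
        remaining -= skipped;
        return skipped;
    }

    public static void main(String[] args) {
        LockedSkipSketch stream = new LockedSkipSketch(10);
        System.out.println(stream.skip(4));   // prints 4
        System.out.println(stream.skip(100)); // prints 6, only 6 bytes were left
    }
}
```

Keeping the lock acquisition in skip() and the state changes in skipInternal() means the internal method never has to deal with locking or unlocking itself.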
    • swapBuffers

      private void swapBuffers()
      Flips the active and read-ahead buffers.
    • waitForAsyncReadComplete

      private void waitForAsyncReadComplete() throws IOException
      Throws:
      IOException