While Jetty has internally used asynchronous IO since 7.0, Servlet 3.1 has added asynchronous IO to the application API and Jetty-9.1 now supports asynchronous IO in an unbroken chain from application to socket. Asynchronous APIs can often look intuitively simple, but there are many important subtleties to asynchronous programming and this blog looks at one important pattern used within Jetty. Specifically, we look at how an iterating callback pattern is used to avoid deep stacks and unnecessary thread dispatches.

Asynchronous Callback

Many programmers wrongly believe that asynchronous programming is about Futures. However, Futures are a mostly broken abstraction and could best be described as a deferred blocking API rather than an asynchronous API. True asynchronous programming is about callbacks, where the asynchronous operation calls back the caller when the operation is complete. A classic example of this is the NIO AsynchronousByteChannel write method:

<A> void write(ByteBuffer src,
               A attachment,
               CompletionHandler<Integer,? super A> handler);
public interface CompletionHandler<V,A>
{
  void completed(V result, A attachment);
  void failed(Throwable exc, A attachment);
}

With an NIO asynchronous write, a CompletionHandler instance is passed that is called back once the write operation has completed or failed. If the write channel is congested, then no calling thread is held or blocked whilst the operation waits for the congestion to clear, and the callback will be invoked by a thread typically taken from a thread pool.
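The difference between a Future and a callback can be sketched with the JDK's AsynchronousFileChannel, which offers both styles of write. This is only an illustration under assumptions: a file channel is used instead of an AsynchronousByteChannel so the example runs without a network peer (its write takes an extra position argument), and the class name, method names and the "demo" attachment are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class FutureVersusCallback
{
  // Future style: the write is deferred, but the caller still blocks in get().
  static int writeWithFuture(Path file, String text) throws Exception
  {
    ByteBuffer src = ByteBuffer.wrap(text.getBytes(StandardCharsets.UTF_8));
    try (AsynchronousFileChannel ch =
           AsynchronousFileChannel.open(file, StandardOpenOption.WRITE))
    {
      Future<Integer> pending = ch.write(src, 0);
      return pending.get();
    }
  }

  // Callback style: the handler is called back when the write completes or fails.
  static String writeWithCallback(Path file, String text) throws Exception
  {
    ByteBuffer src = ByteBuffer.wrap(text.getBytes(StandardCharsets.UTF_8));
    AtomicReference<String> outcome = new AtomicReference<>();
    // The latch exists only so this demo can return the outcome to its caller.
    CountDownLatch done = new CountDownLatch(1);
    try (AsynchronousFileChannel ch =
           AsynchronousFileChannel.open(file, StandardOpenOption.WRITE))
    {
      ch.write(src, 0, "demo", new CompletionHandler<Integer,String>()
      {
        @Override
        public void completed(Integer written, String attachment)
        {
          outcome.set(attachment + " wrote " + written + " bytes");
          done.countDown();
        }

        @Override
        public void failed(Throwable exc, String attachment)
        {
          outcome.set(attachment + " failed: " + exc);
          done.countDown();
        }
      });
      if (!done.await(5, TimeUnit.SECONDS))
        throw new IllegalStateException("no callback received");
    }
    return outcome.get();
  }

  public static void main(String[] args) throws Exception
  {
    Path file = Files.createTempFile("async", ".txt");
    System.out.println(writeWithFuture(file, "hello"));
    System.out.println(writeWithCallback(file, "hello"));
    Files.delete(file);
  }
}
```

In real asynchronous code the handler would carry on with the next step of the task itself rather than release a latch for a waiting thread.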

The Servlet 3.1 Asynchronous IO API is syntactically very different, but semantically similar to NIO. Rather than having a callback when a write operation has completed, the API has a WriteListener that is called when a write operation can proceed without blocking:

public interface WriteListener extends EventListener
{
    public void onWritePossible() throws IOException;
    public void onError(final Throwable t);
}

Whilst this looks different to the NIO write CompletionHandler, effectively a write is possible only when the previous write operation has completed, so the callbacks occur on essentially the same semantic event.

Callback Threading Issues

So that asynchronous callback concept looks pretty simple!  How hard could it be to implement and use!   Let’s consider an example of asynchronously writing the data obtained from an InputStream.  The following WriteListener can achieve this:

public class AsyncWriter implements WriteListener
{
  private InputStream in;
  private ServletOutputStream out;
  private AsyncContext context;
  public AsyncWriter(AsyncContext context,
                     InputStream in,
                     ServletOutputStream out)
  {
    this.context=context;
    this.in=in;
    this.out=out;
  }
  public void onWritePossible() throws IOException
  {
    byte[] buf = new byte[4096];
    while(out.isReady())
    {
      int l=in.read(buf,0,buf.length);
      if (l<0)
      {
        context.complete();
        return;
      }
      out.write(buf,0,l);
    }
  }
  ...
}

Whenever a write is possible, this listener will read some data from the input and write it asynchronously to the output. Once all the input is written, the asynchronous Servlet context is signalled that the writing is complete.

However, there are several key threading issues with a WriteListener like this from both the caller’s and the callee’s point of view. Firstly, this is not entirely non-blocking, as the read from the input stream can block. However, if the input stream is from the local file system and the output stream is to a remote socket, then the probability and duration of the input blocking is much less than that of the output, so this is substantially non-blocking asynchronous code and thus is reasonable to include in an application. What this means for asynchronous operation providers (like Jetty) is that you cannot trust any code you call back to not block, and thus you cannot use an important thread (eg one iterating over selected keys from a Selector) to do the callback, else an application may inadvertently block other tasks from proceeding. Asynchronous IO implementations must therefore often dispatch a thread to perform a callback to application code.
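A minimal sketch of such a dispatch, using a JDK executor. The Callback interface here is a stand-in and the class, method and thread names are hypothetical; this is not Jetty's implementation, only an illustration of handing a callback to a pool thread so that the calling (selector-like) thread is never exposed to blocking application code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class DispatchedCallback
{
  interface Callback
  {
    void succeeded();
  }

  // Runs the callback on a pool thread, so even a blocking callback
  // cannot stall the thread that detected the IO event.
  static void dispatch(Executor pool, Callback callback)
  {
    pool.execute(callback::succeeded);
  }

  // Demo: reports the name of the thread that actually ran the callback.
  static String threadThatRanCallback()
  {
    ExecutorService pool =
      Executors.newSingleThreadExecutor(r -> new Thread(r, "callback-pool"));
    try
    {
      CompletableFuture<String> ranOn = new CompletableFuture<>();
      dispatch(pool, () -> ranOn.complete(Thread.currentThread().getName()));
      return ranOn.join();
    }
    finally
    {
      pool.shutdown();
    }
  }

  public static void main(String[] args)
  {
    System.out.println(threadThatRanCallback());
  }
}
```

The price of this safety is the hand-off itself: every callback waits for a pool thread to be scheduled before it runs.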

Because dispatching threads is expensive in both CPU and latency, asynchronous IO implementations look for opportunities to optimise away thread dispatches to callbacks. The Servlet 3.1 API has by design such an optimisation with the out.isReady() call that allows iteration of multiple operations within the one callback. A dispatch to onWritePossible only happens when it is required to avoid a blocking write, and often many write iterations can proceed within a single callback. An NIO CompletionHandler based implementation of the same task is only able to perform one write operation per callback and must wait for the invocation of the completion handler for that operation before proceeding:

public class AsyncWriter implements CompletionHandler<Integer,Void>
{
  private InputStream in;
  private AsynchronousByteChannel out;
  private CompletionHandler<Void,Void> complete;
  private byte[] buf = new byte[4096];
  public AsyncWriter(InputStream in,
                     AsynchronousByteChannel out,
                     CompletionHandler<Void,Void> complete)
  {
    this.in=in;
    this.out=out;
    this.complete=complete;
    completed(0,null);
  }
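  public void completed(Integer w,Void a)
  {
    try
    {
      int l=in.read(buf,0,buf.length);
      if (l<0)
        complete.completed(null,null);
      else
        // the attachment is unused here, so null is passed
        out.write(ByteBuffer.wrap(buf,0,l),null,this);
    }
    catch(IOException e)
    {
      // CompletionHandler methods cannot throw checked exceptions
      complete.failed(e,null);
    }
  }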
  ...
}

Apart from an unrelated significant bug (left as an exercise for the reader to find), this version of the AsyncWriter has a significant threading challenge. If the write can trivially complete without blocking, should the callback to the CompletionHandler be dispatched to a new thread or should it just be called from the scope of the write using the caller thread? If a new thread is always used, then many dispatch delays will be incurred and throughput will be very low. But if the callback is invoked from the scope of the write call, then if the callback does a re-entrant call to write, it may call a callback again which calls write again, and so on, so that a very deep stack will result and a stack overflow can often occur.
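The stack growth is easy to reproduce with a toy model. The sketch below assumes a hypothetical InlineEndPoint whose writes always complete trivially, so every callback is made within the scope of write(); it simply counts how deeply the write() calls nest. None of these classes are NIO or Jetty types.

```java
class ReentrantCallbackDepth
{
  interface Callback
  {
    void succeeded();
  }

  // Hypothetical endpoint whose writes always complete immediately,
  // so the callback is invoked within the scope of write().
  static class InlineEndPoint
  {
    int depth;     // current nesting of write() calls
    int maxDepth;  // deepest nesting observed

    void write(Callback callback)
    {
      depth++;
      maxDepth = Math.max(maxDepth, depth);
      try
      {
        callback.succeeded();
      }
      finally
      {
        depth--;
      }
    }
  }

  // Issues the next write from inside the callback of the previous one.
  static class ChainedWriter implements Callback
  {
    private final InlineEndPoint endp;
    private int remaining;

    ChainedWriter(InlineEndPoint endp, int writes)
    {
      this.endp = endp;
      this.remaining = writes;
    }

    @Override
    public void succeeded()
    {
      if (remaining-- > 0)
        endp.write(this); // re-entrant: still inside the previous write()
    }
  }

  static int maxDepthFor(int writes)
  {
    InlineEndPoint endp = new InlineEndPoint();
    new ChainedWriter(endp, writes).succeeded();
    return endp.maxDepth;
  }

  public static void main(String[] args)
  {
    System.out.println(maxDepthFor(1000));
  }
}
```

In this model the nesting depth equals the number of writes, which is why a long enough transfer over an uncongested channel would eventually overflow the stack.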

The JVM’s implementation of NIO resolves this dilemma by doing both! It performs the callback in the scope of the write call until it detects a deep stack, at which time it dispatches the callback to a new thread. While this does work, I consider it a bit of a worst-of-both-worlds solution: you get deep stacks and you get dispatch latency. Yet it is an accepted pattern and Jetty-8 uses this approach for callbacks via our ForkInvoker class.
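The hybrid approach can be sketched as follows. This is neither the JVM's code nor Jetty's ForkInvoker: DepthLimitedInvoker is a hypothetical class that keeps a per-thread depth counter, invokes callbacks directly while the counter is under a limit, and hands them to an Executor once the limit is reached. To keep the demo deterministic, a simple queue drained by the caller stands in for the thread pool.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.Executor;

class DepthLimitedInvoker
{
  private final ThreadLocal<int[]> depth = ThreadLocal.withInitial(() -> new int[1]);
  private final int maxDepth;
  private final Executor executor;
  int dispatches; // demo statistics, only meaningful in the single-threaded demo
  int deepest;

  DepthLimitedInvoker(int maxDepth, Executor executor)
  {
    this.maxDepth = maxDepth;
    this.executor = executor;
  }

  void invoke(Runnable callback)
  {
    int[] d = depth.get();
    if (d[0] < maxDepth)
    {
      // Shallow enough: call back in the scope of the caller.
      d[0]++;
      deepest = Math.max(deepest, d[0]);
      try
      {
        callback.run();
      }
      finally
      {
        d[0]--;
      }
    }
    else
    {
      // Too deep: pay for a dispatch so the stack can unwind.
      dispatches++;
      executor.execute(callback);
    }
  }

  // Chains `total` callbacks, each invoking the next one.
  // Returns { callbacks run, dispatches, deepest nesting }.
  static int[] demo(int total, int maxDepth)
  {
    Deque<Runnable> queue = new ArrayDeque<>(); // stands in for a thread pool
    DepthLimitedInvoker invoker = new DepthLimitedInvoker(maxDepth, queue::add);
    int[] ran = new int[1];
    Runnable[] step = new Runnable[1];
    step[0] = () ->
    {
      if (++ran[0] < total)
        invoker.invoke(step[0]);
    };
    invoker.invoke(step[0]);
    while (!queue.isEmpty())
      queue.poll().run();
    return new int[] { ran[0], invoker.dispatches, invoker.deepest };
  }

  public static void main(String[] args)
  {
    int[] r = demo(100, 4);
    System.out.println(r[0] + " callbacks, " + r[1] + " dispatches, depth " + r[2]);
  }
}
```

The stack never grows past the limit, but it is still deeper than an iterative loop would need, and some of the callbacks still pay for a dispatch.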

Jetty-9 IO Callbacks

For Jetty-9, we wanted the best of all worlds. We wanted to avoid deep re-entrant stacks and to avoid dispatch delays. In a similar way to Servlet 3.1 WriteListeners, we wanted to substitute iteration for re-entrancy whenever possible. Thus Jetty does not use the NIO asynchronous IO channel APIs, but rather uses the NIO Selector to implement our own EndPoint abstraction and a simple Callback interface:

public interface EndPoint extends Closeable
{
  ...
  void write(Callback callback, ByteBuffer... buffers)
    throws WritePendingException;
  ...
}
public interface Callback
{
  public void succeeded();
  public void failed(Throwable x);
}

One key feature of this API is that it supports gather writes, so that there is less need for either iteration or re-entrancy when writing multiple buffers (eg headers, chunk and/or content).  But other than that it is semantically the same as the NIO CompletionHandler and if used incorrectly could also suffer from deep stacks and/or dispatch latency.
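For readers unfamiliar with gather writes, here is a small sketch using the JDK's blocking FileChannel, which implements GatheringByteChannel. It is not the Jetty EndPoint and it is not asynchronous; it only shows several buffers (a hypothetical header and body) being handed to a single write call. The class and method names are illustrative.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

class GatherWrite
{
  // Writes header and body with gathering writes to a temporary file
  // and returns what ended up in the file.
  static String writeGathered(String header, String body) throws IOException
  {
    Path file = Files.createTempFile("gather", ".txt");
    try
    {
      ByteBuffer[] buffers =
      {
        ByteBuffer.wrap(header.getBytes(StandardCharsets.UTF_8)),
        ByteBuffer.wrap(body.getBytes(StandardCharsets.UTF_8))
      };
      try (FileChannel ch = FileChannel.open(file, StandardOpenOption.WRITE))
      {
        // Normally a single call; the loop only covers partial writes.
        while (buffers[buffers.length - 1].hasRemaining())
          ch.write(buffers);
      }
      return new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
    }
    finally
    {
      Files.deleteIfExists(file);
    }
  }

  public static void main(String[] args) throws IOException
  {
    System.out.println(writeGathered("Content-Length: 5\r\n\r\n", "hello"));
  }
}
```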

Jetty Iterating Callback

Jetty’s technique to avoid deep stacks and/or dispatch latency is to use the IteratingCallback class as the basis of callbacks for tasks that may take multiple IO operations:

public abstract class IteratingCallback implements Callback
{
  protected enum State
    { IDLE, SCHEDULED, ITERATING, SUCCEEDED, FAILED };
  private final AtomicReference<State> _state =
    new AtomicReference<>(State.IDLE);
  abstract protected void completed();  
  abstract protected State process() throws Exception;
  public void iterate()
  {
    while(_state.compareAndSet(State.IDLE,State.ITERATING))
    {
      State next = process();
      switch (next)
      {
        case SUCCEEDED:
          if (!_state.compareAndSet(State.ITERATING,State.SUCCEEDED))
            throw new IllegalStateException("state="+_state.get());
          completed();
          return;
        case SCHEDULED:
          if (_state.compareAndSet(State.ITERATING,State.SCHEDULED))
            return;
          continue;
        ...
      }
    }
  }
  public void succeeded()
  {
    loop: while(true)
    {
      switch(_state.get())
      {
        case ITERATING:
          if (_state.compareAndSet(State.ITERATING,State.IDLE))
            break loop;
          continue;
        case SCHEDULED:
          if (_state.compareAndSet(State.SCHEDULED,State.IDLE))
            iterate();
          break loop;
        ...
      }
    }
  }
  ...
}

IteratingCallback is itself an example of another pattern used extensively in Jetty-9:  it is a lock-free atomic state machine implemented with an AtomicReference to an Enum.  This pattern allows very fast and efficient lock free thread safe code to be written, which is exactly what asynchronous IO needs.
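The general shape of that pattern can be shown in isolation. The following is a minimal sketch with a hypothetical three-state lifecycle (it is not taken from Jetty): each transition reads the current state, switches on it and attempts a compareAndSet, and a thread only goes around the loop again if another thread changed the state in between.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

class AtomicStateMachine
{
  enum State { OPEN, CLOSING, CLOSED }

  private final AtomicReference<State> state = new AtomicReference<>(State.OPEN);

  // Returns true only for the single caller that wins OPEN -> CLOSING.
  boolean close()
  {
    while (true)
    {
      switch (state.get())
      {
        case OPEN:
          if (state.compareAndSet(State.OPEN, State.CLOSING))
          {
            // Only the winning thread gets here, so it can do the real work.
            state.set(State.CLOSED);
            return true;
          }
          break; // lost a race: leave the switch, re-read the state, try again
        default:
          return false; // somebody else is closing or has already closed
      }
    }
  }

  // Demo: many threads race to close; returns how many of them won.
  static int racingClosers(int threads) throws InterruptedException
  {
    AtomicStateMachine machine = new AtomicStateMachine();
    AtomicInteger winners = new AtomicInteger();
    CountDownLatch start = new CountDownLatch(1);
    Thread[] pool = new Thread[threads];
    for (int i = 0; i < threads; i++)
    {
      pool[i] = new Thread(() ->
      {
        try
        {
          start.await();
        }
        catch (InterruptedException e)
        {
          return;
        }
        if (machine.close())
          winners.incrementAndGet();
      });
      pool[i].start();
    }
    start.countDown();
    for (Thread t : pool)
      t.join();
    return winners.get();
  }

  public static void main(String[] args) throws InterruptedException
  {
    System.out.println(racingClosers(8));
  }
}
```

No thread ever waits on a lock; a losing thread just observes the new state and returns.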

The IteratingCallback class iterates on calling the abstract process() method until such time as it returns the SUCCEEDED state to indicate that all operations are complete.  If the process() method is not complete, it may return SCHEDULED to indicate that it has invoked an asynchronous operation (such as EndPoint.write(...)) and passed the IteratingCallback as the callback.

Once scheduled, there are two possible outcomes for a successful operation. If the operation completes trivially, it will have called back succeeded() within the scope of the write, so the state will already have been switched from ITERATING to IDLE. The while loop in iterate() will then fail to set the SCHEDULED state and will instead continue, switching from IDLE to ITERATING and calling process() again iteratively.

If the scheduled operation does not complete within the scope of process(), then the iterate() while loop will succeed in setting the SCHEDULED state and exit the loop. When the IO infrastructure subsequently dispatches a thread to call back succeeded(), it will switch from the SCHEDULED to the IDLE state and itself call the iterate() method to continue to iterate on calling process().
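Both outcomes can be exercised in a small, single-threaded model. The sketch below is a cut-down imitation of the idea, not the Jetty class: MiniIteratingCallback has no failure handling, and FakeEndPoint is a hypothetical endpoint in which odd-numbered writes complete within the scope of write() while even-numbered writes complete later, when runPending() plays the role of the IO infrastructure calling back.

```java
import java.util.concurrent.atomic.AtomicReference;

class IteratingSketch
{
  interface Callback
  {
    void succeeded();
  }

  static class FakeEndPoint
  {
    int writes, depth, maxDepth;
    private Callback pending;

    void write(Callback callback)
    {
      writes++;
      maxDepth = Math.max(maxDepth, ++depth);
      try
      {
        if (writes % 2 == 1)
          callback.succeeded(); // completes trivially, in the scope of write()
        else
          pending = callback;   // completes later, see runPending()
      }
      finally
      {
        depth--;
      }
    }

    // Stands in for the IO layer later calling back deferred completions.
    void runPending()
    {
      while (pending != null)
      {
        Callback callback = pending;
        pending = null;
        callback.succeeded();
      }
    }
  }

  static abstract class MiniIteratingCallback implements Callback
  {
    enum State { IDLE, SCHEDULED, ITERATING, SUCCEEDED }

    private final AtomicReference<State> state = new AtomicReference<>(State.IDLE);

    protected abstract State process();

    void iterate()
    {
      while (state.compareAndSet(State.IDLE, State.ITERATING))
      {
        if (process() == State.SUCCEEDED)
        {
          state.set(State.SUCCEEDED);
          return;
        }
        // If this fails, succeeded() already ran and set IDLE, so loop again.
        if (state.compareAndSet(State.ITERATING, State.SCHEDULED))
          return;
      }
    }

    @Override
    public void succeeded()
    {
      while (true)
      {
        switch (state.get())
        {
          case ITERATING: // called back from within process()
            if (state.compareAndSet(State.ITERATING, State.IDLE))
              return;
            break;
          case SCHEDULED: // called back later: resume the iteration here
            if (state.compareAndSet(State.SCHEDULED, State.IDLE))
            {
              iterate();
              return;
            }
            break;
          default:
            throw new IllegalStateException("state=" + state.get());
        }
      }
    }
  }

  // Performs `total` writes and returns { writes issued, deepest write() nesting }.
  static int[] run(int total)
  {
    FakeEndPoint endp = new FakeEndPoint();
    int[] remaining = { total };
    MiniIteratingCallback writer = new MiniIteratingCallback()
    {
      @Override
      protected State process()
      {
        if (remaining[0] == 0)
          return State.SUCCEEDED;
        remaining[0]--;
        endp.write(this);
        return State.SCHEDULED;
      }
    };
    writer.iterate();
    endp.runPending();
    return new int[] { endp.writes, endp.maxDepth };
  }
}
```

In this model write() calls never nest, however many writes are issued, because an in-scope completion only flips the state and lets the loop in iterate() make the next call to process().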

Iterating Callback Example

A simplified example of using an IteratingCallback to implement the AsyncWriter example from above is given below:

private class AsyncWriter extends IteratingCallback
{
  private final Callback _callback;
  private final InputStream _in;
  private final EndPoint _endp;
  private final ByteBuffer _buffer;
  public AsyncWriter(InputStream in,EndPoint endp,Callback callback)
  {
    _callback=callback;
    _in=in;
    _endp=endp;
    _buffer = BufferUtil.allocate(4096);
  }
  protected State process() throws Exception
  {     
    int l=_in.read(_buffer.array(),
                   _buffer.arrayOffset(),
                   _buffer.capacity());
    if (l<0)
    {
       _callback.succeeded();
       return State.SUCCEEDED;
    }
    _buffer.position(0);
    _buffer.limit(l);
    _endp.write(this,_buffer);
    return State.SCHEDULED;
  }
  ...
}

Several production quality examples of IteratingCallbacks can be seen in the Jetty HttpOutput class, including a real example of asynchronously writing data from an input stream.

Conclusion

Jetty-9 has had a lot of effort put into using efficient lock free patterns to implement a high performance scalable IO layer that can be seamlessly extended all the way into the servlet application via the Servlet 3.1 asynchronous IO.   Iterating callback and lock free state machines are just some of the advanced techniques Jetty is using to achieve excellent scalability results.


2 Comments

Oleksandr Berezianskyi · 31/05/2014 at 09:37

Hey,
You are using spin-lock here
public void succeeded()
{
loop: while(true)
{
...

Do you assume that the code should not iterate too much inside the while(true) or is there any other reason to use spin-lock here?

    gregw · 02/06/2014 at 11:01

    That style of looping is a standard pattern when using atomic references as a state machine. A thread will only loop if another thread has raced it and changed the atomic between the switch and the compare and set. In that case, it loops again, checks the state and makes another attempt accordingly.
    cheers
