Asynchronous Java NIO for dummies

Print Friendly

This article explains the fundamentals of Java NIO asynchronous socket programming with a very simple server/client example. Java NIO uses multiplexing to server multiple clients from the same thread. Before NIO, a server had to open a thread for each client. At first NIO is very intimidating and a bit confusing. I hope to make this as simple as possible for your understanding.

You can download sourcecode: http://ece301-examples.googlecode.com/files/SimpleServer.zip

The main components of NIO are:
– Selectors and SelectionKeys
– Channels
– Buffers

NIO internally uses multiplexing to serve many clients and do read and write operations asynchronously. Clients can also use NIO to read/write asynchronously. So how does it work?

Channels

While trying to understand java NIO programming, I was reading a book that had a good analogy which I will explain here. Do you guys remember those bank deposit tubes? (http://en.wikipedia.org/wiki/Pneumatic_tube). The way the deposit tube works is, you have a person drive up to the tube, insert a paycheck into the container and the vacuum sucks up the container and sends it to the teller inside the bank. The teller takes the check from the container, deposits the check into your account and puts the receipt into the container. The vacuum sucks up the container and sends it back to you, you take your receipt and drive away. The next person is waiting in line to do it all over again. This was Java I/O, but it serves the example. So the bank deposit tube is the Channel (That’s all you need to know for now). Lets recap in programming terms:

Bank Teller – Server
Deposit Tube – Channel
Container – Buffer
You – client

In synchronous communication, there is a line of people waiting for their turn in order to deposit their checks. In I/O, to speed up the process, you can have 2 or 3 tubes (threads) to serve more clients at the same time.

Buffers

I won’t go into too much detail about buffers here. Buffers are like smart arrays and keep track of certain position parameters of the buffer. A Buffer (and we will be using ByteBuffers) is the only data structure used in NIO for data transfer. For the simple understanding, you just need to know this; I don’t want to confuse you.

Selectors

Back to our bank teller example. In I/O communication, you need to have many tubes in order to server many clients. At the same time, you need to have many tellers, each serving a different client. This are the threads, each one to server a client. Something like this:

Person 1 –> deposit tube –> Teller 1 (THREAD 1)
Person 2 –> deposit tube –> Teller 2 (THREAD 2)
Person 3 –> deposit tube –> Teller 3 (THREAD 3)

But, what if there was a mechanism where there are many deposit tubes and they all go to the same teller? Now the bank doesn’t have to hire as many people (resources). Well in Java, this is NIO using Selectors. Lets recap:

Person 1 (CLIENT) –> deposit tube (CHANNEL) |SE |
Person 2 (CLIENT) –> deposit tube (CHANNEL) |LEC|–> Teller (THREAD 1)
Person 3 (CLIENT) –> deposit tube (CHANNEL) |TOR|

But how? Using a Selector. A Selector selects a REGISTERED channel that is ready to be read/write/accept/connect (OPERATIONS). Only one of those can happen at a time, but it happens fast, so you don’t even notice the wait (Thus why it is called asynchronous). It is a multiplexing system and you don’t have to care how it works internally, all you need to care about is that it selects the next available operation. What operation?

OPERATIONS:
read
write
accept
connect

These operations are your SelectionKeys (SelectionKey.class). They tell your selector which operation you are ready to execute. Oh and since this is asynchronous, you might have more than one operation pending from a different channel. Something like this:

Person 1 (CLIENT) is writing –> deposit tube (CHANNEL) |SE |
Person 2 (CLIENT) is writing –> deposit tube (CHANNEL) |LEC|–> Teller (THREAD 1)
Person 3 (CLIENT) is reading –> deposit tube (CHANNEL) |TOR|

The Teller in this case is accepting (must accept in case another client comes in),reading from Person1, reading from Person2, and writing to Person3.

Before we get into coding a Server, let me explain how it works.

A Server is created by creating a channel (ServerSocketChannel) and a Selector (that’s the class name). Since the channel is a server, the first thing you want to do is accept clients. For this you must register to the selector as accept:

ServerSocketChannel channel = ServerSocketChannel.open();
Selector selector = Selector.open();
//Remember, you must be in non-blocking mode.
serverChannel.configureBlocking(false);
serverChannel.socket().bind(new InetSocketAddress(ADDRESS, PORT));
channel.register(selector,SelectionKey.OP_ACCEPT)

Here you are registering the serverSocketChannel to accept connection, thus the OP_ACCEPT. This means that you just told your selector that this channel will be used to accept connections. We can change this operation later to read/write, more on this later.

Usually you create this as a thread/runnable. On the run() method, you will have a while loop which has the Selector listening for new connections via the Selector.select(). When you have pending connections, they come to you as a SelectionKey. Each key corresponds to a read,write,accept,connect operation. Depending on the operation, then the server does that specific function.

NOTE: Server only accepts, reads and writes, while client only connects, reads and writes.

If you are following me so far, then you understand NIO. Pat yourself on the back and get ready to write some code.

Server example

So lets write the Server, with many comments. The comments are explained as if things were synchronously for easy of understanding, but like I said, things happen asynchronously.

package com.simple.server;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

public class Server implements Runnable {

	public final static String ADDRESS = "127.0.0.1";
	public final static int PORT = 8511;
	public final static long TIMEOUT = 10000;
	
	private ServerSocketChannel serverChannel;
	private Selector selector;
	/**
	 * This hashmap is important. It keeps track of the data that will be written to the clients.
	 * This is needed because we read/write asynchronously and we might be reading while the server
	 * wants to write. In other words, we tell the Selector we are ready to write (SelectionKey.OP_WRITE)
	 * and when we get a key for writting, we then write from the Hashmap. The write() method explains this further.
	 */
	private Map<SocketChannel,byte[]> dataTracking = new HashMap<SocketChannel, byte[]>();

	public Server(){
		init();
	}

	private void init(){
		System.out.println("initializing server");
		// We do not want to call init() twice and recreate the selector or the serverChannel.
		if (selector != null) return;
		if (serverChannel != null) return;

		try {
			// This is how you open a Selector
			selector = Selector.open();
			// This is how you open a ServerSocketChannel
			serverChannel = ServerSocketChannel.open();
			// You MUST configure as non-blocking or else you cannot register the serverChannel to the Selector.
			serverChannel.configureBlocking(false);
			// bind to the address that you will use to Serve.
			serverChannel.socket().bind(new InetSocketAddress(ADDRESS, PORT));

			/**
			 * Here you are registering the serverSocketChannel to accept connection, thus the OP_ACCEPT.
			 * This means that you just told your selector that this channel will be used to accept connections.
			 * We can change this operation later to read/write, more on this later.
			 */
			serverChannel.register(selector, SelectionKey.OP_ACCEPT);

		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	@Override
	public void run() {
		System.out.println("Now accepting connections...");
		try{
			// A run the server as long as the thread is not interrupted.
			while (!Thread.currentThread().isInterrupted()){
				/**
				 * selector.select(TIMEOUT) is waiting for an OPERATION to be ready and is a blocking call.
				 * For example, if a client connects right this second, then it will break from the select()
				 * call and run the code below it. The TIMEOUT is not needed, but its just so it doesn't 
				 * block undefinitely.
				 */
				selector.select(TIMEOUT);

				/**
				 * If we are here, it is because an operation happened (or the TIMEOUT expired).
				 * We need to get the SelectionKeys from the selector to see what operations are available.
				 * We use an iterator for this. 
				 */
				Iterator<SelectionKey> keys = selector.selectedKeys().iterator();

				while (keys.hasNext()){
					SelectionKey key = keys.next();
					// remove the key so that we don't process this OPERATION again.
					keys.remove();

					// key could be invalid if for example, the client closed the connection.
					if (!key.isValid()){
						continue;
					}
					/**
					 * In the server, we start by listening to the OP_ACCEPT when we register with the Selector.
					 * If the key from the keyset is Acceptable, then we must get ready to accept the client
					 * connection and do something with it. Go read the comments in the accept method.
					 */
					if (key.isAcceptable()){
						System.out.println("Accepting connection");
						accept(key);
					}
					/**
					 * If you already read the comments in the accept() method, then you know we changed
					 * the OPERATION to OP_WRITE. This means that one of these keys in the iterator will return
					 * a channel that is writable (key.isWritable()). The write() method will explain further.
					 */
					if (key.isWritable()){
						System.out.println("Writing...");
						write(key);
					}
					/**
					 * If you already read the comments in the write method then you understand that we registered
					 * the OPERATION OP_READ. That means that on the next Selector.select(), there is probably a key
					 * that is ready to read (key.isReadable()). The read() method will explain further. 
					 */
					if (key.isReadable()){
						System.out.println("Reading connection");
						read(key);
					}
				}
			}
		} catch (IOException e){
			e.printStackTrace();
		} finally{
			closeConnection();
		}

	}

	/**
	 * We registered this channel in the Selector. This means that the SocketChannel we are receiving
	 * back from the key.channel() is the same channel that was used to register the selector in the accept() 
	 * method. Again, I am just explaning as if things are synchronous to make things easy to understand.
	 * This means that later, we might register to write from the read() method (for example).
	 */
	private void write(SelectionKey key) throws IOException{
		SocketChannel channel = (SocketChannel) key.channel();
		/**
		 * The hashmap contains the object SockenChannel along with the information in it to be written.
		 * In this example, we send the "Hello from server" String and also an echo back to the client.
		 * This is what the hashmap is for, to keep track of the messages to be written and their socketChannels.
		 */
		byte[] data = dataTracking.get(channel);
		dataTracking.remove(channel);
		
		// Something to notice here is that reads and writes in NIO go directly to the channel and in form of 
		// a buffer.
		channel.write(ByteBuffer.wrap(data));
		
		// Since we wrote, then we should register to read next, since that is the most logical thing
		// to happen next. YOU DO NOT HAVE TO DO THIS. But I am doing it for the purpose of the example
                // Usually if you register once for a read/write/connect/accept, you never have to register again for that unless you 
                // register for none (0). Like it said, I am doing it here for the purpose of the example. The same goes for all others. 
		key.interestOps(SelectionKey.OP_READ);
		
	}
	// Nothing special, just closing our selector and socket.
	private void closeConnection(){
		System.out.println("Closing server down");
		if (selector != null){
			try {
				selector.close();
				serverChannel.socket().close();
				serverChannel.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}
	/**
	 * Since we are accepting, we must instantiate a serverSocketChannel by calling key.channel().
	 * We use this in order to get a socketChannel (which is like a socket in I/O) by calling
	 *  serverSocketChannel.accept() and we register that channel to the selector to listen 
	 *  to a WRITE OPERATION. I do this because my server sends a hello message to each
	 *  client that connects to it. This doesn't mean that I will write right NOW. It means that I
	 *  told the selector that I am ready to write and that next time Selector.select() gets called
	 *  it should give me a key with isWritable(). More on this in the write() method.
	 */
	private void accept(SelectionKey key) throws IOException{
		ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
		SocketChannel socketChannel = serverSocketChannel.accept();
		socketChannel.configureBlocking(false);
		
		socketChannel.register(selector, SelectionKey.OP_WRITE);
		byte[] hello = new String("Hello from server").getBytes();
		dataTracking.put(socketChannel, hello);
	}

	/**
	 * We read data from the channel. In this case, my server works as an echo, so it calls the echo() method.
	 * The echo() method, sets the server in the WRITE OPERATION. When the while loop in run() happens again,
	 * one of those keys from Selector.select() will be key.isWritable() and this is where the actual
	 * write will happen by calling the write() method.
	 */
	private void read(SelectionKey key) throws IOException{
		SocketChannel channel = (SocketChannel) key.channel();
		ByteBuffer readBuffer = ByteBuffer.allocate(1024);
		readBuffer.clear();
		int read;
		try {
			read = channel.read(readBuffer);
		} catch (IOException e) {
			System.out.println("Reading problem, closing connection");
			key.cancel();
			channel.close();
			return;
		}
		if (read == -1){
			System.out.println("Nothing was there to be read, closing connection");
			channel.close();
			key.cancel();
			return;
		}
		// IMPORTANT - don't forget the flip() the buffer. It is like a reset without clearing it.
		readBuffer.flip();
		byte[] data = new byte[1000];
		readBuffer.get(data, 0, read);
		System.out.println("Received: "+new String(data));

		echo(key,data);
	}

	private void echo(SelectionKey key, byte[] data){
		SocketChannel socketChannel = (SocketChannel) key.channel();
		dataTracking.put(socketChannel, data);
		key.interestOps(SelectionKey.OP_WRITE);
	}

}

Client

The client works exactly the same way as the server, except that instead of an ACCEPT OPERATION it contains a CONNECT OPERATION. If you followed the server example, then there are no surprises here.

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;


public class Main {

	/**
	 * @param args
	 */
	public static void main(String[] args) {
		String string1 = "Sending a test message";
		String string2 = "Second message";
		SocketTest test1 = new SocketTest(string1);
		Thread thread = new Thread(test1);
		thread.start();
		//thread2.start();
	}

	static class SocketTest implements Runnable {

		private String message = "";
		private Selector selector;


		public SocketTest(String message){
			this.message = message;
		}

		@Override
		public void run() {
			SocketChannel channel;
			try {
				selector = Selector.open();
				channel = SocketChannel.open();
				channel.configureBlocking(false);

				channel.register(selector, SelectionKey.OP_CONNECT);
				channel.connect(new InetSocketAddress("127.0.0.1", 8511));

				while (!Thread.interrupted()){

					selector.select(1000);
					
					Iterator<SelectionKey> keys = selector.selectedKeys().iterator();

					while (keys.hasNext()){
						SelectionKey key = keys.next();
						keys.remove();

						if (!key.isValid()) continue;

						if (key.isConnectable()){
							System.out.println("I am connected to the server");
							connect(key);
						}	
						if (key.isWritable()){
							write(key);
						}
						if (key.isReadable()){
							read(key);
						}
					}	
				}
			} catch (IOException e1) {
				// TODO Auto-generated catch block
				e1.printStackTrace();
			} finally {
				close();
			}
		}
		
		private void close(){
			try {
				selector.close();
			} catch (IOException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}

		private void read (SelectionKey key) throws IOException {
			SocketChannel channel = (SocketChannel) key.channel();
			ByteBuffer readBuffer = ByteBuffer.allocate(1000);
			readBuffer.clear();
			int length;
			try{
			length = channel.read(readBuffer);
			} catch (IOException e){
				System.out.println("Reading problem, closing connection");
				key.cancel();
				channel.close();
				return;
			}
			if (length == -1){
				System.out.println("Nothing was read from server");
				channel.close();
				key.cancel();
				return;
			}
			readBuffer.flip();
			byte[] buff = new byte[1024];
			readBuffer.get(buff, 0, length);
			System.out.println("Server said: "+new String(buff));
		}

		private void write(SelectionKey key) throws IOException {
			SocketChannel channel = (SocketChannel) key.channel();
			channel.write(ByteBuffer.wrap(message.getBytes()));

			// lets get ready to read.
			key.interestOps(SelectionKey.OP_READ);
		}

		private void connect(SelectionKey key) throws IOException {
			SocketChannel channel = (SocketChannel) key.channel();
			if (channel.isConnectionPending()){
				channel.finishConnect();
			}
			channel.configureBlocking(false);
			channel.register(selector, SelectionKey.OP_WRITE);
		}
	}
}

Well, I hope I made this easy to understand. This is the simplest I could get without getting into workerThreads, etc. Happy coding!

Comments are closed.