Saturday, July 30, 2016

Implementation of Reactor Pattern Using Java Selector.


As you already know, web servers have evolved to a point where they must handle thousands of requests simultaneously. Handling thousands of connections concurrently is the main challenge faced by developers. Non-blocking IO emerged from the research into this problem, and the Reactor pattern is the main concept behind it. This post explains how to implement the Reactor pattern using Java Selectors.


print.png


As you can see in the diagram, the Acceptor thread is a synchronous, blocking demultiplexer. The Acceptor thread synchronously polls its selector; if there are channels ready to be accepted, it accepts them and places the accepted channels in the channel pool. A free IO worker then picks up an accepted channel and starts listening for IO events on that channel. Listening for IO events on an accepted channel is again done by polling a Selector. Each accepted channel must be registered with a Selector in order to capture read events.

The following is the actual implementation of how it can be done.

package reactor.pattern;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

/**
 * An Actor class which accepts the incoming connections
 */
public class Acceptor {

    private ServerSocketChannel serverSocketChannel;
    private Reactor[] reactors;

    private Selector selector;
    private int noOfReactors;

    public Acceptor(String host, int port, int noOfWorkerThreads) {
        try {
            selector = Selector.open();
            reactors = new Reactor[noOfWorkerThreads];
            this.noOfReactors = noOfWorkerThreads;
            for (int i = 0; i < noOfWorkerThreads; i++) {
                reactors[i] = new Reactor();
                Thread thread = new Thread(reactors[i]);
                thread.start();
            }

            serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.bind(new InetSocketAddress(host, 9000));
            serverSocketChannel.configureBlocking(false);

            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        } catch (IOException e) {
            //handle exceptions
        }
    }

    public void start() throws IOException {

        int i = 0;

        while (true) {

            int readyChannels = selector.select();
            if (readyChannels == 0)
                continue;

            Set<SelectionKey> selectedKeys = selector.selectedKeys();

            Iterator<SelectionKey> keyIterator = selectedKeys.iterator();

            while (keyIterator.hasNext()) {

                SelectionKey key = keyIterator.next();

                if (key.isAcceptable()) {

                    ServerSocketChannel serverSocket = (ServerSocketChannel) key.channel();
                    SocketChannel socket = serverSocket.accept();
                    reactors[i % noOfReactors].addChannel(socket);
                    i++;

                }

                keyIterator.remove();
            }
        }
    }

    public static void main(String[] args) {
        Acceptor acceptor = new Acceptor("localhost",9000,4);
        try {
            acceptor.start();
        } catch (IOException e) {
           // e.printStackTrace();
        }
    }
}


package reactor.pattern;

import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
 * An IO worker that owns a {@link Selector} and watches the channels handed to
 * it for read readiness.
 *
 * <p>Other threads hand channels over through {@link #addChannel(SocketChannel)};
 * the thread executing {@link #run()} registers them with the selector and
 * processes ready keys.
 */
public class Reactor implements Runnable {

    /** Channels handed over by other threads, awaiting registration by the reactor thread. */
    private final Queue<SocketChannel> queue = new ConcurrentLinkedQueue<SocketChannel>();

    private final Selector selector;

    /**
     * Creates a reactor with its own selector.
     *
     * @throws IllegalStateException if the selector cannot be opened
     */
    public Reactor() {
        try {
            this.selector = Selector.open();
        } catch (IOException e) {
            // Without a selector the reactor cannot work at all; fail here instead
            // of with a NullPointerException on first use.
            throw new IllegalStateException("Unable to open selector for reactor", e);
        }
    }

    /**
     * Queues a channel for registration with this reactor's selector.
     *
     * @param socketChannel the accepted channel to watch; must not be null
     */
    public void addChannel(SocketChannel socketChannel) {
        queue.add(socketChannel);
        // The reactor thread may be blocked in select(); wake it so the new
        // channel is registered now rather than after the next unrelated IO event.
        selector.wakeup();
    }

    /**
     * Event loop: registers any queued channels, blocks until a registered
     * channel is ready (or {@link #addChannel} wakes the selector), then
     * processes the ready keys. Exits when the executing thread is interrupted.
     */
    @Override
    public void run() {

        while (!Thread.currentThread().isInterrupted()) {

            registerPendingChannels();

            int readyChannels;
            try {
                readyChannels = selector.select();
            } catch (IOException e) {
                // A failing selector affects every channel of this reactor.
                throw new IllegalStateException("Reactor selector failed", e);
            }

            if (readyChannels == 0) {
                // Woken by addChannel() or interrupted; loop to re-check both.
                continue;
            }

            Set<SelectionKey> selectedKeys = selector.selectedKeys();
            Iterator<SelectionKey> keyIterator = selectedKeys.iterator();

            while (keyIterator.hasNext()) {

                SelectionKey key = keyIterator.next();
                keyIterator.remove();

                // A key may have been cancelled since it was selected;
                // isReadable() would throw CancelledKeyException on it.
                if (key.isValid() && key.isReadable()) {
                    // Read the channel

                }
            }
        }
    }

    /**
     * Drains the hand-over queue, switching each channel to non-blocking mode
     * and registering it for read events. A channel that cannot be registered
     * is closed so it does not leak; the remaining channels are still processed.
     */
    private void registerPendingChannels() {
        SocketChannel socketChannel;
        while ((socketChannel = queue.poll()) != null) {
            try {
                socketChannel.configureBlocking(false);
                socketChannel.register(selector, SelectionKey.OP_READ);
            } catch (IOException e) {
                try {
                    socketChannel.close();
                } catch (IOException ignored) {
                    // The channel is being discarded anyway.
                }
            }
        }
    }
}

No comments:

Post a Comment