Saturday, July 30, 2016

Implementation of Reactor Pattern Using Java Selector.


As you already know web servers has evolved to a state where it should handle thousands of requests simultaneously.Handling thousands of connections concurrently is the main challenge faced by developers. As a result of researches went on Non Blocking IO  is invented. So Reactor Pattern is the main concept behind that. This post mainly explained how we can implement Reactor pattern using Java Selectors.


print.png


As you can see in the diagram Acceptor thread is synchronous blocking demultiplexer. Acceptor thread synchronously polls selector  and if there are ready channels which can be accepted then it accept those channels and placed accepted channels in the channel pool. If there is a  free IO worker then it will pick an accepted channel and start to listen for IO events in that channel. Listening for IO Events for accepted channel is again done via by polling Selector.Each and every accepted channel should be registered  with Selector for capture read events.

Following the actual implementation of how it can be done.

package reactor.pattern;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

/**
 * An Actor class which accepts the incoming connections
 */
public class Acceptor {

    private ServerSocketChannel serverSocketChannel;
    private Reactor[] reactors;

    private Selector selector;
    private int noOfReactors;

    public Acceptor(String host, int port, int noOfWorkerThreads) {
        try {
            selector = Selector.open();
            reactors = new Reactor[noOfWorkerThreads];
            this.noOfReactors = noOfWorkerThreads;
            for (int i = 0; i < noOfWorkerThreads; i++) {
                reactors[i] = new Reactor();
                Thread thread = new Thread(reactors[i]);
                thread.start();
            }

            serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.bind(new InetSocketAddress(host, 9000));
            serverSocketChannel.configureBlocking(false);

            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        } catch (IOException e) {
            //handle exceptions
        }
    }

    public void start() throws IOException {

        int i = 0;

        while (true) {

            int readyChannels = selector.select();
            if (readyChannels == 0)
                continue;

            Set<SelectionKey> selectedKeys = selector.selectedKeys();

            Iterator<SelectionKey> keyIterator = selectedKeys.iterator();

            while (keyIterator.hasNext()) {

                SelectionKey key = keyIterator.next();

                if (key.isAcceptable()) {

                    ServerSocketChannel serverSocket = (ServerSocketChannel) key.channel();
                    SocketChannel socket = serverSocket.accept();
                    reactors[i % noOfReactors].addChannel(socket);
                    i++;

                }

                keyIterator.remove();
            }
        }
    }

    public static void main(String[] args) {
        Acceptor acceptor = new Acceptor("localhost",9000,4);
        try {
            acceptor.start();
        } catch (IOException e) {
           // e.printStackTrace();
        }
    }
}


package reactor.pattern;

import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;

public class Reactor implements Runnable {

    private Queue queue = new ConcurrentLinkedQueue<SocketChannel>();

    private Selector selector;

    public Reactor() {
        try {
            this.selector = Selector.open();
        } catch (IOException e) {
            // e.printStackTrace();
        }
    }

    public void addChannel(SocketChannel socketChannel) {
        queue.add(socketChannel);
    }

    @Override
    public void run() {

        while (true) {

            try {
                SocketChannel socketChannel = (SocketChannel) queue.poll();
                if (socketChannel == null)
                    continue;

                socketChannel.configureBlocking(false);
                socketChannel.register(selector, SelectionKey.OP_READ);

                int readyChannels = selector.select();

                if (readyChannels == 0)
                    continue;

                Set<SelectionKey> selectedKeys = selector.selectedKeys();

                Iterator<SelectionKey> keyIterator = selectedKeys.iterator();

                while (keyIterator.hasNext()) {

                    SelectionKey key = keyIterator.next();

                    if (key.isReadable()) {
                        // Read the channel

                    }

                    keyIterator.remove();
                }
            } catch (IOException e) {
                //handle IOExceptions
            }
        }
    }
}

Saturday, July 16, 2016

Architecture and Implementation of the WSO2 Carbon Transport

WSO2 has a product stack  which can be used in different integration scenarios with different architectures. Any how all of those products are built in well managed way. There is no single code which is duplicated. All our products are collection of  software libraries written by WSO2 engineers or third party open source libraries. So we have well defined libraries which are used by set of products. Carbon Transport is a such library that many of the products uses to managed transport layer.

When it comes to middleware layer definitely it should have interface for  receive and send messages which are coming through networking layer. So receiving and sending is the main objective of the Carbon-Transport. It is written on top of the Netty.Netty is a lightweight high performing network application library which is based on pipeline architecture.  I have explained the concepts of Netty in my previous article. Following is the architecture of the Carbon Transport.

gw_thread_model(1).png

Basic architectural components

  • Netty Pipeline
As I mentioned earlier we have used Netty as a underlying library for create interface in between networking layer and application layer. So at the bootstrapping phase we are parsing the configuration file and pump relevant configuration parameters into Netty ServerBootstrap and Client Bootstrap for listener and sender side initialization.

When creating Server or Client side Bootstraps we need to implement ChannelInitializer which has the set of Netty Handlers which are used to create Netty Pipeline. So Pipeline represents each and every accepted socket and all the events related to that socket are handled through that pipeline.So if we have 100 connections then 100 pipleine objects are created .  

So in carbon transport we are using HTTPRequest and Response encoders , decoders as well streamers and object aggregators. In order to support SSL Netty has a inbuilt SSLHandler which can be used by passing SSLEngine according to our customizations.  As a final handler our custom handler implementation is engaged and it is used as the interface between Netty Pipeline and the Messaging Engine.


  • Application Level Threads

We have used both WorkerPool,  based on Executor Services and a Disruptor thread model. WorkerPool is used as a default thread model and for specific scenarios like for passthru scenarios we can enable the disruptor thread model. Disruptor is a locking free ring buffer which can be used to  communicate in between two threads in lock free manner.  Actually Disruptor performs well compared with Queue based implementations but there should be restricted environment such as Disruptor thread should run in high priority and other threads should not interrupt Disruptor threads. I will discuss Disruptor based thread models  in incoming articles. So we have threadpool or Disruptor as application level threads for where messaging engine is operated.


  • Connection Pools
We have used connection pools for sender side so by using that we can reused already created connections with the back end which avoids unnecessary connection creation with in BackEnd endpoints. We are using three connection pooling strategies

  • Per Source Channel Connection Pool
  • Pool of Pools
  • One Pool

According to the configuration we can select out of one of the above and for high speed messaging we can use Per Source Channel Connection Pool where we maintained the target connections for each and every incoming  connection.So scope is limited to source channel and it will reduce the contention for access connections.

Pool of Pools has a  bit higher contention when retrieving connections and we are using Apache commons pool as the Source for Pool implementation.  This keeps a set of isolated pools and those pools can have duplicate entries for same endpoint .So contention is only within the pool.

When we used One pool performance may be reduced but you can keep minimum number of connections with the backend endpoints.

So those are the basic concepts behind the implementation of the Carbon Transport.