Saturday, July 30, 2016

Implementation of Reactor Pattern Using Java Selector.


As you already know, web servers have evolved to a point where they must handle thousands of requests simultaneously, and handling thousands of connections concurrently is the main challenge developers face. Non-blocking IO was introduced to address this, and the Reactor pattern is the main concept behind it. This post explains how the Reactor pattern can be implemented using Java Selectors.


[Diagram: Reactor pattern — an Acceptor thread polling a Selector and handing accepted channels to a pool of IO worker threads]


As the diagram shows, the Acceptor thread acts as a synchronous blocking demultiplexer. It polls its Selector, and when there are channels ready to be accepted it accepts them and places them in the channel pool. A free IO worker then picks up an accepted channel and starts listening for IO events on it; this listening is again done by polling a Selector. Every accepted channel must be registered with a Selector in order to capture read events.

The following is an implementation of how this can be done.

package reactor.pattern;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

/**
 * An Acceptor class which accepts incoming connections and hands them over to the reactors.
 */
public class Acceptor {

    private ServerSocketChannel serverSocketChannel;
    private Reactor[] reactors;

    private Selector selector;
    private int noOfReactors;

    public Acceptor(String host, int port, int noOfWorkerThreads) {
        try {
            selector = Selector.open();
            reactors = new Reactor[noOfWorkerThreads];
            this.noOfReactors = noOfWorkerThreads;
            for (int i = 0; i < noOfWorkerThreads; i++) {
                reactors[i] = new Reactor();
                Thread thread = new Thread(reactors[i]);
                thread.start();
            }

            serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.bind(new InetSocketAddress(host, port));
            serverSocketChannel.configureBlocking(false);

            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        } catch (IOException e) {
            //handle exceptions
        }
    }

    public void start() throws IOException {

        int i = 0;

        while (true) {

            int readyChannels = selector.select();
            if (readyChannels == 0)
                continue;

            Set<SelectionKey> selectedKeys = selector.selectedKeys();

            Iterator<SelectionKey> keyIterator = selectedKeys.iterator();

            while (keyIterator.hasNext()) {

                SelectionKey key = keyIterator.next();

                if (key.isAcceptable()) {

                    ServerSocketChannel serverSocket = (ServerSocketChannel) key.channel();
                    SocketChannel socket = serverSocket.accept();
                    reactors[i % noOfReactors].addChannel(socket);
                    i++;

                }

                keyIterator.remove();
            }
        }
    }

    public static void main(String[] args) {
        Acceptor acceptor = new Acceptor("localhost",9000,4);
        try {
            acceptor.start();
        } catch (IOException e) {
           // e.printStackTrace();
        }
    }
}


package reactor.pattern;

import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;

public class Reactor implements Runnable {

    private Queue<SocketChannel> queue = new ConcurrentLinkedQueue<SocketChannel>();

    private Selector selector;

    public Reactor() {
        try {
            this.selector = Selector.open();
        } catch (IOException e) {
            // e.printStackTrace();
        }
    }

    public void addChannel(SocketChannel socketChannel) {
        queue.add(socketChannel);
    }

    @Override
    public void run() {

        while (true) {

            try {
                // Register any newly accepted channels handed over by the Acceptor
                SocketChannel socketChannel;
                while ((socketChannel = queue.poll()) != null) {
                    socketChannel.configureBlocking(false);
                    socketChannel.register(selector, SelectionKey.OP_READ);
                }

                // Select with a timeout so channels queued later still get registered
                // even when no IO events are pending on the current channels
                int readyChannels = selector.select(500);

                if (readyChannels == 0)
                    continue;

                Set<SelectionKey> selectedKeys = selector.selectedKeys();

                Iterator<SelectionKey> keyIterator = selectedKeys.iterator();

                while (keyIterator.hasNext()) {

                    SelectionKey key = keyIterator.next();

                    if (key.isReadable()) {
                        // Read the channel

                    }

                    keyIterator.remove();
                }
            } catch (IOException e) {
                //handle IOExceptions
            }
        }
    }
}

Saturday, July 16, 2016

Architecture and Implementation of the WSO2 Carbon Transport

WSO2 has a product stack that can be used in different integration scenarios with different architectures. All of those products are built in a well-managed way, with no duplicated code: each product is a collection of software libraries written by WSO2 engineers or drawn from third-party open source projects. So we have well-defined libraries that are shared across a set of products, and Carbon Transport is one such library, used by many of the products to manage the transport layer.

A middleware layer needs an interface for receiving and sending messages that travel through the networking layer, and receiving and sending is the main objective of Carbon Transport. It is written on top of Netty, a lightweight, high-performance network application library based on a pipeline architecture. I have explained the concepts of Netty in my previous article. The following is the architecture of Carbon Transport.

[Diagram: Carbon Transport thread model — Netty pipelines, application-level worker pool / Disruptor, and sender-side connection pools]

Basic architectural components

  • Netty Pipeline
As I mentioned earlier, we use Netty as the underlying library to create the interface between the networking layer and the application layer. At the bootstrapping phase we parse the configuration file and pump the relevant configuration parameters into the Netty ServerBootstrap and client Bootstrap for listener-side and sender-side initialization.

When creating the server- or client-side bootstrap we need to implement a ChannelInitializer, which assembles the set of Netty handlers that make up the Netty pipeline. A pipeline is created for each accepted socket, and all events related to that socket are handled through its pipeline; so if we have 100 connections, 100 pipeline objects are created.

In Carbon Transport we use HTTP request and response encoders and decoders, as well as streamers and object aggregators. To support SSL, Netty provides a built-in SslHandler that can be used by passing it an SSLEngine configured to our needs. As the final handler, our custom handler implementation is engaged, and it acts as the interface between the Netty pipeline and the messaging engine.
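A possible shape for such an initializer, as a minimal sketch only (the class name, handler names, and the 1 MB aggregation limit are illustrative, not the actual Carbon Transport code):

import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;

public class TransportChannelInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) {
        ChannelPipeline pipeline = ch.pipeline();
        // HTTP request decoder and response encoder combined in one codec
        pipeline.addLast("codec", new HttpServerCodec());
        // Aggregates chunked HTTP content into full messages (1 MB limit here)
        pipeline.addLast("aggregator", new HttpObjectAggregator(1048576));
        // An SslHandler built from an SSLEngine would be added here when SSL is enabled.
        // Final handler bridging the pipeline to the messaging engine (placeholder)
        pipeline.addLast("sourceHandler", new ChannelInboundHandlerAdapter());
    }
}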


  • Application Level Threads

We support both a worker pool based on executor services and a Disruptor thread model. The worker pool is the default, and for specific scenarios such as pass-through we can enable the Disruptor thread model. The Disruptor is a lock-free ring buffer that lets two threads communicate without locking. It performs well compared with queue-based implementations, but it needs a controlled environment: Disruptor threads should run at high priority and other threads should not interrupt them. I will discuss Disruptor-based thread models in upcoming articles. Either way, the thread pool or the Disruptor provides the application-level threads on which the messaging engine operates.
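As a minimal sketch of the default worker-pool idea (not the actual Carbon Transport code; the class name and the pool size of 100 are illustrative):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class WorkerPoolDispatcher {

    // Application-level worker pool; the size would normally come from configuration
    private final ExecutorService workerPool = Executors.newFixedThreadPool(100);

    // Called from the IO thread; the actual message processing happens on a worker thread
    public void dispatch(final Object message) {
        workerPool.submit(new Runnable() {
            @Override
            public void run() {
                // messaging engine logic would run here
            }
        });
    }
}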


  • Connection Pools
We use connection pools on the sender side so that already-created connections to the back end can be reused, avoiding unnecessary connection creation towards backend endpoints. We use three connection pooling strategies:

  • Per Source Channel Connection Pool
  • Pool of Pools
  • One Pool

Depending on the configuration we can select one of the above. For high-speed messaging we can use the Per Source Channel Connection Pool, where target connections are maintained for each incoming connection; the scope is limited to the source channel, which reduces contention when accessing connections.

Pool of Pools has somewhat higher contention when retrieving connections; we use Apache Commons Pool as the underlying pool implementation. This keeps a set of isolated pools, and those pools can have duplicate entries for the same endpoint, so contention exists only within an individual pool.

With One Pool, performance may be reduced, but it keeps the number of connections to the backend endpoints to a minimum.
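As a rough sketch of the pooling idea, assuming Apache Commons Pool 2 (this is not the actual Carbon Transport implementation; the keyed pool and the host:port key format are illustrative):

import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;

import org.apache.commons.pool2.BaseKeyedPooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericKeyedObjectPool;

public class TargetConnectionPool {

    // One sub-pool per backend endpoint, keyed by "host:port"
    private final GenericKeyedObjectPool<String, SocketChannel> pool =
            new GenericKeyedObjectPool<String, SocketChannel>(
                    new BaseKeyedPooledObjectFactory<String, SocketChannel>() {
                        @Override
                        public SocketChannel create(String key) throws Exception {
                            // Open a new connection only when the pool has none to lend
                            String[] hostAndPort = key.split(":");
                            return SocketChannel.open(new InetSocketAddress(
                                    hostAndPort[0], Integer.parseInt(hostAndPort[1])));
                        }

                        @Override
                        public PooledObject<SocketChannel> wrap(SocketChannel channel) {
                            return new DefaultPooledObject<SocketChannel>(channel);
                        }
                    });

    public SocketChannel borrow(String hostAndPort) throws Exception {
        return pool.borrowObject(hostAndPort);
    }

    public void release(String hostAndPort, SocketChannel channel) throws Exception {
        pool.returnObject(hostAndPort, channel);
    }
}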

So those are the basic concepts behind the implementation of the Carbon Transport.

Sunday, October 11, 2015

Introduction to Netty


What is Netty ?

Netty is an open source Java NIO framework that can be used to develop network-related applications. In this tutorial I am going to explain the basic architecture of Netty and how it is used to develop basic applications.

Netty Concepts

  • Bootstraps
                  Bootstrapping is used to create Netty channels, which handle incoming and outgoing connections and the events that flow through them. The inbound side uses ServerBootstrap, which configures and creates channels (TCP, HTTP, etc.) for incoming client connections. ServerBootstrap and the client-side Bootstrap share a common base class; the client Bootstrap is used to make connections to backend servers.
Example:
// Configure the server.
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(4);
try {
  ServerBootstrap b = new ServerBootstrap();
  b.option(ChannelOption.SO_BACKLOG, 1024);
  b.group(bossGroup, workerGroup)
   .channel(NioServerSocketChannel.class)
   .handler(new LoggingHandler(LogLevel.INFO))
   .childHandler(new ServerInitializer());

  Channel ch = b.bind(PORT).sync().channel();

  System.err.println("Open your web browser and navigate to " +
          (SSL? "https" : "http") + "://127.0.0.1:" + PORT + '/');

  ch.closeFuture().sync();
} finally {
  bossGroup.shutdownGracefully();
  workerGroup.shutdownGracefully();
}

Here we create a ServerBootstrap and configure it with transport-level parameters. You can see that it is given two EventLoopGroups, the boss (master) group and the worker group, which are explained in the following section. This bootstrap is used to handle incoming connections.

  • EventLoop and EventLoopGroup

An EventLoop is like a Java thread that is used to handle connections. There are various EventLoop implementations in Netty, such as NioEventLoop and transport-specific variants like EpollEventLoop. An EventLoopGroup is a collection of event loops, and we can define how many event loops it should contain when it is created. When creating the ServerBootstrap we supplied the boss (master) and worker EventLoopGroups: the boss group is responsible for accepting incoming connections, while the worker group is responsible for handling IO operations on the accepted connections. Each accepted connection is bound to a particular worker EventLoop for its lifetime.

  • Channel and ChannelHandlerContext
 A Netty Channel is a representation of an accepted connection, and it is bound to one particular EventLoop for its lifetime. For each channel Netty creates a channel pipeline, which I will discuss in the following sections. We can save attributes on a channel and retrieve them in later calls on that channel. A ChannelHandlerContext is unique for each handler in the pipeline; the handler concept is also described later in this tutorial.
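For example, saving and reading a channel attribute could look like the following sketch (the attribute name clientId is illustrative):

import io.netty.channel.Channel;
import io.netty.util.AttributeKey;

public class ChannelAttributeExample {

    // Key used to store a value on the channel; the name is illustrative
    private static final AttributeKey<String> CLIENT_ID = AttributeKey.valueOf("clientId");

    public void saveClientId(Channel channel, String clientId) {
        channel.attr(CLIENT_ID).set(clientId);
    }

    public String readClientId(Channel channel) {
        // Returns null if the attribute was never set for this channel
        return channel.attr(CLIENT_ID).get();
    }
}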

  • Channel Handlers

Netty has inbound and outbound handlers. Inbound handlers are executed when IO events flow from the socket to the application, and outbound handlers are executed when events flow from the application to the socket. Handlers can be shared across pipelines, but the recommended approach is not to share them. A handler carries some application-specific logic for the messages passing through it: a byte-to-message conversion, a message-to-message conversion, or a message-to-byte conversion. Multiple handlers can be executed inside a single pipeline.
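A minimal inbound handler, similar in shape to the ServerHandler used in the pipeline example below, might look like this sketch (forwarding the message is only a placeholder for application-specific logic):

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class ServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // Application-specific logic for the decoded message goes here;
        // for now simply pass it on to the next handler in the pipeline
        ctx.fireChannelRead(msg);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // Close the connection on unexpected errors
        ctx.close();
    }
}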


  • Netty Pipeline
              A pipeline is a representation of the message flow of a particular accepted connection. Handlers are assembled into the pipeline, and a pipeline is created for each connection. All the handlers in a pipeline are effectively thread-safe unless a handler is shared across pipelines. The following is sample code for pipeline creation.

public class ServerInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    public void initChannel(SocketChannel ch) {
        ChannelPipeline p = ch.pipeline();
        p.addLast(new HttpServerCodec());
        p.addLast(new ServerHandler());
    }
}


Handlers are executed according to their positions in the pipeline. On the inbound path only the inbound handlers are executed, in order, and on the outbound path only the outbound handlers are executed.

  • Important Facts

   Do not block inside your handler implementation code. If a handler blocks, the underlying EventLoop executing that pipeline is blocked as well, Netty's IO worker pool becomes overloaded, and ultimately connection processing is delayed.

You can execute a Netty handler on a separate thread pool by providing an EventExecutorGroup when adding it to the channel pipeline, for example:

p.addLast(new DefaultEventExecutorGroup(16), new ServerHandler());


Tuesday, October 6, 2015

Introduction to Java NIO

                              

           IO operations play an important role when integrating network-level operations into software applications. Assume you are going to develop an application that analyses an incoming event stream, identifies important relationships between the events, generates new events, and sends them as separate event streams to other applications that are decoupled at the network or hardware level.


Such an application must listen for incoming events from the network and write results back to the network, so before developing it the developer should have a basic understanding of network-level operations. In this post I would like to introduce some Java strategies for handling IO operations.

Socket

A socket is one endpoint of a two-way communication link between two programs running on the network. Socket classes are used to represent the connection between a client program and a server program. The java.net package provides two classes--Socket and ServerSocket--that implement the client side of the connection and the server side of the connection, respectively.

Socket Channel

A Java NIO SocketChannel is a channel that is connected to a TCP network socket. More than one SocketChannel can be connected to the same server socket, which means one endpoint can have many open connections.
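For example, opening a SocketChannel and switching it to non-blocking mode could look like this (host and port are placeholders):

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;

public class SocketChannelExample {

    public static void main(String[] args) throws IOException {
        // Connect to a server endpoint; host and port are placeholders
        SocketChannel channel = SocketChannel.open(new InetSocketAddress("localhost", 9000));

        // Switch to non-blocking mode so reads and writes return immediately
        channel.configureBlocking(false);

        // ... register the channel with a Selector and perform IO ...

        channel.close();
    }
}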


Blocking IO

This was the model used in the early days of network programming: each connection is handled by a separate thread. This approach does not scale to a high number of connections on a socket.

[Diagram: blocking IO — one thread per connection]
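A minimal sketch of this thread-per-connection model (port 9000 is a placeholder):

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

public class BlockingServer {

    public static void main(String[] args) throws IOException {
        ServerSocket serverSocket = new ServerSocket(9000);

        while (true) {
            // accept() blocks until a client connects
            final Socket socket = serverSocket.accept();

            // Each connection gets its own thread, which is what limits scalability
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        // read from and write to the socket streams here
                        socket.close();
                    } catch (IOException e) {
                        // handle exceptions
                    }
                }
            }).start();
        }
    }
}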

Non Blocking IO

Java 1.4 introduced non-blocking IO, which means connection-processing threads do not wait on a SocketChannel, so IO events can be processed asynchronously.

[Diagram: non-blocking IO — a few threads handling many channels via a Selector]




Reactor Pattern

In the domain of Java NIO, the Reactor pattern is used to handle incoming connections and their related operations in a scalable way under high-concurrency scenarios. Basically, a manager thread accepts connections and hands them over to worker threads, and from then on the worker threads are responsible for handling IO events for the lifetime of each connection.

Selector
A Selector is the mechanism for asking the operating system, via system calls, which channels have events ready. Instead of using multiple threads, a single thread with a Selector can handle multiple concurrent channels, which minimizes thread-switching overhead and improves scalability as the number of connections grows. Channels are registered with a Selector for particular operations, represented by selection keys. There are four operations for which a channel can fire an event to the Selector:

  • Connect
  • Accept
  • Read
  • Write
While registering a channel with a Selector you specify which kinds of events you are interested in, and you can also attach an object to the resulting SelectionKey to identify the channel uniquely.
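For example, registering a channel for read events with an attachment could look like this (the host, port, and attachment value are placeholders):

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

public class SelectorRegistrationExample {

    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();

        SocketChannel channel = SocketChannel.open(new InetSocketAddress("localhost", 9000));
        channel.configureBlocking(false);

        // Register interest in read events and attach an identifier object;
        // the attachment can later be retrieved with key.attachment()
        SelectionKey key = channel.register(selector, SelectionKey.OP_READ, "connection-1");

        System.out.println("Registered with attachment: " + key.attachment());
    }
}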

Byte Buffers
Another interesting component in IO operations is the byte buffer. Basically we have two kinds:
  • Direct Byte Buffers
  • Heap Byte Buffers

Direct byte buffers are allocated in native memory outside the JVM heap, so allocation and deallocation are relatively expensive and the buffers should be reused. In return, direct byte buffers enable zero-copy IO.

Heap byte buffers can be allocated and deallocated quickly in the JVM heap space.

There are a few ByteBuffer operations that are used heavily. A ByteBuffer has a position, a limit, and a capacity: the position is the index where the next read or write happens, the limit marks the end of the readable data in read mode (and equals the capacity in write mode), and the capacity is the maximum amount of data the buffer can hold. A short sketch of these operations follows the list below.

  • flip()
    Switches the buffer to read mode: the limit is set to the current position (the end of the newly written data) and the position is reset to zero.

  • compact()
    Switches back to write mode and moves any unread data to the beginning of the buffer; the position is set just after that data and the limit is set to the capacity.

  • clear()
    Switches to write mode by resetting the position to zero and the limit to the capacity (the data itself is not erased).

  • hasRemaining()
    Returns true while the position is less than the limit, i.e. while there is still data left to read; it is typically called after flip().
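Here is a small sketch tying these operations together:

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class ByteBufferExample {

    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(64);   // heap buffer; use allocateDirect(64) for a direct buffer

        // Write mode: the position advances as data is written
        buffer.put("hello".getBytes(StandardCharsets.UTF_8));

        // Switch to read mode: limit = current position, position = 0
        buffer.flip();

        while (buffer.hasRemaining()) {                // true while position < limit
            System.out.print((char) buffer.get());
        }

        // Back to write mode: clear() resets position to 0 and limit to capacity;
        // compact() would instead preserve any unread bytes at the start
        buffer.clear();
    }
}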

Sample Http Reverse Proxy Written Using HttpCore

For a better understanding of how Java NIO is actually used to read and write data, see the sample at [1]. It reads data from a socket and writes data to a socket using Apache HttpCore, a library built on Java NIO.







Wednesday, July 22, 2015

Dynamic Address Resolution For Endpoints used in WSO2 ESB

 In real business situations there are different environments (DEV, QA, PREPROD, PRODUCTION), and endpoints change accordingly. But it is tedious to change endpoint URLs when moving from one environment to another.

We can introduce a workaround for dynamic address resolution: load the endpoint URLs from registry resources and substitute the values at runtime. The following is an example of how to configure it.

<inSequence>
        <property name="AppServer"
                  expression="get-property('registry','gov:/serverdetails/servers.xml')"
                  scope="default"
                  type="OM"/>
        <property name="AppServerIP"
                  expression="$ctx:AppServer//as/ip/text()"
                  scope="default"
                  type="STRING"/>
        <property name="AppServerPort"
                  expression="$ctx:AppServer//as/port/text()"
                  scope="default"
                  type="STRING"/>
        <property name="service_ep"
                  expression="fn:concat('http://' ,get-property('AppServerIP'), ':' , get-property('AppServerPort') ,'//services/TargetService')"/>
        <header name="To" expression="get-property('service_ep')"/>
        <send>
           <endpoint key="TargetEndpoint"/>
        </send>
     </inSequence>


<endpoint xmlns="http://ws.apache.org/ns/synapse" name="TargetEndpoint">
  <default>
     <suspendOnFailure>
        <progressionFactor>1.0</progressionFactor>
     </suspendOnFailure>
     <markForSuspension>
        <retriesBeforeSuspension>0</retriesBeforeSuspension>
        <retryDelay>0</retryDelay>
     </markForSuspension>
  </default>
</endpoint>

This configuration reads the servers.xml file stored in the registry and loads the endpoint values at runtime.