Wednesday, December 31, 2014

Inbound Endpoints in WSO2 ESB 4.9.0


        Inbound Endpoint is an endpoint which resides in server side of the ESB. Those are introduced to achieve   some of the main features like multi tenancy support for some protocols like JMS,by passing axis2 layer in  Inbound side,deploying and undeploying inbound endpoints without server restarting.   Basically there are two  behaviours introduced. Selection of the behaviour is done according to protocol.
  • Polling behavior
  • Listening behaviour    

Polling behavior
       This behaviour is in  JMS, File and Generic Inbound Endpoints. Basically it polls periodically for data and if data is available then injects that data to given sequence. for example in JMS inbound Endpoint it checks for JMS Queue periodically whether the data is available if so then take that message and inject it to given sequence.So this kind of inbound endpoints have one way operation. It does not require a result so in nature these are asynchronous.

Listening behaviour     
      This  behaviour is in HTTP and CXF-RS inbound endpoints. Basically listener is started in given port and it listens for requests coming to that port. When request is available it injects that request to the given sequence.Here those have two way operation and in nauture those are synchrounse.


Inbound Endpoint Configuration  

<inboundEndpoint xmlns="http://ws.apache.org/ns/synapse"
                name="HttpListenerEP"
                sequence="TestIn"
                onError="fault"
                protocol="http"
                suspend="false">
  <parameters>
     <parameter name="inbound.http.port">8085</parameter>
  </parameters>
</inboundEndpoint>

According to the above configuration generic parameters for  inbound endpoints are added as attributes in inboundEndpoint element and protocol specific  parameters are given in the parameters element. 
 
Thread Pools in InboundEndpoint

    We need threadpool  to execute message injectors to the sequence. Messsage Injectors are handle by thread pool which avoids blocking in the message consumers from the source or listeners for the source.
There are three options that can be taken.
  • Per inbound thread pool.
  • Protocol specific thread pool.
  • One thread pool for all inbound endpoints.
currently we are having thread pool per inbound and we need to make it configurable.

Multi Tenancy in InboundEndpoint                   
    Multi Tenancy means more tenants are operated in single physical location with logically separated manner. So each tenant can have more inbound endpoints. so when introducing this there are major problems that we need to solved. Basically one way protocols like JMS, File and Generic are run top of the task since tasks are handling multi tenancy  we don't need to separately handle it.
  But in Listening  endpoints like Http and CXF-RS need to handle multi tenancy. So basic problem that we faced is port sharing and  tenant unloading. we need to do port sharing between tenants because number of ports are limited.When tenant loading and unloading synapse environment also get loaded and unloaded. So we keep common handler and Listener always running and in runtime we get the synapse environment and get inbound endpoints through it and do the relevant operations.

Clustering    
  This is achieved by using hazelcast framework and distributed locks.        

Tuesday, July 22, 2014

WSO2 ESB Message Flow

                  
WSO2 ESB

WSO2 ESB is the world fastest lightweight open source ESB build on top of Apache Synapse. Basically this works as mediation engine between your application and backend. 
ESB handles complexity of  transforming messages and routing messages to backend. ESB is easily configurable by any non technical person without having coding knowledge.

Message Flow

Before moving to the message flow we need to understand the high level architecture of the ESB.


esb_higlevel_archi.jpg

When request received from client that is taken from synapse transports Listeners. Basically WSO2 ESB has enabled Passthru transport by default which is non blocking high performing transport. Then request is handover to the Axis2 Engine which has message builders and message formatters for convert request into processable format inside synapse engine and in Axis2 handlers. Axis2 Engine handover the message to Synapse Mediation engine through Synapse Message Receivers. Then request is gone through relevant mediation works and finally it will hand over again to Axis2 Engine for format the message back to original format and send the mediated request to backend server through transports. Response from the backend server is also follow the same path.
Next we need to have a closer look into transports how actually request is processed and its status. Basically in transports we have Nhttp transport and Passthru transport  in WSO2 ESB

NHTTP Transport

NHttp transport is build on top of HTTPCore NIO.This is non blocking implementation of transport where readiness selection approach is implemented. How ever in this transport request gone through two buffers . when request is received by listener,  request is put into the input buffer and Axis2 Engine pick request and send it to the mediation and again Axis2 Engine put mediated request to the output buffer and sender poll from buffer and send message. basically this transport has two limitations.Building the whole message irrespective  of content awareness of the request.On the other hand two buffer architecture is not scalable with   millions  of events received. Binary relay was introduced to solve message body building of non content aware messages and it wraps byte stream with soap element and inject it to the message body. So it increases the performance of the ESB. To overcome two buffer case Passthru transport was introduced.

Passthru Transport.

Passthru transport has single buffer architecture where inbound request and outbound request share same buffer. Passthru transport increases the performance   than in nhttp by avoiding two buffer architecture.

Status Of the Message

Although WSO2 ESB is stateless, Request and Responses have different status at Runtime.According to read portion of message it has been named as follows.

- Request  Ready  State
Request  is received to the transport and connection is added to the free connection pool of source configuration(Inbound Request) or target configuration(Outbound Request). Now synapse transport is ready to receive the request.

- Request/Response Head State
State where Request/Response  is received and creating the Source Request/Response or Target Request/Response  and reading Http Headers of the Request/Response.

- Request/Response Body State
State where actually content of the Message is written to the buffer or read from buffer.

- Request/Response Done  State
All portions of the Message is read or written.


Reactor Pattern
  
Reactor pattern is for event driven systems where to handle too many connections from a clients using single listening thread independently. Conventional approach is create a separate thread for each connection.This is not a scalable solution  because when number of connections got increased number of threads created are increased and creates overhead by context switching. On the other hand it wastes CPU cycles by listening to non event receiving connections. To overcome those problems this pattern was introduced.  Basically it has IOReactor which is single thread and any number of handlers can be registered with it. IO Reactor has a map with registered handlers and  eventtype.According to event type it receives reactor trigger the relevant handler.


Message Flow Implementation

 Server Startup

When starting WSO2 ESB it initiates  and starts  the PassThroughHttpListener, PassThroughHttpSSLListenr .PassThroughHttpSender and PassThroughHttpSSLSender.

  • PassThroughHttpListner/PassThroughHttpSSLListner
          You can configure the Listner in axis2.xml Transport In section. Then instance of this class             is created and init method will trigger.In the init method following class are get initiated.
  • Source Configuration (keeps information about passthrough.property files and axis2.xml transport In related information)
  • ServerIODispathcer (Wrapping class of the SourceHandler class)
  • SourceHandler (Handler which is registered in the IOReactor)
  • AxisServiceTrackerListner(Responsible for updating Services  )                                 
  
 After execution of init method start method will be executed.
  • Start the service tracker.
  • Initiate the DefaultListiningIOReactor  
  • Run IOReactor.execute(IODispath) in a separate thread.       
                   
Then here onwards DefaultListningIOReactor  is capable of receiving connections and requests from clients and handover it to the ServerIODispatcher.

  • PassThroughHttpSender/PassThroughHttpSSLSender
         You can configure the Sender in axis2.xml Transport out section. Then instance of this                    class is created and init method will trigger.In the init method following class are get                        initiated.
  • TargetConfiguration(keeps information about passthrough.property files and axis2.xml transport Out related information)
  • ClientIODisptach(Wrapping class of the TargetHandler class)
  • TargetHandler (Handler which is registered in the IOReactor)
  • DefaultConnectingIOReactor
  • Run IOReactor.execute(IODispath) in a separate thread.  


Inbound Request
Inbound message is the request from client until it delivers to Axis2Engine.

inbound_request.jpg

  • When creating new connection SourceHandler Connected method get executed and SourceContext is created for that connection which has connection information and status.
  • Http Connection is add to Source Configuration free connection list.
  • Then when request received SourceRequest is created and Start it .
  • In the Source Request Start() method New Pipe is created by passing Buffer factory and name as “source” and Base configuration.
  • Then Source request is passed to Server Worker object which is executed by thread pool.
  • SourceWorker is responsible for  create Axis2 Message Context and passed it to the Axis2 Engine.

   
OutBound Request
Outbound Request is mediated message after going through mediation engine and about to send actual backend endpoint.

outboundrequest.jpg

  • Mediated Axis2 message is received to PassThroughHttpSender  invoke method and it created Endpoint Reference from message Context if it is null submit Message Context  to submitResponse method. else sends it to  DeliveryAgent submit method
  • After receiving message to DeliveryAgent submit method it creates http route and get queue related to that http route.if it is not full insert message in to the queue.  and then call tryNextMessage method.
  • Then request passed to the http Nio core and TargetHandler connected method get executed and new TargetContext is created and add connection to the pool then calls the Dilivery agent Connected method.
  • In DiliveryAgent connected method it poll the message from queue and send it to the tryNextMessage method.
  • In tryNextMessage method it saves the Message Context in the TargetContext and  calls the submitRequest in the DiliveryAgent.
  • submitRequest method creates TargetRequest and extract source pipe and set it to the TargetRequest.
  • TargetHandler requestReady method will be called  as soon as HttpRequest Headers are availiable and it writes HttpHeaders to TagertRequest.
  • Then TargetHandler outputReady method will fire and write content to the wire from buffer.


Receiving Response

response_recived.jpg
  • Response is first received to the TargetHandler  responseRecived method and then it creates TargetResponse object and call start method of it.
  • TargetResponse start method creates pipe called target.
  • Then TargetResponse is passed to the ClientWorker and it creates response message context  and passed it to the Axis2.
  • After calling inputReady method it writes the content of the response to the target buffer.
  • After submitting ResponseMsgCtx to the Axis2 SyanspecallBack Receiver receives the message and send it to the mediation by creating SynapseMessageContext.

Sending Response

sending_response.jpg

  • Axis2 MessageContext is received to the  PassThroughHttpSender submitResponse  method and it creates source response and set it to the Source Context and update its status .It extracts the target pipe and set as writer in source context.
  • Then as soon as Headers are available  SourceHandler responseReady method will hit and connection is passed. then it starts the SourceResponse.
  • In the Start method it will create the HttpResponse and write avaliable Headers to it.
  • Then when body is available  content is write from buffer to httpresponse and write to the wire.

    References