{"id":50113,"date":"2017-08-30T15:10:13","date_gmt":"2017-08-30T09:40:13","guid":{"rendered":"http:\/\/www.tothenew.com\/blog\/?p=50113"},"modified":"2017-08-30T15:10:14","modified_gmt":"2017-08-30T09:40:14","slug":"asynchronous-programming-aggregation-using-apache-camel","status":"publish","type":"post","link":"https:\/\/www.tothenew.com\/blog\/asynchronous-programming-aggregation-using-apache-camel\/","title":{"rendered":"Asynchronous Programming &amp; Aggregation using Apache Camel"},"content":{"rendered":"<p>In any user-facing, real-time application, data is of utmost importance because it directly affects a business&#8217;s market and revenue. Recently, we needed to build an OTA (online travel agency) for booking hotels, where a User may book hotel rooms for specific dates. For this, we integrated with multiple Suppliers to fetch hotel information, which resulted in a massive amount of data from both internal and external sources. We needed to process this data to add context, relevance and purpose, and show only relevant data to the end user, with performance as the top priority.<\/p>\n<p>Consider the case of searching for available hotel rooms. Availability needs to be kept updated\u00a0almost every second given the nature of the industry. Because hotel availability changes so fast, storing and retrieving it from an internal database would often give stale results.<br \/>\nWe were required to\u00a0search multiple external data sources for availability data based on the User&#8217;s search criteria (check-in\/check-out date, number of rooms, occupancy in each room, etc.)\u00a0in real time. We also had to aggregate and deduplicate similar results from different sources,\u00a0convert\/serialize them according to our system&#8217;s needs, and then serve them back to the end user. 
Phew!<br \/>\nThere were also times when we wanted to split a single user request into multiple batches and execute them against the same data source\/endpoint with different payloads.<\/p>\n<p>If that was a lot to take in, the points below break it down:<\/p>\n<ul>\n<li><strong>Fetching Data at Runtime from External Sources: <\/strong>Availability data had to be fetched through the Supplier&#8217;s runtime APIs based on the User&#8217;s search criteria. This was essential because storing Supplier availability data for every combination of search parameters (check-in\/check-out dates, number of rooms, occupancy in each room) in our DBs would have been cumbersome.<\/li>\n<li><strong>Aggregation and De-duplication: <\/strong>When fetching data from multiple Suppliers, there is a high probability of receiving duplicates (the same hotels, or the same rooms for a single hotel). Therefore, we had to deduplicate, applying business logic to remove duplicate data and keep the results most relevant to the User.<\/li>\n<li><strong>Converters and Serializers:<\/strong> Given the multitude of sources we pull data from, the number of formats that need to be understood, converted and massaged increases considerably.\u00a0Each source can have its own data definition and response format, which might not be universal.<\/li>\n<\/ul>\n<p><strong>Keep in mind, all of this needs to be done within the context of a delightful user experience flow, i.e., nothing that takes more than 500 ms.<\/strong><\/p>\n<p><span style=\"font-size: 1rem\">After much analysis of different routing frameworks and plug-and-play ESB solutions, we settled on Apache Camel.<\/span><\/p>\n<p><a href=\"http:\/\/camel.apache.org\/\">Apache Camel\u2122<\/a> is one of the most popular open-source integration frameworks. It implements the Enterprise Integration Patterns and lets you define routing and mediation rules, including conditional routing, in one of several domain-specific languages, be it Java, XML or Scala. 
A Route describes how a message flows from a source endpoint (identified by a URI) through processing and integration steps, written in a specific DSL.<\/p>\n<p>Now let&#8217;s look at the above cases and\u00a0how Apache Camel provides easy, out-of-the-box techniques to handle them. For the real-time user search scenario described above, the figure below shows how the problem can be broken into smaller parts using <strong>split()<\/strong> and <strong>multicast()<\/strong>.<\/p>\n<p><img decoding=\"async\" loading=\"lazy\" class=\"alignnone size-full wp-image-51662\" src=\"\/blog\/wp-ttn-blog\/uploads\/2017\/08\/Screenshot-from-2017-08-28-161856.png\" alt=\"Screenshot from 2017-08-28 16:18:56\" width=\"1366\" height=\"768\" srcset=\"\/blog\/wp-ttn-blog\/uploads\/2017\/08\/Screenshot-from-2017-08-28-161856.png 1366w, \/blog\/wp-ttn-blog\/uploads\/2017\/08\/Screenshot-from-2017-08-28-161856-300x168.png 300w, \/blog\/wp-ttn-blog\/uploads\/2017\/08\/Screenshot-from-2017-08-28-161856-1024x575.png 1024w, \/blog\/wp-ttn-blog\/uploads\/2017\/08\/Screenshot-from-2017-08-28-161856-624x350.png 624w\" sizes=\"(max-width: 1366px) 100vw, 1366px\" \/><\/p>\n<p><strong><br \/>\n<span style=\"text-decoration: underline\">Part 1 (Splitter)<\/span><\/strong><\/p>\n<p>Here, we have a URI (an API endpoint, a bean method, etc.) that takes a particular input parameter, and we wish to call it with different input parameters in parallel and then aggregate the results.<\/p>\n<p>If we have a list of InputRequest objects and we wish to call the &#8220;prepareOutput&#8221; method with each of them in parallel and then return a list of &#8220;OutputObjects&#8221;, we can use the <a href=\"http:\/\/camel.apache.org\/splitter.html\">split<\/a> functionality of Camel.<\/p>\n<p>The split can be expressed in the Java DSL as follows:<\/p>\n<p>[code language=&quot;java&quot;]<br \/>\n RouteBuilder builder = new RouteBuilder() {<br \/>\n        public void configure() throws Exception {<br \/>\n            
from(&quot;direct:splitAsyncStart&quot;)<br \/>\n                    .split(body(), new SplitAggregationClass()).parallelProcessing(true)<br \/>\n                    .to(&quot;direct:processInputRequest&quot;).end();<br \/>\n            from(&quot;direct:processInputRequest&quot;)<br \/>\n                    .bean(InputProcessor.class);<br \/>\n        }<br \/>\n    };<br \/>\n[\/code]<\/p>\n<p>In the Java DSL above, whenever the endpoint <strong>direct:splitAsyncStart<\/strong> is called with a List of &#8220;InputRequest&#8221; objects as its payload, the list is split and each element is sent individually to the <strong>direct:processInputRequest<\/strong> endpoint. The response for each of them is then aggregated in the specified aggregation class (SplitAggregationClass).<\/p>\n<p>[code language=&quot;java&quot;]<br \/>\nimport org.apache.camel.Exchange;<br \/>\n\/\/ Camel 2.x package; from Camel 3 onwards the interface is org.apache.camel.AggregationStrategy<br \/>\nimport org.apache.camel.processor.aggregate.AggregationStrategy;<br \/>\n<br \/>\npublic class SplitAggregationClass implements AggregationStrategy {<br \/>\n    \/\/ exchange holds the result aggregated so far (null on the first call); exchange1 is the newly arrived response<br \/>\n    @Override<br \/>\n    public Exchange aggregate(Exchange exchange, Exchange exchange1) {<br \/>\n        try {<br \/>\n            String concatenatedString = &quot;&quot;;<br \/>\n            OutputResponse outputResponse = exchange1.getIn().getBody(OutputResponse.class);<br \/>\n            int splitIndex = exchange1.getProperty(&quot;CamelSplitIndex&quot;, Integer.class);<br \/>\n            if (exchange != null) {<br \/>\n                \/\/ Append this response&#039;s name to the text built so far<br \/>\n                concatenatedString = exchange.getIn().getBody(String.class);<br \/>\n                concatenatedString += &quot; &quot; + outputResponse.getInputRequest().getName();<br \/>\n            } else {<br \/>\n                \/\/ First response: start the aggregate from this exchange<br \/>\n                exchange = exchange1;<br \/>\n                concatenatedString += &quot;Hello! &quot; + outputResponse.getInputRequest().getName();<br \/>\n            }<br \/>\n            System.out.println(&quot;Split Index : &quot; + splitIndex + &quot; in Aggregation class with Response : &quot; + outputResponse);<br \/>\n            exchange.getIn().setBody(concatenatedString);<br \/>\n        } catch (Exception e) {<br \/>\n            System.out.println(&quot;Exception occurred while performing Aggregation: &quot; + e.getMessage());<br \/>\n        }<br \/>\n        return exchange;<br \/>\n    }<br \/>\n}<br \/>\n[\/code]<\/p>\n<p><strong>Part 2 (Multicast)<\/strong><\/p>\n<p>Here we have two or more different URIs that need to be hit with the same payload, but probably with different request formats. To achieve this, we can use the multicast functionality of Camel. Multicast routes the same message to a number of endpoints, each of which can process it in a different way.<\/p>\n<p>Let us illustrate this with the following Java DSL:<\/p>\n<p>[code language=&quot;java&quot;]<br \/>\nRouteBuilder builder = new RouteBuilder() {<br \/>\n    public void configure() throws Exception {<br \/>\n        \/\/ Multicast route: send the same payload to three endpoints in parallel<br \/>\n        from(&quot;direct:multicastAsyncStart&quot;)<br \/>\n                .multicast().parallelProcessing(true).aggregationStrategy(new MultiCastAggregationClass())<br \/>\n                .to(&quot;direct:multicastRoute1&quot;)<br \/>\n                .to(&quot;direct:multicastRoute2&quot;)<br \/>\n                .to(&quot;direct:multicastRoute3&quot;)<br \/>\n                .end();<br \/>\n        \/\/ Each target route calls a different method of the same bean<br \/>\n        from(&quot;direct:multicastRoute1&quot;)<br \/>\n                .bean(MulticastProcessing.class, &quot;route1&quot;);<br \/>\n        from(&quot;direct:multicastRoute2&quot;)<br \/>\n                .bean(MulticastProcessing.class, &quot;route2&quot;);<br \/>\n        from(&quot;direct:multicastRoute3&quot;)<br \/>\n                .bean(MulticastProcessing.class, &quot;route3&quot;);<br \/>\n    }<br \/>\n};<br 
\/>\n[\/code]<\/p>\n<p>In the above example, whenever the endpoint <strong>direct:multicastAsyncStart<\/strong> is hit, the same payload is sent to 3 different endpoints (here <strong>direct:multicastRoute1, direct:multicastRoute2<\/strong> and <strong>direct:multicastRoute3<\/strong>). The responses from all of them are then aggregated in the aggregation class MultiCastAggregationClass.<\/p>\n<p>[code language=&quot;java&quot;]<br \/>\npublic class MultiCastAggregationClass implements AggregationStrategy {<br \/>\n    \/\/ exchange holds the result aggregated so far (null on the first call); exchange1 is the newly arrived response<br \/>\n    public Exchange aggregate(Exchange exchange, Exchange exchange1) {<br \/>\n        if (exchange == null) {<br \/>\n            \/\/ First response: it becomes the initial aggregate<br \/>\n            return exchange1;<br \/>\n        } else {<br \/>\n            \/\/ Append the new response body to the text built so far<br \/>\n            String oldExchangeText = exchange.getIn().getBody(String.class);<br \/>\n            oldExchangeText += &quot; &quot; + exchange1.getIn().getBody(String.class);<br \/>\n            exchange.getIn().setBody(oldExchangeText, String.class);<br \/>\n            return exchange;<br \/>\n        }<br \/>\n    }<br \/>\n}<br \/>\n[\/code]<\/p>\n<p>To conclude, Apache Camel has clear advantages over hand-written parallel processing, in terms of both code complexity and manageability. \u00a0It gives us a structured approach that takes care of the semantics and overheads of parallelization and aggregation.<\/p>\n<p>You can also go through a demo application that I created to illustrate the strategies explained above: <a href=\"https:\/\/github.com\/manali14\/camelAsyncDemo\">git@github.com:manali14\/camelAsyncDemo.git<\/a><\/p>\n<p>Hope it helps \ud83d\ude42<\/p>\n","protected":false},"excerpt":{"rendered":"<p>In any user-facing, real-time application, data is of utmost importance because it directly affects a business&#8217;s market and revenue. Recently, we needed to build an OTA (online travel agency) for booking hotels, where a User may book hotel rooms for specific dates. 
For this, we integrated with multiple Suppliers for fetching hotel information which resulted [&hellip;]<\/p>\n","protected":false},"author":105,"featured_media":0,"comment_status":"open","ping_status":"closed","sticky":false,"template":"","format":"standard","meta":{"iawp_total_views":66},"categories":[446,1994,1],"tags":[18,4640,4641,4844,1564],"aioseo_notices":[],"_links":{"self":[{"href":"https:\/\/www.tothenew.com\/blog\/wp-json\/wp\/v2\/posts\/50113"}],"collection":[{"href":"https:\/\/www.tothenew.com\/blog\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/www.tothenew.com\/blog\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/www.tothenew.com\/blog\/wp-json\/wp\/v2\/users\/105"}],"replies":[{"embeddable":true,"href":"https:\/\/www.tothenew.com\/blog\/wp-json\/wp\/v2\/comments?post=50113"}],"version-history":[{"count":28,"href":"https:\/\/www.tothenew.com\/blog\/wp-json\/wp\/v2\/posts\/50113\/revisions"}],"predecessor-version":[{"id":51677,"href":"https:\/\/www.tothenew.com\/blog\/wp-json\/wp\/v2\/posts\/50113\/revisions\/51677"}],"wp:attachment":[{"href":"https:\/\/www.tothenew.com\/blog\/wp-json\/wp\/v2\/media?parent=50113"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/www.tothenew.com\/blog\/wp-json\/wp\/v2\/categories?post=50113"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/www.tothenew.com\/blog\/wp-json\/wp\/v2\/tags?post=50113"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}