{"id":16803,"date":"2015-01-21T10:47:11","date_gmt":"2015-01-21T05:17:11","guid":{"rendered":"http:\/\/www.tothenew.com\/blog\/?p=16803"},"modified":"2015-01-21T10:47:11","modified_gmt":"2015-01-21T05:17:11","slug":"realtime-event-processing-with-esper","status":"publish","type":"post","link":"https:\/\/www.tothenew.com\/blog\/realtime-event-processing-with-esper\/","title":{"rendered":"Realtime Event processing with Esper"},"content":{"rendered":"<p>In one of the recent use case, we had to implement a complex event processing in real time mode. Storm is used as real time processing engine, but since It doesn&#8217;t provide batching of events therefore we took upon Esper to do the required job.<\/p>\n<p>Esper can be thought as a complex event processing (CEP) component generally used for event series analysis. Complex event processing (CEP) delivers high-speed processing of many events across all the layers identifying the most meaningful events within the event cloud, analyzing their impact, and taking subsequent action in real time.<\/p>\n<p>In this use case, we get a real time stream from Kafka, which passes through Esper for event processing.<\/p>\n<p>Steps to configure esper:<\/p>\n<ul>\n<li>Create KafkaSpout which will accept the real time streams from Kafka<\/li>\n<li>Create BeanClass which will define the fields that used in Esper for processing the data using query language.<\/li>\n<li>Create KafkaBolt which will emits the stream to the Esper bolt using bean class.<\/li>\n<li>Create EsperBolt which will accept the stream from KafkaBolt, do event processing using the query language<\/li>\n<\/ul>\n<p>Here is the source code of bean class and bolt.<\/p>\n<p>EsperTestBean<\/p>\n<p>[java]<br \/>\npublic class EsperTestBean {<\/p>\n<p>    String line;<\/p>\n<p>    public String getLine() {<br \/>\n        return line;<br \/>\n    }<\/p>\n<p>    public void setLine(String line) {<br \/>\n        this.line = line;<br \/>\n    }<\/p>\n<p>    public static EsperTestBean parse(String line) {<br \/>\n        EsperTestBean esperTestBean = new EsperTestBean();<br \/>\n        esperTestBean.setLine(line);<br \/>\n        return esperTestBean;<br \/>\n    }<br \/>\n}<br \/>\n[\/java]<\/p>\n<p>EsperBolt:<\/p>\n<p>[java]<br \/>\nimport backtype.storm.task.OutputCollector;<br \/>\nimport backtype.storm.task.TopologyContext;<br \/>\nimport backtype.storm.topology.OutputFieldsDeclarer;<br \/>\nimport backtype.storm.topology.base.BaseRichBolt;<br \/>\nimport backtype.storm.tuple.Tuple;<br \/>\nimport com.espertech.esper.client.*;<br \/>\nimport org.apache.commons.logging.Log;<br \/>\nimport org.apache.commons.logging.LogFactory;<\/p>\n<p>import java.util.List;<br \/>\nimport java.util.Map;<\/p>\n<p>public class ProcessLineEsperBolt extends BaseRichBolt {<\/p>\n<p>    private static final long serialVersionUID = 1L;<br \/>\n    private static final Log log = LogFactory.getLog(ProcessLineEsperBolt.class);<\/p>\n<p>    private OutputCollector collector;<br \/>\n    private EPServiceProvider epService;<\/p>\n<p>    @SuppressWarnings(&quot;rawtypes&quot;)<br \/>\n    @Override<br \/>\n    public void prepare(Map stormConf, TopologyContext context,<br \/>\n                        OutputCollector collector) {<br \/>\n        this.collector = collector;<br \/>\n        this.setUpEsper();<br \/>\n    }<\/p>\n<p>    private void setUpEsper() {<\/p>\n<p>        \/\/Register the BeanClass as an EventType<br \/>\n        Configuration configuration = new Configuration();<br \/>\n        configuration.addEventType(&quot;EsperTestBean&quot;, EsperTestBean.class.getName());<\/p>\n<p>        epService = EPServiceProviderManager.getDefaultProvider(configuration);<br \/>\n        epService.initialize();<\/p>\n<p>        \/\/Processing events in batch of every 1 minute using bean class fields<br \/>\n        EPStatement visitorsStatement = epService.getEPAdministrator().<br \/>\n                createEPL(&quot;select  line as found from EsperTestBean.win:time(1 min) output snapshot every 1 minute&quot;);<br \/>\n        visitorsStatement.addListener(new UpdateListener() {<\/p>\n<p>           \/\/ Listener that provides the new events and old events array.<br \/>\n            @Override<br \/>\n            public void update(EventBean[] newEvents, EventBean[] oldEvents) {<br \/>\n                if (newEvents != null) {<br \/>\n                    System.out.println(&quot;Batch Length::::::::::::::::::::::&quot; + newEvents.length);<br \/>\n                    for (EventBean e : newEvents) {<br \/>\n                        System.out.println(&quot;online &#8212;&#8212;&#8212;&#8212;&#8212;&#8212;&#8212;&#8212;&#8212;&#8212;&#8212;&#8212;&#8212;&#8212;&#8212;-line: &quot; + e.get(&quot;found&quot;));<br \/>\n                    }<br \/>\n                }<br \/>\n            }<\/p>\n<p>        });<br \/>\n    }<\/p>\n<p>    @Override<br \/>\n    public void execute(Tuple input) {<br \/>\n        List&lt;Object&gt; values = input.getValues();<br \/>\n        epService.getEPRuntime().sendEvent(values.get(0));<br \/>\n        collector.ack(input);<br \/>\n    }<\/p>\n<p>      @Override<br \/>\n    public void declareOutputFields(OutputFieldsDeclarer declarer) {<\/p>\n<p>    }<\/p>\n<p>}<br \/>\n[\/java]<\/p>\n<p>In above mentioned code, we process stream for every 1 minute. Esper provide the different type of event processing method like window, time based and many more.<\/p>\n<p>Hope this blog will help you to implement the event processing using Esper.<\/p>\n<p>For Complete Source Code: <a title=\"https:\/\/github.com\/IntelliGrape\/bigdata-poc\/tree\/master\/storm-batch-processing\" href=\"https:\/\/github.com\/IntelliGrape\/bigdata-poc\/tree\/master\/storm-batch-processing\">https:\/\/github.com\/IntelliGrape\/bigdata-poc\/tree\/master\/storm-batch-processing<\/a><\/p>\n","protected":false},"excerpt":{"rendered":"<p>In one of the recent use case, we had to implement a complex event processing in real time mode. Storm is used as real time processing engine, but since It doesn&#8217;t provide batching of events therefore we took upon Esper to do the required job. Esper can be thought as a complex event processing (CEP) [&hellip;]<\/p>\n","protected":false},"author":47,"featured_media":0,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"iawp_total_views":14},"categories":[1395],"tags":[1197,1602,1605,1604,1603],"aioseo_notices":[],"_links":{"self":[{"href":"https:\/\/www.tothenew.com\/blog\/wp-json\/wp\/v2\/posts\/16803"}],"collection":[{"href":"https:\/\/www.tothenew.com\/blog\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/www.tothenew.com\/blog\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/www.tothenew.com\/blog\/wp-json\/wp\/v2\/users\/47"}],"replies":[{"embeddable":true,"href":"https:\/\/www.tothenew.com\/blog\/wp-json\/wp\/v2\/comments?post=16803"}],"version-history":[{"count":0,"href":"https:\/\/www.tothenew.com\/blog\/wp-json\/wp\/v2\/posts\/16803\/revisions"}],"wp:attachment":[{"href":"https:\/\/www.tothenew.com\/blog\/wp-json\/wp\/v2\/media?parent=16803"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/www.tothenew.com\/blog\/wp-json\/wp\/v2\/categories?post=16803"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/www.tothenew.com\/blog\/wp-json\/wp\/v2\/tags?post=16803"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}