Real-Time Event Processing with Esper

21 / Jan / 2015 by Mohit Garg

In one of our recent use cases, we had to implement complex event processing in real time. Storm was our real-time processing engine, but since it does not provide batching of events, we used Esper to do that job.

Esper can be thought of as a complex event processing (CEP) component, generally used for analyzing series of events. CEP processes a large number of events at high speed across all layers: it identifies the most meaningful events within the event cloud, analyzes their impact, and takes subsequent action in real time.

In this use case, we receive a real-time stream from Kafka and pass it through Esper for event processing.

Steps to configure Esper:

  • Create a KafkaSpout, which accepts the real-time stream from Kafka.
  • Create a bean class, which defines the fields that Esper's query language uses to process the data.
  • Create a KafkaBolt, which wraps each incoming record in the bean class and emits it to the Esper bolt.
  • Create an EsperBolt, which accepts the stream from the KafkaBolt and processes the events using the query language.
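The data flow in the steps above can be sketched in plain Java, without any Storm, Kafka, or Esper dependency. Everything in this sketch is a hypothetical stand-in: `LineEvent` plays the role of the bean class, the parse function plays the role of the KafkaBolt, and `CollectingEngine` plays the role of the EsperBolt. It only shows how a raw line becomes a bean before it reaches the event processor; it is not how the real topology is wired.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical stand-ins only: no Storm, Kafka, or Esper classes are used here.
class PipelineSketch {

    // Stand-in for the bean class: one field exposed through a getter.
    static final class LineEvent {
        private final String line;

        LineEvent(String line) {
            this.line = line;
        }

        String getLine() {
            return line;
        }
    }

    // Stand-in for the EsperBolt: it only collects the events it is sent.
    static final class CollectingEngine implements Consumer<LineEvent> {
        final List<String> received = new ArrayList<>();

        @Override
        public void accept(LineEvent event) {
            received.add(event.getLine());
        }
    }

    // Pushes each raw line through the parse stage and then into the engine stage.
    static void run(List<String> rawLines,
                    Function<String, LineEvent> parseStage,
                    Consumer<LineEvent> engineStage) {
        for (String raw : rawLines) {
            engineStage.accept(parseStage.apply(raw));
        }
    }

    public static void main(String[] args) {
        CollectingEngine engine = new CollectingEngine();
        run(Arrays.asList("GET /index.html", "GET /about.html"), LineEvent::new, engine);
        System.out.println(engine.received); // prints [GET /index.html, GET /about.html]
    }
}
```

In the real topology, Storm moves tuples between these stages, so each stage can run in parallel on different workers.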

Here is the source code of the bean class and the Esper bolt.

EsperTestBean

public class EsperTestBean {

    // Exposed to Esper as the property "line" through the getter below
    private String line;

    public String getLine() {
        return line;
    }

    public void setLine(String line) {
        this.line = line;
    }

    public static EsperTestBean parse(String line) {
        EsperTestBean esperTestBean = new EsperTestBean();
        esperTestBean.setLine(line);
        return esperTestBean;
    }
}

EsperBolt:

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import com.espertech.esper.client.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.util.List;
import java.util.Map;

public class ProcessLineEsperBolt extends BaseRichBolt {

    private static final long serialVersionUID = 1L;
    private static final Log log = LogFactory.getLog(ProcessLineEsperBolt.class);

    private OutputCollector collector;
    private EPServiceProvider epService;

    @SuppressWarnings("rawtypes")
    @Override
    public void prepare(Map stormConf, TopologyContext context,
                        OutputCollector collector) {
        this.collector = collector;
        this.setUpEsper();
    }

    private void setUpEsper() {

        // Register the bean class as an Esper event type
        Configuration configuration = new Configuration();
        configuration.addEventType("EsperTestBean", EsperTestBean.class.getName());

        epService = EPServiceProviderManager.getDefaultProvider(configuration);
        epService.initialize();

        // Keep a sliding 1-minute time window and output its full contents once a minute
        EPStatement visitorsStatement = epService.getEPAdministrator()
                .createEPL("select line as found from EsperTestBean.win:time(1 min) output snapshot every 1 minute");
        visitorsStatement.addListener(new UpdateListener() {

            // Called once per output batch; only newEvents is used here.
            @Override
            public void update(EventBean[] newEvents, EventBean[] oldEvents) {
                if (newEvents != null) {
                    log.info("Batch length: " + newEvents.length);
                    for (EventBean e : newEvents) {
                        log.info("line: " + e.get("found"));
                    }
                }
            }

        });
    }

    @Override
    public void execute(Tuple input) {
        List<Object> values = input.getValues();
        epService.getEPRuntime().sendEvent(values.get(0));
        collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // This bolt emits no tuples, so no output fields are declared.
    }

}

In the code above, we process the stream once every minute. Esper provides several other ways to group events, such as length-based and time-based windows.
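To make the query's behavior concrete, here is a minimal plain-Java sketch of what `win:time(1 min)` combined with `output snapshot every 1 minute` roughly amounts to: events are retained for one minute, and a periodic snapshot returns everything still inside the window. This is not Esper code and does not reflect Esper's internals; `TimeWindowSketch` and its methods are hypothetical names, and timestamps are passed in by the caller (and assumed to be non-decreasing) so that the behavior is easy to follow.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical illustration only: this does NOT use the Esper API.
class TimeWindowSketch {

    // One buffered event: its payload and its caller-supplied arrival time.
    private static final class Entry {
        final long timestampMillis;
        final String line;

        Entry(long timestampMillis, String line) {
            this.timestampMillis = timestampMillis;
            this.line = line;
        }
    }

    private final long windowMillis;
    private final Deque<Entry> entries = new ArrayDeque<>();

    TimeWindowSketch(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    // Adds an event; timestamps are assumed to arrive in non-decreasing order.
    void add(long timestampMillis, String line) {
        entries.addLast(new Entry(timestampMillis, line));
    }

    // Drops events that are windowMillis old or older, then returns the rest in arrival order.
    List<String> snapshot(long nowMillis) {
        long cutoff = nowMillis - windowMillis;
        while (!entries.isEmpty() && entries.peekFirst().timestampMillis <= cutoff) {
            entries.removeFirst();
        }
        List<String> result = new ArrayList<>();
        for (Entry e : entries) {
            result.add(e.line);
        }
        return result;
    }

    public static void main(String[] args) {
        TimeWindowSketch window = new TimeWindowSketch(60_000L);
        window.add(10_000L, "a");
        window.add(40_000L, "b");
        System.out.println(window.snapshot(60_000L));  // prints [a, b]
        window.add(90_000L, "c");
        System.out.println(window.snapshot(120_000L)); // prints [c]
    }
}
```

Note that the same event can appear in more than one snapshot if it is still inside the window when the next snapshot is taken. That is the difference between a sliding window with snapshots and a window that is emptied after every batch.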

I hope this blog helps you implement event processing using Esper.

For the complete source code: https://github.com/IntelliGrape/bigdata-poc/tree/master/storm-batch-processing
