Introducing WSO2 Analytics Platform: Note for Architects

WSO2 has had several analytics products (or Big Data products, if you prefer the term) for some time: WSO2 BAM and WSO2 CEP. We have added WSO2 Machine Learner, a product to create, evaluate, and deploy predictive models, and renamed WSO2 BAM to WSO2 DAS (Data Analytics Server). This post describes how all of those fit into a single story.
The following picture summarises what you can do with the platform.
[Figure: overview of the WSO2 Analytics Platform]
Let’s look at each stage depicted in the picture in detail.

Stage 1: Collecting Data

There are two things for you to do.
Define Streams – Just like you create tables before you put data into a database, you first define streams before sending events. A stream is a description of what your data looks like (a schema). You will use the same streams to write queries in the second stage. You can define streams via the CEP or BAM admin console (https://host:9443/carbon) or via the Sensor API described in the next step.
Publish Events – Now you can publish events. We provide a single Sensor API to publish events to both the batch and realtime pipelines. The Sensor API is available as Java clients (Thrift, JMS, Kafka), JavaScript clients (WebSocket and REST), and hundreds of connectors via WSO2 ESB. See “How to Publish Your own Events (Data) to WSO2 Analytics Platform (BAM, CEP)” for details on how to write your own data publisher.

Stage 2: Analyse Data

Now it is time to analyse the data. We support four forms of analytics: interactive, batch, realtime, and predictive.

Interactive Analytics

While defining data streams, you can also define indexes. You can then issue interactive queries against the data: type a query into a console, and you get results right away. This is a great way to get a feel for a dataset. Under the hood, we use Lucene to support interactive queries.
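To make the index idea concrete, here is a toy, self-contained sketch of how an index answers interactive queries without scanning every record. The class and method names are mine, not the platform’s; the real implementation uses Lucene.

```java
import java.util.*;

// Toy sketch of index-backed interactive search: records are tokenised once
// at write time, so a query is a single map lookup rather than a full scan.
public class ToyIndex {
    private final Map<String, Set<Integer>> index = new HashMap<>();
    private final List<String> records = new ArrayList<>();

    // Index each whitespace-separated token of the record.
    public void add(String record) {
        int id = records.size();
        records.add(record);
        for (String token : record.toLowerCase().split("\\s+")) {
            index.computeIfAbsent(token, k -> new TreeSet<>()).add(id);
        }
    }

    // Interactive query: answered from the index, not by scanning the data.
    public List<String> query(String token) {
        List<String> hits = new ArrayList<>();
        for (int id : index.getOrDefault(token.toLowerCase(), Collections.emptySet())) {
            hits.add(records.get(id));
        }
        return hits;
    }
}
```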

Batch and Realtime Analytics

For both batch and realtime processing, you can write SQL-like queries. For batch queries we support Hive SQL, and for realtime queries we support the Siddhi Event Query Language.
Example 1: Realtime Query
Use case: calculate the average temperature over a 1-minute sliding window from the Temperature stream.
    from TemperatureStream#window.time(1 min)
    select roomNo, avg(temp) as avgTemp
    insert into HotRoomsStream ;
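For readers who want the window semantics spelled out, here is a plain-Java sketch of what a 1-minute sliding-window average computes for a single room. The class is hypothetical and only illustrative; Siddhi evaluates this per event, per group.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Sliding time window: keep the last minute of (timestamp, value) pairs and
// maintain a running sum so each event updates the average in O(1) amortised.
public class SlidingWindowAvg {
    private static final long WINDOW_MS = 60_000;
    private final Deque<double[]> window = new ArrayDeque<>(); // {timestamp, temp}
    private double sum = 0;

    // Feed one event; returns the current average over the last minute.
    public double onEvent(long ts, double temp) {
        window.addLast(new double[]{ts, temp});
        sum += temp;
        // Evict events that have fallen out of the 1-minute window.
        while (!window.isEmpty() && window.peekFirst()[0] <= ts - WINDOW_MS) {
            sum -= window.pollFirst()[1];
        }
        return sum / window.size();
    }
}
```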
Example 2: Batch Query
Use case: calculate the average temperature for each hour from the Temperature stream.
insert overwrite table TemperatureHistory
  select hour, average(t) as avgT, buildingId
  from TemperatureStream group by buildingId, getHour(ts);
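The batch query above groups stored events by building and hour. A minimal in-memory sketch of that aggregation is below; it is illustrative only, since Hive runs the real thing at scale over stored data, and the row layout here is my own.

```java
import java.util.*;

// Batch-style aggregation: group stored rows by (buildingId, hour) and average.
public class HourlyAverage {
    // Each row: {buildingId (String), timestampMs (Long), temperature (Double)}.
    public static Map<String, Double> compute(List<Object[]> rows) {
        Map<String, double[]> acc = new HashMap<>(); // key -> {sum, count}
        for (Object[] r : rows) {
            long hour = ((Long) r[1]) / 3_600_000L;   // getHour(ts) equivalent
            String key = r[0] + ":" + hour;           // group by buildingId, hour
            double[] sc = acc.computeIfAbsent(key, k -> new double[2]);
            sc[0] += (Double) r[2];
            sc[1] += 1;
        }
        Map<String, Double> result = new HashMap<>();
        for (Map.Entry<String, double[]> e : acc.entrySet()) {
            result.put(e.getKey(), e.getValue()[0] / e.getValue()[1]);
        }
        return result;
    }
}
```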

Predictive Analytics

Predictive analytics lets us learn “logic” from examples when that logic is too complex to write by hand. For example, we can build a model to find fraudulent transactions. To that end, we can use machine learning algorithms to train the model with historical data about fraudulent and non-fraudulent transactions.
 
The WSO2 Analytics Platform supports predictive analytics in multiple forms.
  1. Use the WSO2 Machine Learner (2015 Q2) wizard to build machine learning models, and then use them within your business logic. For example, WSO2 CEP, BAM, and ESB support running those models.
  2. R is a widely used language for statistical computing. You can build models using R, export them as PMML (an XML description of machine learning models), and use the models within WSO2 CEP. You can also call R scripts directly from CEP queries.
  3. WSO2 CEP also includes several streaming regression and anomaly detection operators.
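As a concrete illustration of using an already-trained model in the realtime path, here is a hedged sketch of scoring events with a logistic-regression model. The class name, coefficients, and threshold are made up for illustration; in practice the model would come from WSO2 ML or a PMML export from R.

```java
// Sketch: a trained model reduced to its scoring function, applied per event.
public class FraudScorer {
    private final double[] weights;
    private final double bias;

    public FraudScorer(double[] weights, double bias) {
        this.weights = weights;
        this.bias = bias;
    }

    // Logistic-regression score in [0, 1].
    public double score(double[] features) {
        double z = bias;
        for (int i = 0; i < weights.length; i++) {
            z += weights[i] * features[i];
        }
        return 1.0 / (1.0 + Math.exp(-z));
    }

    // Flag an event (e.g. a transaction) when the score crosses a threshold.
    public boolean isSuspicious(double[] features, double threshold) {
        return score(features) > threshold;
    }
}
```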

Stage 3: Communicate the Results

OK, now we have some results, and we need to communicate them to the users or systems that care about them. That communication can take three forms.

  • Alerts detect special conditions and cover the last mile to notify users (e.g. email, SMS, push notifications to a mobile app, a pager, or triggering a physical alarm). This can be done easily with CEP.
  • Dashboards visualise data and give the “overall idea” at a glance (e.g. a car dashboard). They support customising and creating the user’s own dashboards. When a special condition occurs, they draw the user’s attention to it and let the user drill down to find details. The upcoming WSO2 BAM and CEP 2015 Q2 releases will include a wizard to start from your data and build custom visualisations, with support for drill-downs as well.
  • APIs expose data to users outside the organisational boundary, and are often consumed by mobile phones. WSO2 API Manager is one of the leading API solutions, and you can use it to expose your data as APIs. In later releases, we are planning to add support for exposing data as APIs via a wizard.

Why choose WSO2 Analytics Platform?

  • Reason 1: One platform for realtime, batch, and combined processing – with a single API for publishing events, and with support for combined use cases like the following.
    • Run similar queries in the batch pipeline and the realtime pipeline (a.k.a. the Lambda Architecture)
    • Train a machine learning model (e.g. a fraud detection model) in the batch pipeline, and use it in the realtime pipeline (use cases: fraud detection, segmentation, predicting the next value, predicting churn)
    • Detect conditions in the realtime pipeline, then switch to detailed analysis using the data stored in the batch pipeline (e.g. fraud, or giving deals to customers on an e-commerce site)
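The first combined use case rests on both pipelines sharing the same logic. A minimal sketch of that idea, with illustrative names, shows one aggregation serving both a batch recomputation over stored data and an incremental realtime update, and both paths agreeing:

```java
// Lambda-style sketch: the same aggregation, computed two ways.
public class RunningSum {
    private double total = 0;

    // Realtime path: update incrementally as each event arrives.
    public void onEvent(double value) {
        total += value;
    }

    public double value() {
        return total;
    }

    // Batch path: recompute from scratch over the stored dataset.
    public static double batch(double[] stored) {
        double t = 0;
        for (double v : stored) {
            t += v;
        }
        return t;
    }
}
```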
  • Reason 2: Performance – WSO2 CEP can process 100K+ events per second and is one of the fastest realtime processing engines around. WSO2 CEP was a finalist in the DEBS Grand Challenge 2014, where it processed 0.8 million events per second with 4 nodes.
  • Reason 3: A scalable realtime pipeline that runs SQL-like CEP queries on top of Storm – users write queries in the SQL-like Siddhi Event Query Language, which provides higher-level operators for building complex realtime queries. See the post “SQL-like Query Language for Real-time Streaming Analytics” for more details.
    For batch processing, we use Apache Spark (from the 2015 Q2 release onwards), and for realtime processing, users can run their queries in one of two modes.
    • Run the queries on two CEP nodes, with one node acting as the HA backup for the other. Since WSO2 CEP can process in excess of a hundred thousand events per second, this choice is sufficient for many use cases.
    • Partition the queries and streams, build an Apache Storm topology that runs CEP nodes as Storm spouts, and run it on top of Apache Storm. Please see the slide deck “Scalable Realtime Analytics with declarative SQL like Complex Event Processing Scripts”. This lets users write complex queries, as supported by complex event processing, while still scaling the computation to large data streams.
  • Reason 4: Support for predictive analytics – build machine learning models, compare them to select the best one, and use them within real-life distributed deployments.

Almost forgot: all of this is open source under the Apache License. Most design decisions are discussed publicly at architecture@wso2.org. For more details, please refer to my WSO2Con Europe talk (slides).

If you find this interesting, please try it out.

You can find the packs at http://wso2.com/analytics. Please reach out to me, or through http://wso2.com/contact/, if you want more information.

Also see

How to Publish Your own Events (Data) to WSO2 Analytics Platform (BAM, CEP)

We collect data via a Sensor API (a.k.a. agents), send it to the servers (WSO2 CEP and WSO2 BAM), process it, and do something with the results. You can find more information about the big picture in the slide deck http://www.slideshare.net/hemapani/introduction-to-large-scale-data-analysis-with-wso2-analytics-platform. This post describes how you can collect data.

We provide a single Sensor API to publish events to both the batch and realtime pipelines. The Sensor API is available as Java clients (Thrift, JMS, Kafka), JavaScript clients (WebSocket and REST), and hundreds of connectors via WSO2 ESB. Let’s see how we can use the Java Thrift client to publish events.

  1. First of all, you need CEP or BAM running. Download, unzip, and run WSO2 CEP or WSO2 BAM (via bin/wso2server.sh).
  2. Now, let’s write a client. To set up the classpath, add the jars given in Appendix A, or add the POM dependencies given in Appendix B to your Maven POM file.
  3. The Java client first defines a stream and then publishes events.
  4. Just like you create tables before you put data into a database, you first define streams before sending events to the WSO2 Analytics Platform. A stream is a description of what your data looks like (a.k.a. a schema). Then you can publish events. In the code, the “Event Data” is an array of objects, and it must match the types and parameters given in the event stream definition.
You can find an example client in samples/producers/pizza-shop in the WSO2 CEP distribution.
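The client code itself is not shown in this copy of the post. As a hedged sketch only, a minimal publisher along the lines of the pizza-shop sample, assuming the 4.2.0-era databridge Thrift agent API (the class name, stream-definition format, and publish signature below are from that API; please verify against the sample in your pack), might look like this:

```java
import org.wso2.carbon.databridge.agent.thrift.DataPublisher;

public class PizzaOrderPublisher {
    public static void main(String[] args) throws Exception {
        // Connect to the Thrift receiver of the CEP/BAM node
        // (7611 is the default Thrift receiver port; credentials are the defaults).
        DataPublisher publisher =
                new DataPublisher("tcp://localhost:7611", "admin", "admin");

        // Define the stream (the schema) before sending any events.
        String streamId = publisher.defineStream(
                "{'name':'pizza.orders','version':'1.0.0'," +
                " 'payloadData':[" +
                "   {'name':'orderId','type':'STRING'}," +
                "   {'name':'amount','type':'DOUBLE'}]}");

        // Publish one event: (streamId, metaData, correlationData, payloadData).
        // The payload array must match the schema above in order and type.
        publisher.publish(streamId, null, null, new Object[]{"order-001", 12.5});
        publisher.stop();
    }
}
```

This sketch needs a running CEP or BAM node and the jars from Appendix A on the classpath.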

Appendix

Appendix A: Dependency Jars
You can find the jars from the location ${cep.home}/repository/components/plugins/ of CEP or BAM pack.
  1. org.wso2.carbon.logging_4.2.0.jar
  2. commons-pool_1.5.6.*.jar
  3. httpclient_4.2.5.*.jar
  4. httpcore_4.3.0.*.jar
  5. commons-httpclient_3.1.0.*.jar
  6. commons-codec_1.4.0.*.jar
  7. slf4j.log4j*.jar
  8. slf4j.api_*.jar
  9. axis2_1.6.1.*.jar
  10. axiom_1.2.11.*.jar
  11. wsdl4j_1.6.2.*.jar
  12. XmlSchema_1.4.7.*.jar
  13. neethi_*.jar
  14. org.wso2.securevault_*.jar
  15. org.wso2.carbon.databridge.agent.thrift_*.jar
  16. org.wso2.carbon.databridge.commons.thrift_*.jar
  17. org.wso2.carbon.databridge.commons_*.jar
  18. com.google.gson_*.jar
  19. libthrift_*.jar
Appendix B: Maven POM Dependencies
 Add the following WSO2 Nexus repository and dependencies to the corresponding sections of your pom.xml.
<repository>
    <id>wso2-nexus</id>
    <url>http://maven.wso2.org/nexus/content/groups/wso2-public/</url>
</repository>
<dependency>
    <groupId>org.wso2.carbon</groupId>
    <artifactId>org.wso2.carbon.databridge.agent.thrift</artifactId>
    <version>4.2.0</version>
</dependency>
<dependency>
    <groupId>org.wso2.carbon</groupId>
    <artifactId>org.wso2.carbon.databridge.commons.thrift</artifactId>
    <version>4.2.0</version>
</dependency>

Embedding WSO2 Siddhi from Java

Update: For CEP 4.0 and later, the API has changed. You can find new samples (Siddhi 4.0) at https://github.com/wso2/siddhi/blob/master/modules/siddhi-samples/quick-start-samples/. Look at the simple filter sample, and at its POM file for the dependencies.

Siddhi is the CEP engine that powers WSO2 CEP. WSO2 CEP is the server: it accepts messages over the network via a long list of protocols such as Thrift, HTTP/JSON, JMS, Kafka, and WebSocket.

Siddhi, in contrast, is a Java library. That means you can use it from a Java class or a Java main method. I personally do this to debug CEP queries before putting them into WSO2 CEP. The following describes how to do it. You can also embed Siddhi to create your own apps.

First, add the following jars to the classpath. (You can find them in the WSO2 CEP pack, http://wso2.com/products/complex-event-processor/, and at http://mvnrepository.com/artifact/log4j/log4j/1.2.14.) The jar versions might change with new packs, but whatever ships in the same CEP pack will work together.

  1. siddhi-api-2.1.0-wso2v1.jar (from CEP_PACK/repository/components/plugins/)
  2. antlr-runtime-3.4.jar (from CEP_PACK/repository/components/plugins/)
  3. log4j-1.2.14.jar ( download from http://mvnrepository.com/artifact/log4j/log4j/1.2.14)
  4. siddhi-query-2.1.0-wso2v1.jar (from CEP_PACK/repository/components/plugins/)
  5. siddhi-core-2.1.0-wso2v1.jar (from CEP_PACK/repository/components/plugins/)

Now you can use Siddhi with the following code. You create a Siddhi manager, add queries, register callbacks to receive results, and send events.

SiddhiManager siddhiManager = new SiddhiManager();

// define the stream
siddhiManager.defineStream("define stream StockQuoteStream " +
        "(symbol string, value double, time long, count long);");

// add CEP queries
siddhiManager.addQuery("from StockQuoteStream[value > 20] " +
        "insert into HighValueQuotes;");

// add callbacks to see the results
siddhiManager.addCallback("HighValueQuotes", new StreamCallback() {
    public void receive(Event[] events) {
        EventPrinter.print(events);
    }
});

// send events in to Siddhi
InputHandler inputHandler = siddhiManager.getInputHandler("StockQuoteStream");
inputHandler.send(new Object[]{"IBM", 34.0, System.currentTimeMillis(), 10L});

The events you send in must agree with the event streams you have defined. For example, a StockQuoteStream event must contain a string, a double, a long, and a long, as per the stream definition.

See my earlier blog posts for examples of more queries.

Please see [1] and [2] for more information about the Siddhi query language. If you write a complicated query, you can check intermediate results by adding callbacks to the intermediate streams.

Enjoy! Reach us via the wso2 tag on Stack Overflow if you have any questions, or send a mail to dev@wso2.org.

  1. http://www.slideshare.net/suho/wso2-complex-event-processor-wso2-coneu2014
  2. https://docs.wso2.com/display/CEP310/Siddhi+Language+Specification