Complex event processing (CEP) systems query events on the fly without storing them.
In CEP, we think in terms of event streams. An event stream is a logical sequence of events that become available over time. For example, a stock event stream consists of events that notify changes to a stock's price. Users provide queries to the CEP engine, which implements the CEP logic, and the engine matches those queries against events arriving through event streams.
CEP differs from other paradigms like simple event processing and filtering by its support for temporal queries, which reason in terms of temporal concepts like "time windows" and "before and after" relationships. For example, a typical CEP query will say that
“If IBM stock value increased by more than 10% within an hour, please notify me”.
Such a CEP query has a few defining characteristics.
There are many CEP engine implementations (see the CEP Players list, 2012). However, most CEP engines run on a single large box; that is, they scale up vertically. Scaling a CEP engine horizontally, across many machines, is still an open problem. The remainder of this post discusses what horizontally scaling a CEP engine means and some of the potential answers.
We use the term scaling to describe the ability of a CEP system to handle larger workloads or more complex queries by adding more resources. Scaling CEP has several dimensions.
Let us consider each dimension separately.
This is the easiest of the four dimensions, since we can use a shared-nothing architecture: we run multiple CEP engine instances (each instance running on a single host), each handling a subset of the queries.
For instance, a long-running complex query that needs to maintain a large window, and all events in that window, would need a large working memory. A potential answer to this problem is to use a distributed cache to store the working memory. Reference describes such a scenario.
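As a minimal sketch of that idea, the following keeps a window's contents in a cluster-wide map instead of the engine's local heap. It assumes Hazelcast as the distributed cache; the `cep-windows` map name and the CSV event encoding are illustrative, not part of any CEP engine's API.

```java
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// A sketch of keeping a large time window's contents in a distributed
// cache so it is not bound by a single host's memory.
public class CachedWindow {
    public static void main(String[] args) {
        HazelcastInstance hz = Hazelcast.newHazelcastInstance();
        Map<String, List<String>> windows = hz.getMap("cep-windows");

        // Read-modify-write: the cache returns a copy of the stored value,
        // so the updated window must be put back for other nodes to see it.
        List<String> window = windows.getOrDefault("IBM", new ArrayList<>());
        window.add("IBM,72.5," + System.currentTimeMillis());
        windows.put("IBM", window);
    }
}
```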
We will handle both scenarios together, as they are two sides of the same coin. In both cases, we have trouble fitting a single query into a single host such that it can support the given event rate.
To handle such scenarios, we have to distribute the query across many computers.
We can do this by breaking the query into several steps in a pipeline, where each step matches events against some conditions and republishes the matching events to steps further down the pipeline. Then we can deploy the different steps of the pipeline onto different machines.
For example, let's consider the following query. This query matches if there are two events from IBM stock within 30 seconds of each other, both having a price greater than 70, where the price has increased by more than 10%.
select a1.symbol, a1.price, a2.price from every a1=StockStream[price > 70 and symbol=='IBM'] -> a2=StockStream[price > 70 and symbol=='IBM'][1.1*a1.price < a2.price][within.time=30]
As shown in the figure, we can break the query into three nodes, where each node republishes its matching events to the next node (Option 1).
|CEP Query as a Pipeline|
However, queries often have other properties that allow further optimization. For example, although the last step, matching the price increase, is stateful, the other two steps are stateless. Stateful operations remember information after processing an event, so earlier events affect the processing of later events, while stateless operations depend only on the event currently being processed.
Therefore, we can place multiple instances of those stateless steps using a shared-nothing architecture. For example, we can break the query into five nodes, as shown by the bottom part of the picture (Option 2).
Another favorable fact is that CEP processing generally happens through filtering, where the number of events reduces as we progress through the pipeline. Therefore, pushing stateless filter-like operations (e.g. matching against symbol=='IBM') to the early parts of the pipeline, and scaling them in a shared-nothing manner, should allow us to scale the system to much higher event rates. For example, let's say the StockQuote event stream generates 100,000 events per second, but only 5% of them are about IBM. Then only 5,000 events will make it past the first filter, which we can handle much more easily than 100k events.
However, it is worth noting that the above method only works with some queries. For example, if a query consists of a single stateful operation, like a window-based pattern, we cannot use this method.
Unfortunately, there is no framework that does this out of the box (let me know if I am wrong). So if you want to do this, you will have to code it yourself. If you choose to do that, using a pub/sub network or a stream processing system can remove much of the complexity.
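As a concrete sketch of one pipeline step, the following stateless filter stage (the first step of Option 2) consumes raw quotes from a pub/sub topic, keeps only IBM events with a price over 70, and republishes the matches for the stateful matcher downstream. It assumes Kafka as the pub/sub network, hypothetical topic names (`stock-quotes`, `ibm-filtered`), and a simple `symbol,price` CSV encoding; running several copies of this stage in the same consumer group gives the shared-nothing scaling described above.

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

// A stateless filter stage: consume, match symbol and price, republish matches.
public class IbmFilterStage {
    public static void main(String[] args) {
        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", "localhost:9092");
        consumerProps.put("group.id", "ibm-filter-stage"); // shared group => shared-nothing scaling
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "localhost:9092");
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
             KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
            consumer.subscribe(Collections.singletonList("stock-quotes"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    String[] fields = record.value().split(","); // "symbol,price"
                    if ("IBM".equals(fields[0]) && Double.parseDouble(fields[1]) > 70) {
                        // Republish the matching event to the next (stateful) step.
                        producer.send(new ProducerRecord<>("ibm-filtered", record.key(), record.value()));
                    }
                }
            }
        }
    }
}
```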
Please share your thoughts!
Note: the above ideas are now supported on top of WSO2 Stream Processor, with two nodes for smaller use cases that require HA, and with Kafka for larger use cases.
|Characteristic Performance Graphs of a Server|
The above graphs capture the characteristic behavior of a server. As shown by the graphs, server performance is gauged by measuring latency and throughput against concurrency.
It is worth noting that these two values are often loosely related; however, we cannot directly derive one measurement from the other.
As shown by the figure, a server has an initial range where throughput increases at a roughly linear rate and latency either remains constant or grows linearly. As concurrency increases further, this approximately linear relationship decays and system performance rapidly degrades. Performance tuning attempts to modify the relationship between concurrency and throughput and/or latency, maintaining the linear relationship for as long as possible.
For more details about latency and throughput, read the following online resources:
Unlike static server capacity measurements (e.g. CPU processing speed, memory size), performance is a dynamic measurement. Latency and throughput are strongly influenced by concurrency and work unit size. A larger work unit size usually influences latency and throughput negatively. Concurrency is the number of work units (e.g. messages, business processes, transformations, or rules) processed in parallel. Higher concurrency values tend to increase latency (wait time) and decrease throughput (units processed).
To visualize server performance across the range of possible workloads, we draw a graph of latency or throughput against concurrency or work unit size as shown by the above graph.
The goal of running a performance test is to draw a graph like the one above. To do that, you have to run the performance test multiple times with different concurrency levels and, for each run, measure latency and throughput.
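A minimal sketch of such a test harness is shown below; `doRequest()` is a placeholder for one work unit against the server under test, and the concurrency levels are illustrative.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Runs the same workload at increasing concurrency levels and prints
// throughput and mean latency for each level, producing the data points
// needed to draw the graphs above.
public class LoadTest {
    static void doRequest() {
        // placeholder: invoke the server under test here
    }

    public static void main(String[] args) throws InterruptedException {
        int requestsPerThread = 1000;
        for (int concurrency : new int[]{1, 2, 4, 8, 16, 32, 64}) {
            ExecutorService pool = Executors.newFixedThreadPool(concurrency);
            List<Long> latencies = Collections.synchronizedList(new ArrayList<>());
            long start = System.nanoTime();
            for (int t = 0; t < concurrency; t++) {
                pool.submit(() -> {
                    for (int i = 0; i < requestsPerThread; i++) {
                        long begin = System.nanoTime();
                        doRequest();
                        latencies.add(System.nanoTime() - begin);
                    }
                });
            }
            pool.shutdown();
            pool.awaitTermination(1, TimeUnit.HOURS);
            double elapsedSeconds = (System.nanoTime() - start) / 1e9;
            double throughput = latencies.size() / elapsedSeconds;
            double meanLatencyMs =
                    latencies.stream().mapToLong(Long::longValue).average().orElse(0) / 1e6;
            System.out.printf("concurrency=%d throughput=%.1f req/s mean latency=%.2f ms%n",
                    concurrency, throughput, meanLatencyMs);
        }
    }
}
```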
The following are some of the common steps and a checklist.
A distributed queue is a FIFO data structure that is accessed by entities in a distributed environment. A distributed queue works as follows.
Distributed queues provide strict or best-effort support for in-order delivery, where subscribers receive messages in the same order they were published. (It is very hard to enforce this across all subscribers; therefore, implementations often enforce it only within each subscriber. For example, if messages m1, m2, ..., m100 are published in that order, each subscriber will see a subset of the messages in ascending order, but there is no guarantee about the global order seen across subscribers.)
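To make the per-subscriber guarantee concrete, here is a minimal JMS sketch, assuming a local ActiveMQ broker and an illustrative `stock-events` queue: a single consumer on the queue receives messages in publish order, but nothing here constrains the order seen across different consumers.

```java
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;

public class OrderingDemo {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        Connection connection = factory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue("stock-events");

        // Publish m1..m100 in order.
        MessageProducer producer = session.createProducer(queue);
        for (int i = 1; i <= 100; i++) {
            producer.send(session.createTextMessage("m" + i));
        }

        // A single consumer sees its messages in ascending publish order.
        MessageConsumer consumer = session.createConsumer(queue);
        TextMessage message;
        while ((message = (TextMessage) consumer.receive(1000)) != null) {
            System.out.println(message.getText());
        }
        connection.close();
    }
}
```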
Scaling is handling a larger workload by adding more resources. Workload can increase in many ways, and we call those the different dimensions of scale. There are three main dimensions.
There are many distributed queue implementations in JMS servers like ActiveMQ, HornetQ, etc. The focus of our discussion is how they can scale.
There are four choices.
Replication in distributed queues is inefficient, as delivering messages in order requires state to be replicated immediately.
With cluster connections and broker networks, in-order message delivery is only a best-effort guarantee. If a subscriber has failed or a subscription has been deleted, the broker nodes are forced to either drop the messages or redistribute them, out of order, to the other brokers in the network.
None of the above modes handles scaling for large messages.
| Approach | Advantage | Limitation | Example Implementations |
| --- | --- | --- | --- |
| Master/Slave | Supports HA | No scalability | Qpid, ActiveMQ, RabbitMQ |
| Queue Distribution | Scales to a large number of queues | Does not scale to a large number of messages per queue | RabbitMQ |
| Cluster Connections | Supports HA | Might not support in-order delivery; logic runs on the client side and takes local decisions | HornetQ |
| Broker/Queue Networks | Load balancing and distribution | Fair load balancing is hard | ActiveMQ |
|Authentication and Authorization Choices in WSO2 Platform|
Each server in the WSO2 platform is built using the Carbon platform. We use the term "Carbon server" to denote any Carbon-based server, like the ESB, AS, or BPS.
The techniques explained here are applicable across most of the WSO2 products. In the following figure, the circles with branching paths show the different options.
As shown by the figure, a Carbon server may receive two types of messages: messages with credentials (like passwords) and messages with tokens. When a server receives a message with credentials, the server first authenticates the request and optionally authorizes the action. When the server receives a message with tokens, there is generally no authentication step; the token is directly validated against permissions, and the request is either granted or denied.
Authentication needs a user store that holds the information about users and an "enforcement point" that verifies the credentials against the user store.
Carbon Servers support two user stores.
It is a common deployment pattern for multiple Carbon servers in a single deployment to point to the same user store, and this provides a single point to control and manage the users.
We can configure any Carbon server to authenticate incoming requests. It supports many options, like HTTP Basic Authentication over SSL for HTTP, WS-Security UsernameTokens, and SAML Web SSO. This authentication is done against the users that reside in the user store.
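For example, a minimal sketch of calling a Carbon-hosted service with HTTP Basic Authentication over SSL might look as follows; the service URL and credentials are placeholders, and in a real client the server's certificate must be trusted by the JVM.

```java
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class BasicAuthCall {
    public static void main(String[] args) throws Exception {
        URL url = new URL("https://localhost:9443/services/SomeSecuredService"); // placeholder
        HttpURLConnection connection = (HttpURLConnection) url.openConnection();
        // The credentials sent here are verified against the configured user store.
        String credentials = Base64.getEncoder()
                .encodeToString("admin:password".getBytes(StandardCharsets.UTF_8));
        connection.setRequestProperty("Authorization", "Basic " + credentials);
        System.out.println("HTTP status: " + connection.getResponseCode());
    }
}
```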
Also, each Carbon server has a web service called the Authentication Admin web service, which exposes authentication as a web service to the outside. A client can invoke the Authentication Admin web service, obtain an HTTP cookie after logging in, and reuse that cookie to make authenticated calls to the Carbon server.
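The cookie-based flow can be sketched as follows; `login()` stands in for the actual SOAP call to the Authentication Admin web service, which is elided here, and the admin service URL is a placeholder.

```java
import java.net.HttpURLConnection;
import java.net.URL;

public class CookieReuse {
    // Placeholder for the SOAP login call to .../services/AuthenticationAdmin;
    // it should return the completed HTTP connection of a successful login.
    static HttpURLConnection login() {
        throw new UnsupportedOperationException("elided");
    }

    public static void main(String[] args) throws Exception {
        // Capture the session cookie issued at login ...
        String sessionCookie = login().getHeaderField("Set-Cookie");

        // ... and replay it on subsequent calls to make them authenticated.
        HttpURLConnection call = (HttpURLConnection)
                new URL("https://localhost:9443/services/SomeAdminService").openConnection();
        call.setRequestProperty("Cookie", sessionCookie);
        System.out.println("HTTP status: " + call.getResponseCode());
    }
}
```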
In authorization scenarios, a Carbon server receives either a request that has generally already been authenticated or a request that includes a token. In either case, we want to check whether the authenticated user has enough permissions to carry out a given action.
Using XACML terminology, we can define three roles in such a scenario. (XACML includes other roles, which we will ignore in this discussion.)
Carbon servers support the Policy Enforcement Point (PEP) role through a WSO2 ESB mediator, an Apache Axis2 handler, or custom user code.
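As a sketch of the custom-code option, a PEP simply asks the PDP for a decision before carrying out the action. `PdpClient` below is a hypothetical interface standing in for whatever client you use to reach the PDP (for example, a XACML entitlement endpoint).

```java
// Hypothetical client for the PDP; a real implementation would send a
// XACML request to the entitlement service and parse the decision.
interface PdpClient {
    boolean isPermitted(String subject, String resource, String action);
}

// The service acts as the PEP: it enforces the PDP's decision.
class OrderService {
    private final PdpClient pdp;

    OrderService(PdpClient pdp) {
        this.pdp = pdp;
    }

    void placeOrder(String user) {
        if (!pdp.isPermitted(user, "orders", "place")) {
            throw new SecurityException(user + " is not permitted to place orders");
        }
        // ... carry out the action ...
    }
}
```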
For the Policy Decision Point (PDP), we support three ways to define permissions.
We support the Policy Administration Point (PAP) role through the WSO2 Identity Server, which enables users to edit the permission definitions through its management console.
These give rise to several scenarios.
More information about the WSO2 Identity Server can be found at http://wso2.com/products/identity-server/, and if there are any missing features, please drop a note to the WSO2 Architecture List.