Skip to content
xinglang edited this page Feb 11, 2015 · 1 revision

Metrics calculator is the stage to calculate multi dimensional metrics in realtime and store the metrics to a metric store for query. Metriccalculator receives Pulsar events and session mark events from its previous stage Distributor. The metric store currently we use is Cassandra.

Metrics are aggregated in two stages. We do first level short aggregations in a short time window, for example 10 seconds, we can take this stage as a mapper stage. And then we have second stage to do long window aggregations, like a reducer stage. Metriccalculator configures an Esper Processor, users can use EPLs to configure aggregations rules. A typical aggregation rules like below:

create context MCContext start @now end pattern [timer:interval(10) or EsperEndEvent];                                                                                    
context MCContext
    insert into MC_groupMetric
        select count(*) as count, _cn as groupId, 'pageviewspercountry' as metricName from PulsarEvent(_cn is not null) group by _cn output snapshot when terminated;

It means do aggregations(count event numbers) in a 10 seconds window over a dimension(_cn is country).

@OutputTo("OutboundMessageChannel")
@ClusterAffinityTag(dimension=@CreateDimension(name="groupdimen", dimensionspan="groupId, metricName"))
@PublishOn(topics="Pulsar.FirstMC/firstMcCounter")
select * from MC_groupMetric;

This is the first level aggregation. The result are output to OutboundMessageChannel and flow to Jetstream topic "Pulsar.FirstMC/firstMcCounter" with affinity key based scheduler. The affinity key is the aggregation dimensions plus the metric name. It ensures the counters of the same dimensions are routed to the same node and make them ready for second level aggregations(reduce) in a distributed environment.

The second level aggregation looks like this:

@OutputTo("SummingProcessor")
select * from MC_groupMetric;

The second level aggregation is done in the summing processor. The default window is 1 minute. The long term aggregation window is configurable in EPL. And then finally summing processor will output the reducer result to a metric store, now we use Cassandra, the metrics will be stored to Cassandra metric tables.

The Cassandra metric table looks like this:

CREATE TABLE pulsar.mc_groupmetric (
	metricname text,
	groupid text,
	metrictime timestamp,
	"value" int,
	PRIMARY KEY ((metricname, groupid), metrictime)
) WITH CLUSTERING ORDER BY (metrictime ASC) AND
bloom_filter_fp_chance=0.010000 AND
caching='KEYS_ONLY' AND
comment='' AND
dclocal_read_repair_chance=0.000000 AND
gc_grace_seconds=864000 AND
read_repair_chance=0.100000 AND
replicate_on_write='true' AND
populate_io_cache_on_flush='false' AND
compaction={'class': 'SizeTieredCompactionStrategy'} AND
compression={'sstable_compression': 'SnappyCompressor'};

This table's scheme is very straightforward and easy to understand, the column names map to the elements in the EPL. The row key of the table is 'metricname' + 'groupid', column key is metrictime. Making a query is also very simple.

Now, let's extend the above sample to a more complex and realistic example:

context MCContext
    insert into MC_geodevicegroupMetric
        select count(*) as count, _cn as groupId, _con as tag_con, _cty as tag_cty, _dd_d as tag_device, _dd_os as tag_os, _dd_bf as tag_bf, 'pageviewspergeoanddevice' as metricName, 10 as frequencyInMin from PulsarEvent(_cn is not null and _con is not null and _cty is not null and _dd_d is not null and _dd_os is not null and _dd_bf is not null) group by _cn, _con, _cty, _dd_d, _dd_os, _dd_bf output snapshot when terminated;

@OutputTo("OutboundMessageChannel")
@ClusterAffinityTag(dimension=@CreateDimension(name="groupdimen", dimensionspan="groupId, metricName, tag_con, tag_cty, tag_device, tag_os, tag_bf"))
@PublishOn(topics="Pulsar.FirstMC/firstMcCounter")
select * from MC_geodevicegroupMetric;

The above EPLs do aggregations over 6 dimensions(_cn, _con, _cty, _dd_d, _dd_os, _dd_bf). The first level aggregation is doing in 10 seconds window as defined in MCContext. The second level aggregation is doing in 10 minutes window as defined in EPL itself: '10 as frequencyInMin'. The result are output to OutboundMessageChannel and flow to Jetstream topic "Pulsar.FirstMC/firstMcCounter" again. Affinity key is the combination of all the dimensions we do aggregations over.

The second level aggregation EPL is blow:

@OutputTo("SummingProcessor")
select * from MC_geodevicegroupMetric;

And finally summing processor will reduce the result in 10 minutes window and store result to Cassandra.

The Cassandra metric table for this use case is below:

CREATE TABLE pulsar.mc_geodevicegroupmetric (
	metricname text,
	groupid text,
	metrictime timestamp,
	tag_con text,
	tag_cty text,
	tag_device text,
	tag_os text,
	tag_bf text,
	"value" int,
	PRIMARY KEY ((metricname, groupid), metrictime, tag_con,tag_cty,tag_device,tag_os,tag_bf)
) WITH CLUSTERING ORDER BY (metrictime ASC,tag_con ASC,tag_cty ASC,tag_device ASC,tag_os ASC,tag_bf ASC) AND
bloom_filter_fp_chance=0.010000 AND
caching='KEYS_ONLY' AND
comment='' AND
dclocal_read_repair_chance=0.000000 AND
gc_grace_seconds=864000 AND
read_repair_chance=0.100000 AND
replicate_on_write='true' AND
populate_io_cache_on_flush='false' AND
compaction={'class': 'SizeTieredCompactionStrategy'} AND
compression={'sstable_compression': 'SnappyCompressor'};

The column names map to the elements in the EPL. The row key of this table is 'metricname' + 'groupid', column key is metrictime + other dimensions(tags). We know in Cassandra, row key is the sharding key, how we choose the row key is based on the data. In this example, we define _cn(country) as groupid, but we can also choose other dimensions as row key. We can also just use 'metricname' as the row key and store all the time serials data in a long Cassandra row. But it will lead to a hot spot to the Cassandra cluster.
Choosing a suitable row key and let the metrics sharded evenly in the Cassandra cluster and make it can serve the query requirements easily. The definition of the column key is also decided by the query pattern.

There are advanced configurations to the summing processor to tune the performance, these configurations are optional, whether to enable them or not depends on the data size and cardinality of the aggregation result. The 'metricsThreshold' is similar to the 'having' clause, it can restrict the data size to Cassandra and only when the reducer metrics result value > predefined threshold, metrics will be inserted to Cassandra.

'offheapMetricConf' enables offheap to store the metrics in summing processor. Summing processor internally need to consume memory to store and aggregate on the metrics. If the cardinality of the metrics is very high, for example over 1M, and you want to do aggregations within a long time window, it's recommended to enable offheap configuration.

<bean id="SummingProcessorConfig"
    class="com.ebay.pulsar.metriccalculator.processor.configuration.MCSummingConfiguration">
    <property name="metricsThreshold">
        <map>
            <entry key="pageviewspergeoanddevice" value="1" />
        </map>
    </property>
    <property name="offheapMetricConf">
        <map>
            <entry key="pageviewspergeoanddevice">
                <!-- User default configuration -->
                <bean id="offHeapCacheConfigForPVGEODevice"
                    class="com.ebay.pulsar.metriccalculator.cache.OffHeapCacheConfig"></bean>
            </entry>
        </map>
    </property>
</bean>