Zohar Arad. © 2015
At first glance, using ES for analytics seems a bit odd.
However, ES has a very compelling feature, one that already powers its sister product Kibana.
It's called Aggregations.
{"vid":"a443cdd","ts":1422557501013,"action":"click","_type":"interaction"}
{"vid":"b423vdd","ts":1422567508017,"action":"search","_type":"interaction"}
{"vid":"ts63crw","ts":1422577503019,"action":"comment","_type":"interaction"}
We're going to store that data in the events index (similar to a DB), under the type interaction (similar to a table; its schema is called a Mapping).
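A minimal sketch of how that could be set up against a local ES 1.x node (the field types, and the assumption that ts is epoch milliseconds, are mine, not from the slides):

curl -XPUT "localhost:9200/events" -d '{
  "mappings": {
    "interaction": {
      "properties": {
        "vid":    {"type": "string", "index": "not_analyzed"},
        "ts":     {"type": "date"},
        "action": {"type": "string", "index": "not_analyzed"}
      }
    }
  }
}'

curl -XPOST "localhost:9200/events/interaction" -d '{
  "vid": "a443cdd", "ts": 1422557501013, "action": "click"
}'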
"query":{
"filtered":{
"query": {"term":{"action":"click"}},
"filter":{
"range":{
"ts":{ "gte": 1422557501013, "lte": 1422567501013 }
}
}
}
}
...
},
"aggs":{
"num_of_clicks":{
"value_count": {"field": "action"}
}
}
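Putting the query and aggregation fragments together, the whole request could look roughly like this (a sketch, assuming the events index and interaction type from above, and the ES 1.x filtered-query syntax):

curl -XPOST "localhost:9200/events/interaction/_search" -d '{
  "size": 0,
  "query": {
    "filtered": {
      "query":  {"term": {"action": "click"}},
      "filter": {
        "range": {"ts": {"gte": 1422557501013, "lte": 1422567501013}}
      }
    }
  },
  "aggs": {
    "num_of_clicks": {"value_count": {"field": "action"}}
  }
}'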
"query":{
"filtered":{
"query": {"match_all":{}},
"filter":{
"range":{ "ts":{ "gte": 1422557501013, "lte": 1422567501013 } }
}}},
"aggs":{
"num_of_clicks":{"value_count": {"field": "action"}},
"num_of_searches":{"value_count": {"field": "search"}}
}
...
},
"aggs":{
"visitor":{
"terms": {"field": "vid"},
"aggs": {
"num_of_clicks":{"value_count": {"field": "action"}},
"num_of_searches":{"value_count": {"field": "search"}}
}
}
}
Mastering ES aggregations is not too complicated, once you get used to the query API and docs.
Remember our interaction document? Up until now, we naively counted the number of documents for our aggregation results.
Imagine we have 500M documents in our poor little index, with 100K added every hour.
Suddenly, our hourly aggregation needs to go over 100K documents across the N shards we assigned to the index. Instead of storing one document per raw event, we can pre-aggregate and keep one counter document per visitor per time bucket:
{"vid":"a443cdd","ts":"2015-01-23T06:00:00.000Z,
"clicks":1.0,"searches":3.0,"_type":"interaction"}
{"vid":"b423vdd","ts":"2015-01-23T06:00:00.000Z,
"clicks":1.0,"searches":2.0,"_type":"interaction"}
{"vid":"ts63crw","ts":"2015-01-23T06:00:00.000Z,
"clicks":0.0,"searches":2.0,"_type":"interaction"}
// bulk API - row header
{"update":{"_index":"events-2015-01-23","_type":"interaction",
"_id":"fd35b9758e1880cce370b2d55c471517"}}
// bulk API - row update (upsert the counter document if it doesn't exist)
{"doc":{"ts":"2015-01-23T06:00:00.000Z","vid":"ts63crw"},
"doc_as_upsert" : true}
{"update":{"_index":"events-2015-01-23","_type":"interaction",
"_id":"fd35b9758e1880cce370b2d55c471517"}}
{"script":"if (!ctx._source.containsKey(\"clicks\")){
ctx._source.clicks = 0.0}"
}
{"update":{"_index":"events-2015-01-23","_type":"interaction",
"_id":"fd35b9758e1880cce370b2d55c471517"}}
{"script":"ctx._source.clicks += count","params":{"count":1.00},
"upsert":{"clicks":1}}
...
},
"aggs":{
"visitor":{
"terms": {"field": "vid"},
"aggs": {
"num_of_clicks":{"sum": {"field": "action"}},
"num_of_searches":{"sum": {"field": "search"}}
}
}
}
By using counters, we reduce the number of documents in our index, and our aggregations become more efficient.
But... we can improve even further and create daily indices with aliasing, so each day's events are kept in a separate index.
In fact, this is how Logstash feeds data to Kibana in the ELK stack.
curl -XPUT "localhost:9200/_template/events" -d '{
"template": "events-*",
"order": 1,
"settings": {
"number_of_shards": 1, "number_of_replicas": 1
},
"aliases": {
"interactions": {
"filter" : { "type" : { "value" : "interaction" } }
}
}
}'
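With the template in place, each newly created events-* index automatically gets the interactions alias (with its type filter), so the counter aggregation can be run across days without naming indices. A sketch of such a query against the alias, assuming the counter fields from above:

curl -XPOST "localhost:9200/interactions/_search" -d '{
  "size": 0,
  "query": {
    "filtered": {
      "query":  {"match_all": {}},
      "filter": {
        "range": {"ts": {"gte": "2015-01-23T06:00:00.000Z", "lte": "2015-01-23T07:00:00.000Z"}}
      }
    }
  },
  "aggs": {
    "visitor": {
      "terms": {"field": "vid"},
      "aggs": {
        "num_of_clicks":   {"sum": {"field": "clicks"}},
        "num_of_searches": {"sum": {"field": "searches"}}
      }
    }
  }
}'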