Real-Time Analytics with Elasticsearch

Zohar Arad. © 2015

About Me

Elasticsearch (ES) - Quick Recap

Analytics with a Search Engine?

At first glance, using ES for analytics seems a bit odd.

However, ES has a very compelling feature, one already used by its sister product, Kibana.

It's called Aggregations.

Let's assume we have some data


{"vid":"a443cdd","ts":1422557501013,"action":"click","_type":"interaction"}
{"vid":"b423vdd","ts":1422567508017,"action":"search","_type":"interaction"}
{"vid":"ts63crw","ts":1422577503019,"action":"comment","_type":"interaction"}

We're going to store this data in the events index (similar to a database), under the type interaction (similar to a table, with a schema called a mapping).

Some interesting queries we might have

The ES Aggregation Approach

  1. Query an index and a document type
  2. Filter results to match your conditions
  3. Aggregate the result set into buckets
  4. Rinse and repeat - sub-aggregations are supported
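To make the four steps concrete, here is a toy, in-memory Python model of what an aggregation computes: filter by a time window, bucket by a field, then optionally sub-bucket. It is only a conceptual sketch, not how ES executes aggregations internally (ES runs them per shard and merges the results); the function name aggregate and its parameters are hypothetical.

```python
from collections import Counter, defaultdict

# The sample "interaction" documents from above (without the _type field).
EVENTS = [
    {"vid": "a443cdd", "ts": 1422557501013, "action": "click"},
    {"vid": "b423vdd", "ts": 1422567508017, "action": "search"},
    {"vid": "ts63crw", "ts": 1422577503019, "action": "comment"},
]


def aggregate(events, start_ms, end_ms, bucket_field, sub_bucket_field=None):
    """Toy model of filter -> bucket -> (optional) sub-bucket counting."""
    # Step 2: keep only documents whose ts falls inside [start_ms, end_ms].
    matching = [e for e in events if start_ms <= e["ts"] <= end_ms]
    if sub_bucket_field is None:
        # Step 3: one bucket per distinct value, holding a document count.
        return dict(Counter(e[bucket_field] for e in matching))
    # Step 4: nested buckets, like a terms aggregation with a sub-aggregation.
    nested = defaultdict(Counter)
    for e in matching:
        nested[e[bucket_field]][e[sub_bucket_field]] += 1
    return {key: dict(counts) for key, counts in nested.items()}


if __name__ == "__main__":
    # A window covering only the first two events.
    print(aggregate(EVENTS, 1422557501013, 1422567508017, "action"))
    # No time restriction, grouped by visitor and then by action.
    print(aggregate(EVENTS, 0, 2**63 - 1, "vid", "action"))
```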

How many clicks in the past hour?

"query":{
  "filtered":{
    "query": {"term":{"action":"click"}},
    "filter":{
      "range":{
        "ts":{ "gte": 1422557501013, "lte": 1422567501013 }
      }
    }
  }
}

How many clicks in the past hour?

...
},
"aggs":{
  "num_of_clicks":{
    "value_count": {"field": "action"}
  }
}
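The range bounds are plain epoch milliseconds, and one hour is 60 × 60 × 1000 = 3,600,000 ms. Below is a small Python sketch that builds the full request body from the current time, using the same filtered query and value_count aggregation as the slides; the helper name clicks_in_past_hour_body is hypothetical.

```python
import json
import time

ONE_HOUR_MS = 60 * 60 * 1000  # 3,600,000 milliseconds


def clicks_in_past_hour_body(now_ms=None):
    """Build the search body for 'how many clicks in the past hour?'."""
    if now_ms is None:
        now_ms = int(time.time() * 1000)  # current time in epoch millis
    return {
        "query": {
            "filtered": {
                "query": {"term": {"action": "click"}},
                "filter": {
                    "range": {"ts": {"gte": now_ms - ONE_HOUR_MS, "lte": now_ms}}
                },
            }
        },
        "aggs": {"num_of_clicks": {"value_count": {"field": "action"}}},
    }


if __name__ == "__main__":
    # Serialize to JSON, ready to send as the body of a _search request.
    print(json.dumps(clicks_in_past_hour_body(1422561101013), indent=2))
```

Note that the filtered query belongs to the ES 1.x era; later versions replaced it with the filter clause of a bool query.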

How many clicks and searches in the past hour?

"query":{
  "filtered":{
    "query": {"match_all":{}},
    "filter":{
      "range":{ "ts":{ "gte": 1422557501013, "lte": 1422561101013 } }
    }}},
"aggs":{
  "num_of_clicks":{"filter": {"term": {"action": "click"}}},
  "num_of_searches":{"filter": {"term": {"action": "search"}}}
}

Or, we can group by visitor ID

...
},
"aggs":{
  "visitor":{
    "terms": {"field": "vid"},
    "aggs": {
      "num_of_clicks":{"value_count": {"field": "action"}},
      "num_of_searches":{"value_count": {"field": "search"}}
    }
  }
}

Some other aggregation types

Oh Goodie!
I want to start using ES

Not so fast, Sparky

Aggregations are only half of the story

Mastering ES aggregations isn't too complicated once you get used to the query API and the docs.

Remember our interaction documents? Until now, we naively stored one document per event and counted documents to get our aggregation results.

What happens when we grow?

Imagine we have 500M documents in our poor little index, with 100K added every hour.

Suddenly, every hourly aggregation has to process 100K documents spread across the N shards we assigned to the index.

Designing for growth

{"vid":"a443cdd","ts":"2015-01-23T06:00:00.000Z",
  "clicks":1.0,"searches":3.0,"_type":"interaction"}

{"vid":"b423vdd","ts":"2015-01-23T06:00:00.000Z",
  "clicks":1.0,"searches":2.0,"_type":"interaction"}

{"vid":"ts63crw","ts":"2015-01-23T06:00:00.000Z",
  "clicks":0.0,"searches":2.0,"_type":"interaction"}

What just happened?
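Instead of storing one document per event, we now store one document per visitor per hour, holding counters. Here is a minimal Python sketch of that rollup. It assumes epoch-millisecond ts values on the raw events and UTC hour buckets, and it only counts clicks and searches (as in the documents above). The names hour_bucket, rollup and COUNTER_FIELDS are hypothetical.

```python
from collections import defaultdict
from datetime import datetime, timezone

# Maps an action name to the counter field it increments (assumed naming).
COUNTER_FIELDS = {"click": "clicks", "search": "searches"}


def hour_bucket(ts_ms):
    """Truncate an epoch-millisecond timestamp to the start of its UTC hour."""
    dt = datetime.fromtimestamp(ts_ms // 1000, tz=timezone.utc)
    return dt.strftime("%Y-%m-%dT%H:00:00.000Z")


def rollup(events):
    """Collapse raw events into one counter document per (visitor, hour)."""
    docs = defaultdict(lambda: {"clicks": 0.0, "searches": 0.0})
    for e in events:
        field = COUNTER_FIELDS.get(e["action"])
        if field is None:
            continue  # actions without a counter (e.g. "comment") are skipped
        docs[(e["vid"], hour_bucket(e["ts"]))][field] += 1.0
    return [
        {"vid": vid, "ts": ts, **counters}
        for (vid, ts), counters in sorted(docs.items())
    ]


if __name__ == "__main__":
    raw = [
        {"vid": "a443cdd", "ts": 1422557501013, "action": "click"},
        {"vid": "a443cdd", "ts": 1422557600000, "action": "search"},
        {"vid": "a443cdd", "ts": 1422557700000, "action": "search"},
    ]
    print(rollup(raw))  # three events collapse into one document
```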

Let's see how we can upsert data into our index using ES's bulk API.

Upsert document with timestamp and ID

// bulk API - row header (every bulk line must be a single line of JSON)
{"update":{"_index":"events-2015-01-23","_type":"interaction","_id":"fd35b9758e1880cce370b2d55c471517"}}

// bulk API - row update
{"doc":{"ts":"2015-01-23T06:00:00.000Z","vid":"ts63crw"},"doc_as_upsert":true}
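The slides don't say how the _id is derived; it looks like a 32-character hex digest such as MD5. One plausible scheme, shown here purely as an assumption, hashes the visitor ID together with the UTC hour, so every event for the same visitor in the same hour lands on the same document, while the daily index name comes from the event's UTC date. The function names and the "vid|hour" key format are hypothetical.

```python
import hashlib
from datetime import datetime, timezone


def index_name(ts_ms, prefix="events"):
    """Daily index name, e.g. events-2015-01-23, from an epoch-ms timestamp (UTC)."""
    day = datetime.fromtimestamp(ts_ms // 1000, tz=timezone.utc)
    return f"{prefix}-{day:%Y-%m-%d}"


def doc_id(vid, ts_ms):
    """Hypothetical deterministic _id: MD5 hex digest of 'vid|<UTC hour>'."""
    hour = datetime.fromtimestamp(ts_ms // 1000, tz=timezone.utc)
    key = f"{vid}|{hour:%Y-%m-%dT%H}"
    return hashlib.md5(key.encode("utf-8")).hexdigest()


if __name__ == "__main__":
    print(index_name(1422557501013), doc_id("a443cdd", 1422557501013))
```

Because the ID is deterministic, re-sending an upsert for the same visitor and hour updates the existing document instead of creating a new one.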

Add "clicks" counter

{"update":{"_index":"events-2015-01-23","_type":"interaction","_id":"fd35b9758e1880cce370b2d55c471517"}}

{"script":"if (!ctx._source.containsKey(\"clicks\")) { ctx._source.clicks = 0.0 }"}

Increment "clicks" counter

{"update":{"_index":"events-2015-01-23","_type":"interaction","_id":"fd35b9758e1880cce370b2d55c471517"}}

{"script":"ctx._source.clicks += count","params":{"count":1.0},"upsert":{"clicks":1.0}}

Let's take it slowly
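Taking it one step at a time: for each incoming event we send the three bulk operations shown above. This Python sketch renders them as newline-delimited JSON, one action line followed by one source line per operation, ending with the trailing newline the bulk API expects. The helper name counter_bulk_lines is hypothetical, and the script strings are the inline scripts from the slides, which assume a cluster that permits dynamic scripting.

```python
import json


def counter_bulk_lines(index, doc_id, vid, hour_ts, counter, count=1.0):
    """Render the three bulk operations from the slides as NDJSON."""
    header = {"update": {"_index": index, "_type": "interaction", "_id": doc_id}}
    ops = [
        # 1. Upsert the document with its timestamp and visitor ID.
        {"doc": {"ts": hour_ts, "vid": vid}, "doc_as_upsert": True},
        # 2. Make sure the counter field exists before incrementing it.
        {"script": f'if (!ctx._source.containsKey("{counter}")) '
                   f"{{ ctx._source.{counter} = 0.0 }}"},
        # 3. Increment the counter by `count` (or create it if the doc is new).
        {"script": f"ctx._source.{counter} += count",
         "params": {"count": count},
         "upsert": {counter: count}},
    ]
    lines = []
    for op in ops:
        lines.append(json.dumps(header))  # action line
        lines.append(json.dumps(op))      # source line
    return "\n".join(lines) + "\n"  # the bulk body must end with a newline


if __name__ == "__main__":
    print(counter_bulk_lines("events-2015-01-23", "fd35b9758e1880cce370b2d55c471517",
                             "ts63crw", "2015-01-23T06:00:00.000Z", "clicks"))
```

The resulting string is the body you would POST to the _bulk endpoint.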

Summing it up

...
},
"aggs":{
  "visitor":{
    "terms": {"field": "vid"},
      "aggs": {
      "num_of_clicks":{"sum": {"field": "action"}},
      "num_of_searches":{"sum": {"field": "search"}}
    }
  }
}
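With counter documents, a sum over the counter fields replaces a value_count over raw events, so each visitor bucket reads one document per hour instead of one per event. This in-memory Python sketch mimics a terms aggregation on vid with two sum sub-aggregations, using the hourly documents from the "Designing for growth" slide; the function name sum_by_visitor is hypothetical.

```python
from collections import defaultdict

# The hourly counter documents from the "Designing for growth" slide.
ROLLUP_DOCS = [
    {"vid": "a443cdd", "ts": "2015-01-23T06:00:00.000Z", "clicks": 1.0, "searches": 3.0},
    {"vid": "b423vdd", "ts": "2015-01-23T06:00:00.000Z", "clicks": 1.0, "searches": 2.0},
    {"vid": "ts63crw", "ts": "2015-01-23T06:00:00.000Z", "clicks": 0.0, "searches": 2.0},
]


def sum_by_visitor(docs):
    """Mimic a terms aggregation on vid with sum sub-aggregations."""
    out = defaultdict(lambda: {"num_of_clicks": 0.0, "num_of_searches": 0.0})
    for d in docs:
        bucket = out[d["vid"]]
        # Missing counters count as zero, like a document without the field.
        bucket["num_of_clicks"] += d.get("clicks", 0.0)
        bucket["num_of_searches"] += d.get("searches", 0.0)
    return dict(out)


if __name__ == "__main__":
    for vid, sums in sum_by_visitor(ROLLUP_DOCS).items():
        print(vid, sums)
```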

Index Management

By using counters, we've reduced the number of documents in our index, which makes our aggregations more efficient.

But we can improve even further by creating a daily index and using an alias, so each day's events are kept in a separate index.

In fact, this is how Logstash feeds data to Kibana in the ELK stack.

Daily Indices

curl -XPUT "localhost:9200/_template/events" -d '{
  "template": "events-*",
  "order": 1,
  "settings": {
    "number_of_shards": 1, "number_of_replicas": 1
  },
  "aliases": {
    "interactions": {
      "filter" : { "type" : { "value" : "interaction" } }
    }
  }
}'
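With one index per day, the interactions alias covers every daily index, but a query over a known time window only needs the indices for the days it touches (ES accepts a comma-separated list of index names in the request path). This Python sketch lists the daily index names for a UTC window; the helper name daily_indices is hypothetical, and the events-YYYY-MM-DD pattern follows the slides.

```python
from datetime import datetime, timedelta, timezone


def daily_indices(start_ms, end_ms, prefix="events"):
    """Names of the daily indices covering [start_ms, end_ms] (UTC days)."""
    day = datetime.fromtimestamp(start_ms // 1000, tz=timezone.utc).date()
    last = datetime.fromtimestamp(end_ms // 1000, tz=timezone.utc).date()
    names = []
    while day <= last:
        names.append(f"{prefix}-{day:%Y-%m-%d}")
        day += timedelta(days=1)
    return names


if __name__ == "__main__":
    # Build a search path that targets only the relevant daily indices.
    print(",".join(daily_indices(1422557501013, 1422577503019)) + "/_search")
```

Every name produced this way matches the "events-*" pattern, so each new daily index picks up the template's settings and alias.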

Wrapping Things Up

Thank You!
Questions?