In this blog post, we’d like to outline how we defined policies for time series data management in Elasticsearch.
Background
As part of an IoT security solution built for a startup client, we have a typical real-time data processing pipeline: data from sensors is received into Kafka topics and consumed by a Spark streaming application, which performs a chain of ETL operations before persisting the events into Elasticsearch.
The solution is multi-tenant, and the data is segregated by tenant both in motion and at rest. Each tenant can stream millions of events per day, resulting in gigabytes of indexed data. The data is then replicated into HDFS on a daily basis (more on that in subsequent posts) for machine learning and batch processing workloads.
In this post, we will limit our focus to data management in Elasticsearch and not delve into other operational aspects like security, backup and restore, etc.
We will look at three data management aspects:
- Index Creation
- Index Mapping
- Index Management
Index
Elasticsearch is used as a time series store of all the sensor events. Instead of one large monolithic index, we use time-based indices named in the data_{tenant id}_{yyyyMMdd} format. The advantage of this approach is that deleting older data that is no longer relevant is as simple as dropping an index. It also lets us restrict searches to certain time ranges using index patterns (as in multi-index queries), which improves overall query performance and reduces load.
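As a rough sketch of the naming scheme (the tenant id and timestamp below are made up), the index name for an event can be derived like this:

```python
from datetime import datetime, timezone

def index_name(tenant_id: str, event_time: datetime) -> str:
    """Build the per-tenant daily index name in data_{tenant id}_{yyyyMMdd} format."""
    return "data_{}_{}".format(tenant_id, event_time.strftime("%Y%m%d"))

# Example: an event from a hypothetical tenant "t42" on 2017-03-15 (UTC)
print(index_name("t42", datetime(2017, 3, 15, 10, 30, tzinfo=timezone.utc)))
# data_t42_20170315
```

A multi-index query scoped to March 2017 for that tenant can then simply target the pattern data_t42_201703*.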
Index Creation
The time-based indices are created automatically by Elasticsearch (you can read more about automatic index creation here). We whitelist the index pattern so that auto-creation is restricted to indices matching it:
[code language="css"]
action.auto_create_index: data_*
[/code]
The Spark streaming application would continuously add the incoming documents into Elasticsearch after assigning an index name to each of them. If the index did not exist, it would get automatically created by Elasticsearch.
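For illustration outside of Spark, the per-document index assignment amounts to tagging each event with its target index before a bulk write. The action dict below is in the shape accepted by the Python client's elasticsearch.helpers.bulk; the event field names (tenantId, ts) are hypothetical:

```python
from datetime import datetime, timezone

def to_bulk_action(event: dict) -> dict:
    # Derive the daily, per-tenant index name; Elasticsearch auto-creates the
    # index on first write, since the name matches the whitelisted data_* pattern.
    day = datetime.fromtimestamp(event["ts"], tz=timezone.utc).strftime("%Y%m%d")
    return {
        "_index": "data_{}_{}".format(event["tenantId"], day),
        "_source": event,
    }

# 1489536000 is 2017-03-15T00:00:00Z
event = {"tenantId": "t42", "ts": 1489536000, "reading": 21.7}
action = to_bulk_action(event)
```

A stream of such actions would then be handed to the bulk helper in batches.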
Index Mapping
We wanted to control the field mappings but also leverage the dynamic mapping functionality. In particular, we wanted to disable textual analysis on string fields, which is enabled by default. Disabling it yields a significant performance gain in both indexing and search latencies, since none of our requirements needed full-text analysis. We also wanted to pre-configure mappings for some standard fields like timestamps and numbers.
We used index templates to automatically configure the indices with settings and mappings as they get created. Index templates let us set up the indices without any external orchestration.
Here’s a sample of the template that we used across all the time-based indices:
[code language="css"]
{
  "order": 5,
  "template": "data_*",
  "settings": {
    "index": {
      "number_of_shards": "5",
      "number_of_replicas": "1"
    }
  },
  "mappings": {
    "_default_": {
      "dynamic_templates": [
        {
          "strings": {
            "match_mapping_type": "string",
            "mapping": {
              "type": "keyword"
            }
          }
        }
      ]
    }
  },
  "aliases": {}
}
[/code]
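Installing a template is a single PUT to the _template endpoint. Here is a minimal sketch using only the Python standard library; the template name data_defaults, the host, and the abbreviated body are assumptions, and the request is built but not sent:

```python
import json
import urllib.request

def put_template_request(name: str, body: dict, host: str = "http://localhost:9200"):
    """Build (but do not send) a PUT /_template/{name} request."""
    return urllib.request.Request(
        url="{}/_template/{}".format(host, name),
        data=json.dumps(body).encode("utf-8"),
        method="PUT",
        headers={"Content-Type": "application/json"},
    )

# Abbreviated template body for illustration
template = {"order": 5, "template": "data_*",
            "settings": {"index": {"number_of_shards": "5"}}}
req = put_template_request("data_defaults", template)
# To actually install it: urllib.request.urlopen(req)  (requires a reachable cluster)
```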
To pre-configure mappings for certain fields, we used another index template. Templates matching the same index pattern are merged, with higher `order` values taking precedence on conflicts. A sample is below:
[code language="css"]
{
  "order": 11,
  "template": "data_*",
  "settings": {},
  "mappings": {
    "_default_": {
      "properties": {
        "timestamp": {
          "type": "date",
          "format": "yyyy-MM-dd'T'HH:mm:ssZ"
        }
      }
    }
  },
  "aliases": {}
}
[/code]
Index Management
As mentioned earlier, Elasticsearch was an intermediate store targeted primarily for real-time data analysis use cases and we needed at most 3 months’ worth of data to be available in this store.
Instead of manually pruning old indices, we automated the purging process by using Kronos to schedule Curator tasks. Curator provides a configurable interface to various index management tasks without having to interact with the REST APIs directly, and Kronos makes it easy to schedule periodic tasks. Together, these tools let us automate the entire process.
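Kronos task definitions vary by deployment, but as a rough equivalent the same schedule can be expressed as a daily cron entry (the paths and schedule below are hypothetical; Curator reads cluster connection details from its config file):

```shell
# Run the delete action file once a day at 01:00 UTC
0 1 * * * curator --config /etc/curator/config.yml /etc/curator/delete_indices.yml
```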
To purge old indices we used the following Curator action configuration:
[code language="css"]
actions:
  1:
    action: delete_indices
    description: Delete indices older than 3 months
    options:
      ignore_empty_list: True
      disable_action: False
    filters:
    - filtertype: age
      source: creation_date
      direction: older
      unit: months
      unit_count: 3
[/code]
Since we are using time-based indices, there are no writes to older indices. We therefore optimize the data in these indices using forcemerge, which merges each shard down to fewer segments. The benefits of forcemerge can be seen in the conversation here.
Below is a Curator action file example that force merges indices:
[code language="css"]
actions:
  1:
    action: forcemerge
    description: >-
      forceMerge data_ prefixed indices older than 60 minutes (based on index
      creation_date) to 1 segment per shard
    options:
      max_num_segments: 1
      delay: 120
      timeout_override:
      continue_if_exception: False
      disable_action: False
    filters:
    - filtertype: pattern
      kind: prefix
      value: data_
      exclude:
    - filtertype: age
      source: creation_date
      direction: older
      unit: minutes
      unit_count: 60
      exclude:
[/code]
This concludes our write-up on managing Elasticsearch data to enable efficient storage and high-performance querying for our solution. In upcoming posts, we will talk more about the other operational aspects.
We would love to hear your feedback. Do drop us an email at eng@cognitree.com.