Jul 14

Flume sink plugin for Elasticsearch 5.x

Cognitree has open sourced a Flume sink plugin for Elasticsearch 5.4. The sink plugin is compatible with Flume version 1.7. To avoid dependency version conflicts, we highly recommend using this plugin with Cognitree’s fork of Flume.


We were looking to analyze large amounts of streaming data received from a variety of sources. We designed a data pipeline to ingest the data into Kafka, index it into Elasticsearch and then visualize it using Kibana. The data arrived in three different formats: JSON, CSV and Avro (structured). The data needed no transformation, so we considered Flume 1.7 as the tool to make this transfer.


Indexing into Elasticsearch requires the data to be in JSON format, and the Elasticsearch sink plugin available out of the box was outdated. There were also too many dependency version conflicts between Flume and the latest Elasticsearch Java client. Hence we needed to write a custom Flume sink plugin.

The Sink Plugin

The Flume sink plugin uses the Elasticsearch BulkProcessor to achieve high throughput and to avoid overwhelming Elasticsearch with too many concurrent requests. The bulk processor can be configured as:

agent.sinks.es_sink.es.bulkActions = 1000

agent.sinks.es_sink.es.concurrent.request = 1

agent.sinks.es_sink.es.flush.interval.time = 10s

agent.sinks.es_sink.es.bulkSize = 5

agent.sinks.es_sink.es.bulkSize.unit = MB
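
To make the count and size settings above more concrete, here is a minimal sketch of how a bulk buffer might decide when to flush. It is a simplified stand-in written for illustration, not Elasticsearch's BulkProcessor and not the plugin's code: the class name BulkBufferSketch and its methods are hypothetical, the time-based flush interval and concurrent requests are left out, and the "flush" only records the batch instead of sending it anywhere.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for illustration; NOT Elasticsearch's BulkProcessor.
// It shows only the two buffer-based flush triggers (action count and byte size).
final class BulkBufferSketch {
    private final int bulkActions;      // flush once this many documents are buffered
    private final long bulkSizeBytes;   // flush once the buffered bytes reach this size
    private final List<String> pending = new ArrayList<>();
    private final List<List<String>> flushed = new ArrayList<>();
    private long pendingBytes = 0;

    BulkBufferSketch(int bulkActions, long bulkSizeBytes) {
        this.bulkActions = bulkActions;
        this.bulkSizeBytes = bulkSizeBytes;
    }

    // Buffers one JSON document and flushes if either threshold is reached.
    void add(String jsonDocument) {
        pending.add(jsonDocument);
        pendingBytes += jsonDocument.getBytes(StandardCharsets.UTF_8).length;
        if (pending.size() >= bulkActions || pendingBytes >= bulkSizeBytes) {
            flush();
        }
    }

    // Hands the buffered documents over as one batch; a real sink would send
    // a single bulk request here instead of storing the batch in memory.
    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        flushed.add(new ArrayList<>(pending));
        pending.clear();
        pendingBytes = 0;
    }

    List<List<String>> flushedBatches() {
        return flushed;
    }

    int pendingCount() {
        return pending.size();
    }
}
```

With bulkActions = 1000 and a 5 MB size limit, as in the configuration above, a buffer like this would hand over a batch after 1000 documents or 5 MB of buffered data, whichever comes first. In the real bulk processor the flush interval additionally sends whatever is pending when the timer fires.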


Since different data sources produced records in different formats, we wanted a “Serializer” to be plugged in at runtime to convert the records into JSON documents. The plugin provides the following serializers out of the box:

JSON records

The default serializer assumes that the incoming records are in JSON format. There is no need to configure anything.

CSV records

To convert CSV records into JSON, we provide an out-of-the-box CsvSerializer. The conversion of each field in the record to JSON is configured using the property “es.serializer.csv.fields” in the form {field1}:{type},{field2}:{type}, where type is one of boolean, string, int or float.

agent.sinks.es_sink.es.serializer = com.cognitree.flume.sink.elasticsearch.CsvSerializer

agent.sinks.es_sink.es.serializer.csv.fields = name:string,age:int,eligible:boolean,salary:float
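
As a rough illustration of what such a field specification implies, the sketch below turns one CSV line into a JSON document using a spec in the same {field}:{type} form. It is not the CsvSerializer source: the class CsvToJsonSketch and its methods are hypothetical names, the CSV splitting is naive (no quoted fields or embedded commas), and the JSON is built by hand to keep the example free of dependencies.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative sketch only; NOT the plugin's CsvSerializer implementation.
final class CsvToJsonSketch {

    // Parses a spec such as "name:string,age:int" into an ordered field -> type map.
    static Map<String, String> parseFieldSpec(String spec) {
        Map<String, String> fields = new LinkedHashMap<>();
        for (String entry : spec.split(",")) {
            String[] parts = entry.trim().split(":");
            if (parts.length != 2) {
                throw new IllegalArgumentException("Bad field spec: " + entry);
            }
            fields.put(parts[0].trim(), parts[1].trim());
        }
        return fields;
    }

    // Converts one CSV line into a JSON object, typing each value per the spec.
    // Assumption: values contain no commas, so a plain split is sufficient.
    static String toJson(String csvLine, Map<String, String> fields) {
        String[] values = csvLine.split(",", -1);
        if (values.length != fields.size()) {
            throw new IllegalArgumentException("Expected " + fields.size()
                    + " values but got " + values.length);
        }
        StringBuilder json = new StringBuilder("{");
        int i = 0;
        for (Map.Entry<String, String> field : fields.entrySet()) {
            if (i > 0) {
                json.append(',');
            }
            json.append('"').append(escape(field.getKey())).append("\":");
            json.append(render(values[i].trim(), field.getValue()));
            i++;
        }
        return json.append('}').toString();
    }

    // Renders a raw CSV value as a JSON literal of the configured type.
    private static String render(String raw, String type) {
        switch (type) {
            case "boolean": return String.valueOf(Boolean.parseBoolean(raw));
            case "int":     return String.valueOf(Integer.parseInt(raw));
            case "float":   return String.valueOf(Float.parseFloat(raw));
            case "string":  return "\"" + escape(raw) + "\"";
            default: throw new IllegalArgumentException("Unsupported type: " + type);
        }
    }

    // Escapes backslashes and double quotes so the string stays valid JSON.
    private static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }
}
```

Calling toJson with a line such as Alice,30,true,1250.5 and the spec from the configuration above would yield a document with a string, an integer, a boolean and a floating-point field, in the order the fields are declared.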

Avro records

For use cases with well-structured records in Avro format, we provide an AvroSerializer. The serializer uses the Avro schema to decode the records and convert them into JSON.

agent.sinks.es_sink.es.serializer = com.cognitree.flume.sink.elasticsearch.AvroSerializer

agent.sinks.es_sink.es.serializer.avro.schema.file = /home/user/my_avro.schema


The binaries can be downloaded from our GitHub repository.
