Jul 14

Flume sink plugin for Elasticsearch 5.x

Cognitree has open sourced a Flume sink plugin for Elasticsearch 5.4. The sink plugin is compatible with Flume version 1.7. To avoid versioning hell with dependencies, we highly recommend using this plugin with Cognitree’s fork of Flume.

Motivation

We were looking to analyze large amounts of streaming data received from a variety of sources. A data pipeline was designed to ingest the data into Kafka, index it into Elasticsearch and then visualize it using Kibana. The data arrived in three different formats: JSON, CSV and Avro (structured). There was no need for any transformations on the data, so we chose Flume 1.7 as the tool to make this transfer.

Indexing into Elasticsearch requires the data to be in JSON format, and the Elasticsearch sink plugin available out of the box was outdated. There were also too many version conflicts between the dependencies of Flume and the latest Elasticsearch Java client. Hence the need to write a custom Flume sink plugin.

The Sink Plugin

The Flume sink plugin uses the Elasticsearch BulkProcessor to achieve high throughput and to avoid overwhelming Elasticsearch with too many concurrent requests. The bulk processor can be configured as:

agent.sinks.es_sink.es.bulkActions = 1000

agent.sinks.es_sink.es.concurrent.request = 1

agent.sinks.es_sink.es.flush.interval.time = 10s

agent.sinks.es_sink.es.bulkSize = 5

agent.sinks.es_sink.es.bulkSize.unit = MB
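For context, these bulk-processor properties sit alongside the standard Flume agent wiring. A minimal configuration might look like the sketch below; the sink class name is the one from the plugin's package, while the source and channel wiring (and any connection properties such as the Elasticsearch host) are placeholders to adapt to your setup:

agent.sources = my_source
agent.channels = mem_channel
agent.sinks = es_sink

# hypothetical source/channel wiring; replace with your own
agent.sources.my_source.channels = mem_channel
agent.channels.mem_channel.type = memory
agent.sinks.es_sink.channel = mem_channel

# the sink class provided by this plugin
agent.sinks.es_sink.type = com.cognitree.flume.sink.elasticsearch.ElasticSearchSink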

Serializers

Since different data sources produced records in different formats, we wanted a “Serializer” that could be plugged in at runtime to convert the records into JSON documents. The plugin provides the following serializers out of the box:

JSON records

The default serializer assumes that the incoming records are in JSON format. There is no need to configure anything.

CSV records

To convert CSV records into JSON, we provide an out-of-the-box CsvSerializer. The conversion of each field in the record to JSON can be configured using the property “es.serializer.csv.fields” in the form {field1}:{type},{field2}:{type}, where type is one of boolean, string, int or float.

agent.sinks.es_sink.es.serializer = com.cognitree.flume.sink.elasticsearch.CsvSerializer

agent.sinks.es_sink.es.serializer.csv.fields = name:string,age:int,eligible:boolean,salary:float
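To illustrate what this field mapping does, here is a minimal, self-contained sketch of the conversion (the class and method names are ours, not the plugin’s): each CSV column is paired with its declared type, string values are quoted, and boolean, int and float values are emitted as raw JSON literals.

```java
public class CsvToJson {

    // Convert one CSV record into a JSON document, guided by a field
    // configuration of the form "name:string,age:int,..."
    static String toJson(String fieldsConfig, String csvRecord) {
        String[] fields = fieldsConfig.split(",");
        String[] values = csvRecord.split(",");
        StringBuilder sb = new StringBuilder("{");
        for (int i = 0; i < fields.length; i++) {
            String[] nameType = fields[i].split(":");
            String name = nameType[0];
            String type = nameType[1];
            sb.append('"').append(name).append("\":");
            // string values are quoted; boolean, int and float are raw literals
            if (type.equals("string")) {
                sb.append('"').append(values[i]).append('"');
            } else {
                sb.append(values[i]);
            }
            if (i < fields.length - 1) sb.append(',');
        }
        return sb.append('}').toString();
    }

    public static void main(String[] args) {
        System.out.println(toJson(
                "name:string,age:int,eligible:boolean,salary:float",
                "john,30,true,1000.5"));
        // {"name":"john","age":30,"eligible":true,"salary":1000.5}
    }
}
```

With the configuration above, a record such as `john,30,true,1000.5` is indexed as a JSON document with typed fields rather than four strings.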

Avro records

For use cases with well-structured records in Avro format, we provide an AvroSerializer. The serializer uses the Avro schema to decode the records and convert them into JSON.

agent.sinks.es_sink.es.serializer = com.cognitree.flume.sink.elasticsearch.AvroSerializer

agent.sinks.es_sink.es.serializer.avro.schema.file = /home/user/my_avro.schema
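As an illustration, the schema file referenced above might contain a record definition like the following (a hypothetical schema mirroring the fields from the CSV example; your own schema will differ):

{
  "type": "record",
  "name": "Employee",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "age", "type": "int"},
    {"name": "eligible", "type": "boolean"},
    {"name": "salary", "type": "float"}
  ]
}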

Downloads

The binaries can be downloaded from our GitHub repository.

2 Comments

  1. nuaabuaa07
    March 30, 2018 at 8:29 am · Reply

I used your jars and started my Flume agent but got the error log below. Is there any configuration I missed?
    30 Mar 2018 01:33:13,028 INFO [conf-file-poller-0] (org.apache.flume.node.Application.startAllComponents:183) – Starting Source r1
    30 Mar 2018 01:33:13,028 INFO [lifecycleSupervisor-1-0] (org.apache.flume.source.ExecSource.start:168) – Exec source starting with command:tail -F /home/jinzhiliang/testdata/syslog.log
    30 Mar 2018 01:33:13,032 INFO [lifecycleSupervisor-1-0] (org.apache.flume.instrumentation.MonitoredCounterGroup.register:119) – Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
    30 Mar 2018 01:33:13,032 INFO [lifecycleSupervisor-1-0] (org.apache.flume.instrumentation.MonitoredCounterGroup.start:95) – Component type: SOURCE, name: r1 started
    30 Mar 2018 01:33:16,123 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (com.cognitree.flume.sink.elasticsearch.ElasticSearchSink.process:115) – transaction rolled back.
    java.lang.NullPointerException
    at com.cognitree.flume.sink.elasticsearch.ElasticSearchSink.process(ElasticSearchSink.java:88)
    at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
    at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
    at java.lang.Thread.run(Thread.java:748)
    30 Mar 2018 01:33:17,146 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (com.cognitree.flume.sink.elasticsearch.ElasticSearchSink.process:115) – transaction rolled back.
    java.lang.NullPointerException
    at com.cognitree.flume.sink.elasticsearch.ElasticSearchSink.process(ElasticSearchSink.java:88)
    at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
    at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
    at java.lang.Thread.run(Thread.java:748)
    30 Mar 2018 01:33:19,147 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (com.cognitree.flume.sink.elasticsearch.ElasticSearchSink.process:115) – transaction rolled back.
    java.lang.NullPointerException
    at com.cognitree.flume.sink.elasticsearch.ElasticSearchSink.process(ElasticSearchSink.java:88)
    at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
    at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
    at java.lang.Thread.run(Thread.java:748)

    • Cognitree Technologies
      March 30, 2018 at 10:32 am · Reply

      Hi nuaabuaa07,

      By default we use

      com.cognitree.flume.sink.elasticsearch.StaticIndexBuilder

      as the index builder.
      It appears that you have configured an alternate index builder which the plugin is not able to instantiate.

      Check the following:
      1. The value of “es.index.builder” is a fully qualified class name.
      2. The index builder is on the classpath (copy it to the plugin’s lib directory).
      3. There are no errors/exceptions when initialising an instance of the specified index builder.
