Apart from the pre-built functions available for data analysis, Spark enables developers to write custom user defined functions that can be applied to a single row, a group of rows, or a window of rows to analyse data.
In this blog, we will explore in detail how we implemented a user defined aggregate function to solve a use case.
Background
At Cognitree, we were building a model to detect anomalies in communications between two IP endpoints by analysing patterns based on the payloads transferred.
The idea is to find the dispersion of ASCII characters in the payload for each transfer between a given pair of endpoints and then build a machine learning model to detect any deviations in this pattern.
Once the model is built, it is used to detect any anomalous payload transfers between the endpoints in real time.
Now in the training phase, for a given dataset, there are several such groups of IP endpoints. Each group has multiple rows, each representing a packet exchanged between the pair of endpoints.
Each row in the input dataset contains meta information about the endpoints, such as the IP and port of both the source and the destination. The row also contains the dispersion results as an array of 256 elements, where each element corresponds to the dispersion of one ASCII character (0-255).
We now need to calculate the mean and variance of the dispersion for each ASCII character within a given group, so that live transfers can be compared against these statistics in real time to detect an anomalous payload transfer.
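To make the layout concrete, the input might carry a schema along the following lines. The column names here are illustrative assumptions, not the exact names from our dataset.
[code language="scala"]
import org.apache.spark.sql.types._

// Hypothetical schema for the training dataset described above.
// Column names are assumptions for illustration.
val trainingSchema = StructType(Seq(
  StructField("src_ip", StringType),
  StructField("src_port", IntegerType),
  StructField("dst_ip", StringType),
  StructField("dst_port", IntegerType),
  // 256 dispersion values, one per ASCII character (0-255)
  StructField("dispersion", ArrayType(IntegerType))))
[/code]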
Example representation of a dataset with two IP source-destination endpoint groups.
[table id=1 /]
Now in this example, there are two groups of IP endpoints and each group has two rows of dispersion results.
IP endpoint group 1 – 10.10.1.2, 80, 20.4.2.42, 83743
IP endpoint group 2 – 11.21.15.3, 82320, 21.3.9.3, 443
User defined aggregate function
The problem is to find, for each ASCII character, the mean and variance across the rows of the same group and output them as an array of tuples.
Spark has in-built aggregate functions that can be applied on a group to calculate the mean and variance of a single data point, or even the sum of all the elements in an array, but not for each element at a given index across the rows of the same group.
After exploring options, we decided to write a user defined aggregate function in Spark. Spark provides a pluggable user defined aggregate function (UDAF) API to allow users to write a custom aggregate function which takes multiple rows of data and returns a single value.
A user defined aggregate function is applied on the result of a groupBy() clause. The function provides a mutable aggregate buffer to store data during the aggregation. Note that the buffer is isolated across groups, i.e. there is one buffer per group.
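As a quick orientation, here is a minimal sketch of how such a function might be wired in, assuming the column names from the schema above and a UDAF class named “MeanVarianceUDAF” (a full sketch of it appears later in this post).
[code language="scala"]
import org.apache.spark.sql.functions.col

// Hypothetical usage: df is the training DataFrame described above.
// MeanVarianceUDAF is our custom UDAF, sketched at the end of this post.
val meanVariance = new MeanVarianceUDAF()

val perGroupStats = df
  .groupBy("src_ip", "src_port", "dst_ip", "dst_port") // one buffer per group
  .agg(meanVariance(col("dispersion")).as("mean_variance"))
[/code]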
Solution
Let’s explore the user defined aggregate function (UDAF) API implementation to solve the above use-case.
Our implementation of UDAF has an intermediate object, “MeanVarianceCalculator”, to capture the individual element values and incrementally calculate the mean and variance for a given group.
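The original class is not reproduced here, but a minimal sketch of such a calculator could look as follows, assuming Welford’s online algorithm for numerically stable incremental mean and variance, with a merge step for combining partial aggregates.
[code language="scala"]
// A sketch of a per-index accumulator (illustrative, not our exact class).
// Uses Welford's online algorithm: count, running mean and M2 (the sum of
// squared deviations) are enough to update and merge incrementally.
case class MeanVarianceCalculator(count: Long = 0L, mean: Double = 0.0, m2: Double = 0.0) {

  // Fold a single observation into the running statistics.
  def update(value: Double): MeanVarianceCalculator = {
    val n = count + 1
    val delta = value - mean
    val newMean = mean + delta / n
    MeanVarianceCalculator(n, newMean, m2 + delta * (value - newMean))
  }

  // Combine two partial aggregates (the parallel variance formula),
  // which is exactly what the UDAF's merge() step needs.
  def merge(other: MeanVarianceCalculator): MeanVarianceCalculator = {
    val n = count + other.count
    if (n == 0) this
    else {
      val delta = other.mean - mean
      MeanVarianceCalculator(
        n,
        mean + delta * other.count / n,
        m2 + other.m2 + delta * delta * count * other.count / n)
    }
  }

  // Population variance; returns 0 for fewer than two observations.
  def variance: Double = if (count > 1) m2 / count else 0.0
}
[/code]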
The UDAF API has four members to define: two struct types, an output data type and a boolean flag (see the sketch after this list).
- inputSchema – the struct type of the input. In this case, an array of integers.
- bufferSchema – the struct type for the intermediate aggregation buffer. In this case, it is an array of “MeanVarianceCalculator” state that captures the individual index values and calculates the mean and variance.
- dataType – the output data type. In this case, a 256-element array, with each element a tuple representing the mean and variance.
- deterministic – a boolean indicating whether the function is deterministic, i.e. whether the same input always returns the same output. In this case, “true”.
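As a sketch, these four declarations might look as follows. The aggregation buffer can only hold Spark SQL types, so we assume each calculator is encoded as a (count, mean, m2) struct rather than stored as a raw object.
[code language="scala"]
import org.apache.spark.sql.types._

// The per-index calculator state encoded as Spark SQL types (assumed layout).
val calculatorType = StructType(Seq(
  StructField("count", LongType),
  StructField("mean", DoubleType),
  StructField("m2", DoubleType)))

// inputSchema: one column holding the 256-element dispersion array.
val inputSchema = StructType(Seq(
  StructField("dispersion", ArrayType(IntegerType))))

// bufferSchema: one calculator struct per ASCII character.
val bufferSchema = StructType(Seq(
  StructField("calculators", ArrayType(calculatorType))))

// dataType: a 256-element array of (mean, variance) tuples.
val dataType = ArrayType(StructType(Seq(
  StructField("mean", DoubleType),
  StructField("variance", DoubleType))))

// deterministic: the same input always yields the same output.
val deterministic = true
[/code]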
The UDAF API has four methods to implement; a complete sketch that puts them together follows the list.
- initialize() – initializes the mutable aggregation buffer to its zero value. In this case, an array of 256 empty “MeanVarianceCalculator” objects.
[code language="scala"]
/**
* Initializes the given aggregation buffer, i.e. the zero value of the aggregation buffer.
*
* The contract should be that applying the merge function on two initial buffers should just
* return the initial buffer itself, i.e.
* `merge(initialBuffer, initialBuffer)` should equal `initialBuffer`.
*
* @since 1.5.0
*/
def initialize(buffer: MutableAggregationBuffer): Unit
[/code]
- update() – invoked once for each row in the group, with the mutable aggregation buffer and the current row as arguments. Here, we feed each value of the row’s dispersion array into the “MeanVarianceCalculator” at the corresponding index.
[code language="scala"]
/**
* Updates the given aggregation buffer `buffer` with new input data from `input`.
*
* This is called once per input row.
*
* @since 1.5.0
*/
def update(buffer: MutableAggregationBuffer, input: Row): Unit
[/code]
- merge() – invoked when Spark combines partially aggregated data, e.g. from different partitions; it receives two aggregation buffers and must merge the second into the first. Here, we merge the two “MeanVarianceCalculator” arrays element by element.
[code language="scala"]
/**
* Merges two aggregation buffers and stores the updated buffer values back to `buffer1`.
*
* This is called when we merge two partially aggregated data together.
*
* @since 1.5.0
*/
def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit
[/code]
- evaluate() – invoked once per group at the end, with the final merged buffer. It returns an array of tuples, each representing the mean and variance for one ASCII character.
[code language="scala"]
/**
* Calculates the final result of this [[UserDefinedAggregateFunction]] based on the given
* aggregation buffer.
*
* @since 1.5.0
*/
def evaluate(buffer: Row): Any
[/code]
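Putting the pieces together, a condensed sketch of the whole UDAF might look as follows. It reuses the “MeanVarianceCalculator” and the schemas sketched earlier, converting calculators to and from (count, mean, m2) structs since the buffer only holds Spark SQL types. This is an illustrative reconstruction, not our exact production code.
[code language="scala"]
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

class MeanVarianceUDAF extends UserDefinedAggregateFunction {

  private val calculatorType = StructType(Seq(
    StructField("count", LongType),
    StructField("mean", DoubleType),
    StructField("m2", DoubleType)))

  override def inputSchema: StructType =
    StructType(Seq(StructField("dispersion", ArrayType(IntegerType))))

  override def bufferSchema: StructType =
    StructType(Seq(StructField("calculators", ArrayType(calculatorType))))

  override def dataType: DataType = ArrayType(StructType(Seq(
    StructField("mean", DoubleType),
    StructField("variance", DoubleType))))

  override def deterministic: Boolean = true

  // Encode/decode a calculator as a (count, mean, m2) struct.
  private def toRow(c: MeanVarianceCalculator): Row = Row(c.count, c.mean, c.m2)
  private def fromRow(r: Row): MeanVarianceCalculator =
    MeanVarianceCalculator(r.getLong(0), r.getDouble(1), r.getDouble(2))

  // Zero value: 256 empty calculators, one per ASCII character.
  override def initialize(buffer: MutableAggregationBuffer): Unit =
    buffer(0) = Seq.fill(256)(toRow(MeanVarianceCalculator()))

  // Fold one row's 256-element dispersion array into the per-index calculators.
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit =
    if (!input.isNullAt(0)) {
      val calculators = buffer.getSeq[Row](0).map(fromRow)
      val dispersion = input.getSeq[Int](0)
      buffer(0) = calculators.zip(dispersion).map {
        case (calc, value) => toRow(calc.update(value.toDouble))
      }
    }

  // Merge two partial aggregates index by index into buffer1.
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    val merged = buffer1.getSeq[Row](0).map(fromRow)
      .zip(buffer2.getSeq[Row](0).map(fromRow))
      .map { case (a, b) => toRow(a.merge(b)) }
    buffer1(0) = merged
  }

  // Final result: a (mean, variance) tuple per ASCII character.
  override def evaluate(buffer: Row): Any =
    buffer.getSeq[Row](0).map(fromRow).map(c => Row(c.mean, c.variance))
}
[/code]
Note that each update() call decodes and re-encodes the whole calculator array; this kind of per-row overhead is one of the practical concerns with the UDAF API.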
This concludes our implementation of the UDAF API to solve a custom aggregation use case using Spark.
In the upcoming blogs, we will discuss limitations with UDAF and other alternatives available for custom aggregations in Spark.
We would love to hear your UDAF use cases and feedback. Please drop us a note at eng@cognitree.com.