Hive Analytic Functions

August 17, 2019

In a world where data is everything, transforming raw data into meaningful insights requires certain techniques and functions. This post focuses on the commonly used SQL analytic functions in Hive and Spark.

Introduction

Analytic functions compute aggregates such as moving sums, cumulative sums and averages over a set of rows. They are also used to rank data, eliminate duplicates and more.

This post covers the basics of analytic functions, with a detailed example for every function. Sample datasets are used for illustration.

By the end of this post, you should be familiar with:

  • Analytic functions in Hive/Spark SQL
  • The over() clause and why it is used
  • How the computation changes with the window definition
  • Frequently used analytic functions such as row_number, rank, min, max, lead, lag and many more

What is an Analytic Function?

Analytic functions, also known as window functions, compute a value for each row from a set of related rows (the window) defined by a partition.

These functions are followed by an over() clause, which defines the window and takes an optional list of columns for ordering. The ordering determines the sequence in which the function evaluates the rows of the window.

Analytic functions are typically used to compute cumulative sums, moving averages, next and previous records and so on. Examples include lead, lag, sum, min and max.

What is the Over clause?

The over clause of an analytic function defines the partitions and the sorting of data within each partition.

It defines a window, that is, a user-specified set of rows within the query result set. The function is then calculated over that window.

Example: select max(marks) over(partition by dept order by dept) as max_marks from table

Note: All the SQL queries used in these examples can be executed in both Hive and Spark.
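
To make the effect of the over clause concrete, here is a minimal sketch that runs the same max() once with group by and once as a window function. It uses Python's built-in sqlite3 module as a stand-in for Hive/Spark (an assumption made so that it runs without a cluster; it needs SQLite 3.25 or newer), and the table name students is hypothetical.

```python
import sqlite3

# In-memory SQLite stands in for Hive/Spark here (assumes SQLite >= 3.25).
conn = sqlite3.connect(":memory:")
conn.execute("create table students (name text, dept text, marks integer)")
conn.executemany(
    "insert into students values (?, ?, ?)",
    [("David", "CS", 98), ("Logan", "CS", 99),
     ("Goliath", "EE", 94), ("Ravi", "EE", 83)],
)

# group by collapses each department into a single row.
grouped = conn.execute(
    "select dept, max(marks) from students group by dept order by dept"
).fetchall()

# The same aggregate with over() keeps every input row and repeats the
# department maximum next to it.
windowed = conn.execute(
    """select name, dept, marks,
              max(marks) over(partition by dept) as max_marks
       from students
       order by dept, name"""
).fetchall()

print(grouped)
for row in windowed:
    print(row)
```

The grouped query returns one row per department, while the windowed query returns all four students, each carrying the maximum of their own department.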

Row_Number

Row_number serves two main purposes in SQL:

  • Generating a unique sequence number for all the rows or a group of rows
  • Eliminating duplicate rows

To illustrate these points, let’s create a simple dataset as a Spark DataFrame and register it as a temporary view.

df = spark.createDataFrame([('David',1, 'CS'),('Goliath',2, 'EE'), ('Goliath',2, 'EE'),
                            ('Raven',3, 'CS'), ('David',1, 'CS'), ('Logan',4, 'CS'), 
                            ('Raven',3, 'CS')],['Student_Name', 'Roll_No', 'Dept'])
df.show()
df.createOrReplaceTempView('dataset')
+------------+-------+----+
|Student_Name|Roll_No|Dept|
+------------+-------+----+
|       David|      1|  CS|
|     Goliath|      2|  EE|
|     Goliath|      2|  EE|
|       Raven|      3|  CS|
|       David|      1|  CS|
|       Logan|      4|  CS|
|       Raven|      3|  CS|
+------------+-------+----+

The above dataset can also be created using plain Hive/Spark SQL as shown below,

spark.sql("drop table if exists dataset")
spark.sql("""create table dataset as
            select 'David' as Student_Name, 1 as Roll_No, 'CS' as Dept union all
            select 'Goliath',2, 'EE' union all
            select 'Goliath',2, 'EE' union all
            select 'Raven', 3, 'CS' union all
            select 'David', 1, 'CS' union all
            select 'Logan', 4, 'CS' union all
            select 'Raven', 3, 'CS'""")
DataFrame[]
spark.sql("select * from dataset").show()
+------------+-------+----+
|Student_Name|Roll_No|Dept|
+------------+-------+----+
|       David|      1|  CS|
|     Goliath|      2|  EE|
|     Goliath|      2|  EE|
|       Raven|      3|  CS|
|       David|      1|  CS|
|       Logan|      4|  CS|
|       Raven|      3|  CS|
+------------+-------+----+

Syntax: row_number() over([partition by col(s)/expression(s)] order by col(s)/expression(s))

Generating a Sequence Number

The row_number function generates a unique sequence number for every row of the table when no partition by clause is specified in the window.

spark.sql("select * ,row_number() over(order by roll_no) as row_num from dataset").show()
+------------+-------+----+-------+
|Student_Name|Roll_No|Dept|row_num|
+------------+-------+----+-------+
|       David|      1|  CS|      1|
|       David|      1|  CS|      2|
|     Goliath|      2|  EE|      3|
|     Goliath|      2|  EE|      4|
|       Raven|      3|  CS|      5|
|       Raven|      3|  CS|      6|
|       Logan|      4|  CS|      7|
+------------+-------+----+-------+

Eliminating Duplicates using row_number

As seen in the above dataset, there are some redundant rows. Let’s go ahead and eliminate this redundancy using row_number.

The point to keep in mind is that row_number returns 1 for the first row in each partition. Any row with a number greater than 1 is therefore a duplicate within the specified window.

Here, the partition by clause lists the columns that together identify a duplicate. Let’s proceed with our example.

spark.sql("""select * ,row_number() over(partition by student_name, roll_no, dept order by roll_no desc)
          as row_num from dataset""").show()
+------------+-------+----+-------+
|Student_Name|Roll_No|Dept|row_num|
+------------+-------+----+-------+
|       David|      1|  CS|      1|
|       David|      1|  CS|      2|
|     Goliath|      2|  EE|      1|
|     Goliath|      2|  EE|      2|
|       Raven|      3|  CS|      1|
|       Raven|      3|  CS|      2|
|       Logan|      4|  CS|      1|
+------------+-------+----+-------+

As seen above, the first occurrence of every row has a row number of 1. Now, we can keep only the rows whose row number equals 1.

spark.sql("""select * from (
          select * ,row_number() over(partition by student_name, roll_no, dept order by roll_no desc)
          as row_num from dataset
          )a
          where a.row_num = 1""").show()
+------------+-------+----+-------+
|Student_Name|Roll_No|Dept|row_num|
+------------+-------+----+-------+
|       David|      1|  CS|      1|
|     Goliath|      2|  EE|      1|
|       Raven|      3|  CS|      1|
|       Logan|      4|  CS|      1|
+------------+-------+----+-------+
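
As a quick sanity check, the sketch below runs the same de-duplication query and compares its result with a plain select distinct. It uses Python's built-in sqlite3 module as a stand-in for Hive/Spark (an assumption made for portability; it needs SQLite 3.25 or newer) and recreates the sample dataset locally.

```python
import sqlite3

# In-memory SQLite stands in for Hive/Spark here (assumes SQLite >= 3.25).
conn = sqlite3.connect(":memory:")
conn.execute(
    "create table dataset (student_name text, roll_no integer, dept text)"
)
conn.executemany("insert into dataset values (?, ?, ?)", [
    ("David", 1, "CS"), ("Goliath", 2, "EE"), ("Goliath", 2, "EE"),
    ("Raven", 3, "CS"), ("David", 1, "CS"), ("Logan", 4, "CS"),
    ("Raven", 3, "CS"),
])

# Keep only the first row of every (student_name, roll_no, dept) group.
deduped = conn.execute("""
    select student_name, roll_no, dept from (
        select *, row_number() over(
            partition by student_name, roll_no, dept order by roll_no
        ) as row_num
        from dataset
    ) a
    where a.row_num = 1
    order by roll_no
""").fetchall()

# For fully identical rows, select distinct gives the same answer.
distinct_rows = conn.execute(
    "select distinct student_name, roll_no, dept from dataset order by roll_no"
).fetchall()

print(deduped)
print(deduped == distinct_rows)
```

When rows are identical in every column, select distinct is the simpler choice. The row_number approach becomes useful when duplicates are defined on only a subset of the columns and the order by decides which row of each group survives.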

Rank and Dense Rank

As the name suggests, rank is used to derive a ranking on a group of rows based on a specified column or expression.

Syntax: rank() over([partition by col(s)/expression(s)] order by col(s)/expression(s))

The difference between rank and dense_rank is that the former skips the subsequent rank numbers when several rows tie on the same rank, whereas dense_rank continues the sequence without gaps.

The above statement makes more sense when you look at the example below.

Let’s create a sample dataset for this illustration.

df2 = spark.createDataFrame([('David',1, 'CS', 55, 98),('Goliath',2, 'EE', 32, 94),
                            ('Raven',3, 'CS', 40, 83), ('Logan',4, 'CS', 70, 99), ('Ravi',5, 'EE', 82, 83), 
                            ('Amy',7, 'EC', 63, 98), ('Johnson',8, 'EC', 25, 83), 
                            ('Shivani',12, 'EE', 67, 80)],['Student_Name', 'Roll_No', 'Dept', 'Due_Amount', 'Marks'])
df2.show()
df2.createOrReplaceTempView('dataset_2')
+------------+-------+----+----------+-----+
|Student_Name|Roll_No|Dept|Due_Amount|Marks|
+------------+-------+----+----------+-----+
|       David|      1|  CS|        55|   98|
|     Goliath|      2|  EE|        32|   94|
|       Raven|      3|  CS|        40|   83|
|       Logan|      4|  CS|        70|   99|
|        Ravi|      5|  EE|        82|   83|
|         Amy|      7|  EC|        63|   98|
|     Johnson|      8|  EC|        25|   83|
|     Shivani|     12|  EE|        67|   80|
+------------+-------+----+----------+-----+

Now, let’s generate a rank and dense rank on this dataset,

spark.sql("""select *
            , rank() over(order by marks desc) as Rank
            , dense_rank() over(order by marks desc) as Dense_Rank
            from dataset_2""").show()
+------------+-------+----+----------+-----+----+----------+
|Student_Name|Roll_No|Dept|Due_Amount|Marks|Rank|Dense_Rank|
+------------+-------+----+----------+-----+----+----------+
|       Logan|      4|  CS|        70|   99|   1|         1|
|       David|      1|  CS|        55|   98|   2|         2|
|         Amy|      7|  EC|        63|   98|   2|         2|
|     Goliath|      2|  EE|        32|   94|   4|         3|
|       Raven|      3|  CS|        40|   83|   5|         4|
|        Ravi|      5|  EE|        82|   83|   5|         4|
|     Johnson|      8|  EC|        25|   83|   5|         4|
|     Shivani|     12|  EE|        67|   80|   8|         5|
+------------+-------+----+----------+-----+----+----------+

Now, let’s generate the rank and dense_rank of each student within their respective departments.

Here, an additional partition by clause would be introduced to calculate the rank with respect to each department.

spark.sql("""select *
            , rank() over(partition by dept order by marks desc) as Rank
            , dense_rank() over(partition by dept  order by marks desc) as Dense_Rank
            from dataset_2""").show()
+------------+-------+----+----------+-----+----+----------+
|Student_Name|Roll_No|Dept|Due_Amount|Marks|Rank|Dense_Rank|
+------------+-------+----+----------+-----+----+----------+
|     Goliath|      2|  EE|        32|   94|   1|         1|
|        Ravi|      5|  EE|        82|   83|   2|         2|
|     Shivani|     12|  EE|        67|   80|   3|         3|
|         Amy|      7|  EC|        63|   98|   1|         1|
|     Johnson|      8|  EC|        25|   83|   2|         2|
|       Logan|      4|  CS|        70|   99|   1|         1|
|       David|      1|  CS|        55|   98|   2|         2|
|       Raven|      3|  CS|        40|   83|   3|         3|
+------------+-------+----+----------+-----+----+----------+
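
To see how ties are handled, the following sketch puts row_number, rank and dense_rank side by side on the marks column. It runs on Python's built-in sqlite3 module as a stand-in for Hive/Spark (an assumption; it needs SQLite 3.25 or newer). The roll_no tie-breaker inside row_number is my addition so that the numbering is deterministic.

```python
import sqlite3

# In-memory SQLite stands in for Hive/Spark here (assumes SQLite >= 3.25).
conn = sqlite3.connect(":memory:")
conn.execute(
    "create table dataset_2 (student_name text, roll_no integer, marks integer)"
)
conn.executemany("insert into dataset_2 values (?, ?, ?)", [
    ("David", 1, 98), ("Goliath", 2, 94), ("Raven", 3, 83), ("Logan", 4, 99),
    ("Ravi", 5, 83), ("Amy", 7, 98), ("Johnson", 8, 83), ("Shivani", 12, 80),
])

rows = conn.execute("""
    select student_name, marks,
           -- always unique; roll_no breaks ties (an added assumption)
           row_number() over(order by marks desc, roll_no) as row_num,
           -- ties share a rank and the following ranks are skipped
           rank()       over(order by marks desc) as rnk,
           -- ties share a rank and no rank is skipped
           dense_rank() over(order by marks desc) as dense_rnk
    from dataset_2
    order by marks desc, roll_no
""").fetchall()

for row in rows:
    print(row)
```

The three students with 83 marks get row numbers 5, 6 and 7, a rank of 5 each and a dense rank of 4 each. The next student gets a rank of 8 but a dense rank of 5.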

Sum

Sum is used to calculate the sum of a column from all the rows of a table or a defined set of rows.

Syntax: sum(column/expression) over([partition by col(s)/expression(s)] [order by col(s)/expression(s)])

Let’s calculate the total due amount for each department.

spark.sql("select dept, sum(due_amount) as total_due_amount from dataset_2 group by dept").show()
+----+----------------+
|dept|total_due_amount|
+----+----------------+
|  EE|             181|
|  EC|              88|
|  CS|             165|
+----+----------------+

Now, let’s calculate the same sum and display it on each row. This can be achieved using the partition by clause.

spark.sql("""select *, sum(due_amount) over(partition by dept order by dept) as total_due_amount
             from dataset_2""").show()
+------------+-------+----+----------+-----+----------------+
|Student_Name|Roll_No|Dept|Due_Amount|Marks|total_due_amount|
+------------+-------+----+----------+-----+----------------+
|     Goliath|      2|  EE|        32|   94|             181|
|        Ravi|      5|  EE|        82|   83|             181|
|     Shivani|     12|  EE|        67|   80|             181|
|         Amy|      7|  EC|        63|   98|              88|
|     Johnson|      8|  EC|        25|   83|              88|
|       David|      1|  CS|        55|   98|             165|
|       Raven|      3|  CS|        40|   83|             165|
|       Logan|      4|  CS|        70|   99|             165|
+------------+-------+----+----------+-----+----------------+

Cumulative Sum

By changing the window definition, we can generate a cumulative sum using the sum function.

spark.sql("""select *, sum(due_amount) over(order by roll_no) as cumulative_due_amount
             from dataset_2""").show()
+------------+-------+----+----------+-----+---------------------+
|Student_Name|Roll_No|Dept|Due_Amount|Marks|cumulative_due_amount|
+------------+-------+----+----------+-----+---------------------+
|       David|      1|  CS|        55|   98|                   55|
|     Goliath|      2|  EE|        32|   94|                   87|
|       Raven|      3|  CS|        40|   83|                  127|
|       Logan|      4|  CS|        70|   99|                  197|
|        Ravi|      5|  EE|        82|   83|                  279|
|         Amy|      7|  EC|        63|   98|                  342|
|     Johnson|      8|  EC|        25|   83|                  367|
|     Shivani|     12|  EE|        67|   80|                  434|
+------------+-------+----+----------+-----+---------------------+

Now, let’s generate a cumulative sum of the due amounts within each department.

spark.sql("""select *, sum(due_amount) over(partition by dept order by roll_no) as cumulative_due_amount
             from dataset_2""").show()
+------------+-------+----+----------+-----+---------------------+
|Student_Name|Roll_No|Dept|Due_Amount|Marks|cumulative_due_amount|
+------------+-------+----+----------+-----+---------------------+
|     Goliath|      2|  EE|        32|   94|                   32|
|        Ravi|      5|  EE|        82|   83|                  114|
|     Shivani|     12|  EE|        67|   80|                  181|
|         Amy|      7|  EC|        63|   98|                   63|
|     Johnson|      8|  EC|        25|   83|                   88|
|       David|      1|  CS|        55|   98|                   55|
|       Raven|      3|  CS|        40|   83|                   95|
|       Logan|      4|  CS|        70|   99|                  165|
+------------+-------+----+----------+-----+---------------------+

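Helpful Tip: Try changing the order by column from roll_no to dept in the first cumulative sum query to obtain a running total by department. Rows with the same dept are peers under the default window frame, so every row of a department shows the same value. Try other such combinations to explore this further.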
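
The sketch below shows how the window frame changes a running total when the order by column has ties. With only order by dept, the default frame (range between unbounded preceding and current row) includes all rows of the same department at once. An explicit rows frame adds one row at a time. It runs on Python's built-in sqlite3 module as a stand-in for Hive/Spark (an assumption; it needs SQLite 3.25 or newer), and the roll_no tie-breaker in the second window is my addition.

```python
import sqlite3

# In-memory SQLite stands in for Hive/Spark here (assumes SQLite >= 3.25).
conn = sqlite3.connect(":memory:")
conn.execute("""create table dataset_2 (
    student_name text, roll_no integer, dept text, due_amount integer)""")
conn.executemany("insert into dataset_2 values (?, ?, ?, ?)", [
    ("David", 1, "CS", 55), ("Goliath", 2, "EE", 32), ("Raven", 3, "CS", 40),
    ("Logan", 4, "CS", 70), ("Ravi", 5, "EE", 82), ("Amy", 7, "EC", 63),
    ("Johnson", 8, "EC", 25), ("Shivani", 12, "EE", 67),
])

rows = conn.execute("""
    select student_name, dept, due_amount,
           -- default frame: all rows of the same dept are added together
           sum(due_amount) over(order by dept) as range_total,
           -- explicit rows frame: one row is added at a time
           sum(due_amount) over(
               order by dept, roll_no
               rows between unbounded preceding and current row
           ) as rows_total
    from dataset_2
    order by dept, roll_no
""").fetchall()

for row in rows:
    print(row)
```

Here range_total jumps from 165 (CS) to 253 (CS + EC) to 434 (all departments), whereas rows_total grows with every single row.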

Average

This function is used to calculate the average of a column from all the rows of a table or a defined set of rows.

Syntax: avg(column/expression) over([partition by col(s)/expression(s)] [order by col(s)/expression(s)])

Similar to the above example, let’s calculate the average marks with respect to each department.

spark.sql("""select *, cast(avg(marks) over(partition by dept order by dept) as decimal(10,3)) as average_marks 
             from dataset_2""").show()
+------------+-------+----+----------+-----+-------------+
|Student_Name|Roll_No|Dept|Due_Amount|Marks|average_marks|
+------------+-------+----+----------+-----+-------------+
|     Goliath|      2|  EE|        32|   94|       85.667|
|        Ravi|      5|  EE|        82|   83|       85.667|
|     Shivani|     12|  EE|        67|   80|       85.667|
|         Amy|      7|  EC|        63|   98|       90.500|
|     Johnson|      8|  EC|        25|   83|       90.500|
|       David|      1|  CS|        55|   98|       93.333|
|       Raven|      3|  CS|        40|   83|       93.333|
|       Logan|      4|  CS|        70|   99|       93.333|
+------------+-------+----+----------+-----+-------------+
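
The same avg function turns into a moving average once the window gets a rows frame. The sketch below averages each student's marks with those of the student just before them in roll_no order. It runs on Python's built-in sqlite3 module as a stand-in for Hive/Spark (an assumption; it needs SQLite 3.25 or newer), and the window size of two rows is an arbitrary choice for illustration.

```python
import sqlite3

# In-memory SQLite stands in for Hive/Spark here (assumes SQLite >= 3.25).
conn = sqlite3.connect(":memory:")
conn.execute(
    "create table dataset_2 (student_name text, roll_no integer, marks integer)"
)
conn.executemany("insert into dataset_2 values (?, ?, ?)", [
    ("David", 1, 98), ("Goliath", 2, 94), ("Raven", 3, 83), ("Logan", 4, 99),
    ("Ravi", 5, 83), ("Amy", 7, 98), ("Johnson", 8, 83), ("Shivani", 12, 80),
])

rows = conn.execute("""
    select student_name, marks,
           -- average of the current row and the one row before it
           avg(marks) over(
               order by roll_no
               rows between 1 preceding and current row
           ) as moving_avg
    from dataset_2
    order by roll_no
""").fetchall()

for row in rows:
    print(row)
```

The first row has no predecessor, so its moving average is simply its own marks. Widening the frame (for example, 2 preceding) smooths the values over more rows.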

Count

This function is used to calculate the count of all the rows of a table or a defined set of rows.

Syntax: count(column/expression) over([partition by col(s)/expression(s)] [order by col(s)/expression(s)])

Similar to the above example, let’s calculate the count of all students in a department.

spark.sql("""select *, count(roll_no) over(partition by dept order by dept) as student_count 
             from dataset_2""").show()
+------------+-------+----+----------+-----+-------------+
|Student_Name|Roll_No|Dept|Due_Amount|Marks|student_count|
+------------+-------+----+----------+-----+-------------+
|     Goliath|      2|  EE|        32|   94|            3|
|        Ravi|      5|  EE|        82|   83|            3|
|     Shivani|     12|  EE|        67|   80|            3|
|         Amy|      7|  EC|        63|   98|            2|
|     Johnson|      8|  EC|        25|   83|            2|
|       David|      1|  CS|        55|   98|            3|
|       Raven|      3|  CS|        40|   83|            3|
|       Logan|      4|  CS|        70|   99|            3|
+------------+-------+----+----------+-----+-------------+
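
One detail worth keeping in mind is that count(column) only counts non-null values, while count(*) counts every row in the window. The sketch below illustrates this on a small hypothetical table in which one student has no marks yet. It runs on Python's built-in sqlite3 module as a stand-in for Hive/Spark (an assumption; it needs SQLite 3.25 or newer).

```python
import sqlite3

# In-memory SQLite stands in for Hive/Spark here (assumes SQLite >= 3.25).
conn = sqlite3.connect(":memory:")
# Hypothetical sample: Raven's marks are missing (NULL).
conn.execute("create table results (student_name text, dept text, marks integer)")
conn.executemany("insert into results values (?, ?, ?)", [
    ("David", "CS", 98), ("Raven", "CS", None),
    ("Logan", "CS", 99), ("Amy", "EC", 98),
])

rows = conn.execute("""
    select student_name, dept,
           count(*)     over(partition by dept) as all_rows,
           count(marks) over(partition by dept) as rows_with_marks
    from results
    order by dept, student_name
""").fetchall()

for row in rows:
    print(row)
```

For the CS department, all_rows is 3 but rows_with_marks is 2, because the null value is skipped.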

Min and Max

As the names suggest, the min and max functions compute the minimum and maximum value from the entire data or a defined set of rows.

Syntax: min(column/expression) over([partition by col(s)/expression(s)] [order by col(s)/expression(s)])
Syntax: max(column/expression) over([partition by col(s)/expression(s)] [order by col(s)/expression(s)])

Let’s start off by calculating the minimum and maximum marks from each department.

spark.sql("""select dept
          , min(marks) as min_marks
          , max(marks) as max_marks
          from dataset_2
          group by dept""").show()
+----+---------+---------+
|dept|min_marks|max_marks|
+----+---------+---------+
|  EE|       80|       94|
|  EC|       83|       98|
|  CS|       83|       99|
+----+---------+---------+

Now, we can utilize the window function to display the above result on each row.

spark.sql("""select *
          , min(marks) over(partition by dept order by dept) as min_marks
          , max(marks) over(partition by dept order by dept) as max_marks
          from dataset_2""").show()
+------------+-------+----+----------+-----+---------+---------+
|Student_Name|Roll_No|Dept|Due_Amount|Marks|min_marks|max_marks|
+------------+-------+----+----------+-----+---------+---------+
|     Goliath|      2|  EE|        32|   94|       80|       94|
|        Ravi|      5|  EE|        82|   83|       80|       94|
|     Shivani|     12|  EE|        67|   80|       80|       94|
|         Amy|      7|  EC|        63|   98|       83|       98|
|     Johnson|      8|  EC|        25|   83|       83|       98|
|       David|      1|  CS|        55|   98|       83|       99|
|       Raven|      3|  CS|        40|   83|       83|       99|
|       Logan|      4|  CS|        70|   99|       83|       99|
+------------+-------+----+----------+-----+---------+---------+

First and Last

First_value and last_value are used to obtain the first and last value from all the rows in a table or from a specified window.

Syntax: first_value(column/expression) over([partition by col(s)/expression(s)] [order by col(s)/expression(s)])
Syntax: last_value(column/expression) over([partition by col(s)/expression(s)] [order by col(s)/expression(s)])

First_Value

For this example, let’s compute the first value of marks for the students of each department. Within each department, the data is sorted in descending order of due_amount. The query also returns last_value for comparison.

spark.sql("""select *
            , first_value(marks) over(partition by dept order by due_amount desc) as first_value_marks
            , last_value(marks) over(partition by dept order by due_amount desc) as last_value_marks
              from dataset_2""").show()
+------------+-------+----+----------+-----+-----------------+----------------+
|Student_Name|Roll_No|Dept|Due_Amount|Marks|first_value_marks|last_value_marks|
+------------+-------+----+----------+-----+-----------------+----------------+
|        Ravi|      5|  EE|        82|   83|               83|              83|
|     Shivani|     12|  EE|        67|   80|               83|              80|
|     Goliath|      2|  EE|        32|   94|               83|              94|
|         Amy|      7|  EC|        63|   98|               98|              98|
|     Johnson|      8|  EC|        25|   83|               98|              83|
|       Logan|      4|  CS|        70|   99|               99|              99|
|       David|      1|  CS|        55|   98|               99|              98|
|       Raven|      3|  CS|        40|   83|               99|              83|
+------------+-------+----+----------+-----+-----------------+----------------+

Last_Value

The last_value function behaves a bit differently from its counterpart, first_value.

When an order by clause is included in the window, the default frame runs from the start of the partition up to the current row, so last_value returns the last value “until that record”. The value therefore keeps changing from record to record.

The example below demonstrates this:

spark.sql("""select *
            , last_value(marks) over(partition by dept order by due_amount desc) as last_value_marks
              from dataset_2""").show()
+------------+-------+----+----------+-----+----------------+
|Student_Name|Roll_No|Dept|Due_Amount|Marks|last_value_marks|
+------------+-------+----+----------+-----+----------------+
|        Ravi|      5|  EE|        82|   83|              83|
|     Shivani|     12|  EE|        67|   80|              80|
|     Goliath|      2|  EE|        32|   94|              94|
|         Amy|      7|  EC|        63|   98|              98|
|     Johnson|      8|  EC|        25|   83|              83|
|       Logan|      4|  CS|        70|   99|              99|
|       David|      1|  CS|        55|   98|              98|
|       Raven|      3|  CS|        40|   83|              83|
+------------+-------+----+----------+-----+----------------+

To maintain a static value across the group or partition, an additional clause must be added,
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING

The above clause asks the last_value function to compute the last value from the entire partition of rows which is clearly stated by the “unbounded preceding and unbounded following” phrase.

Now, let’s go ahead with an example to view this behaviour.

spark.sql("""select *
            , last_value(marks) over(partition by dept order by due_amount desc
            ROWS BETWEEN UNBOUNDED PRECEDING AND 
              UNBOUNDED FOLLOWING) as last_value_marks
              from dataset_2""").show()
+------------+-------+----+----------+-----+----------------+
|Student_Name|Roll_No|Dept|Due_Amount|Marks|last_value_marks|
+------------+-------+----+----------+-----+----------------+
|        Ravi|      5|  EE|        82|   83|              94|
|     Shivani|     12|  EE|        67|   80|              94|
|     Goliath|      2|  EE|        32|   94|              94|
|         Amy|      7|  EC|        63|   98|              83|
|     Johnson|      8|  EC|        25|   83|              83|
|       Logan|      4|  CS|        70|   99|              83|
|       David|      1|  CS|        55|   98|              83|
|       Raven|      3|  CS|        40|   83|              83|
+------------+-------+----+----------+-----+----------------+
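
The effect of the frame can be checked side by side. The sketch below computes last_value three times: with the default frame, with that default written out explicitly, and with the full-partition frame. Note that it orders each department by roll_no rather than due_amount, which is a deliberate variation for this illustration. It runs on Python's built-in sqlite3 module as a stand-in for Hive/Spark (an assumption; it needs SQLite 3.25 or newer).

```python
import sqlite3

# In-memory SQLite stands in for Hive/Spark here (assumes SQLite >= 3.25).
conn = sqlite3.connect(":memory:")
conn.execute("""create table dataset_2 (
    student_name text, roll_no integer, dept text, marks integer)""")
conn.executemany("insert into dataset_2 values (?, ?, ?, ?)", [
    ("David", 1, "CS", 98), ("Goliath", 2, "EE", 94), ("Raven", 3, "CS", 83),
    ("Logan", 4, "CS", 99), ("Ravi", 5, "EE", 83), ("Amy", 7, "EC", 98),
    ("Johnson", 8, "EC", 83), ("Shivani", 12, "EE", 80),
])

rows = conn.execute("""
    select student_name, dept, marks,
           -- default frame when order by is present
           last_value(marks) over(
               partition by dept order by roll_no) as last_default,
           -- the same default frame, written out
           last_value(marks) over(
               partition by dept order by roll_no
               range between unbounded preceding and current row
           ) as last_explicit,
           -- frame covering the whole partition
           last_value(marks) over(
               partition by dept order by roll_no
               rows between unbounded preceding and unbounded following
           ) as last_full
    from dataset_2
    order by dept, roll_no
""").fetchall()

for row in rows:
    print(row)
```

The first two columns always match the current row's marks, since each row is the last one in its own frame. Only last_full holds a single value per department: the marks of the student with the highest roll_no.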

Lead and Lag

Lead and lag are among the most commonly used functions in SQL. As the names suggest, lead and lag fetch the next and previous record respectively from the group of rows.

Syntax: lead(column/expression, offset, default_value) over([partition by col(s)/expression(s)] order by col(s)/expression(s))

Note: Offset and default_value are optional. By default, offset is 1 and default_value is null. Lag takes the same arguments.

Before we proceed with our example, let’s create a sample dataset.

ldf = spark.createDataFrame([('Leonard','12-03-2019'),('Raj','15-03-2019'), ('Cooper','17-04-2019'),
                            ('Howard','20-04-2019'), ('Penny','29-04-2019')],
                            ['Employee_Name', 'Joining_Date'])
ldf.createOrReplaceTempView('dataset_3')
ldf.show()
+-------------+------------+
|Employee_Name|Joining_Date|
+-------------+------------+
|      Leonard|  12-03-2019|
|          Raj|  15-03-2019|
|       Cooper|  17-04-2019|
|       Howard|  20-04-2019|
|        Penny|  29-04-2019|
+-------------+------------+

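Now let’s obtain the next and previous employee’s joining date from this dataset, with the data sorted by joining date. Note that the dates are stored as dd-MM-yyyy strings, so the sort is lexicographic; it happens to match the chronological order for this sample.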

spark.sql("""select *
          , lead(joining_date, 1, 'NA') over(order by joining_date) as next_employee_joining_date
          , lag(joining_date) over(order by joining_date) as prev_employee_joining_date
          from dataset_3""").show()
+-------------+------------+--------------------------+--------------------------+
|Employee_Name|Joining_Date|next_employee_joining_date|prev_employee_joining_date|
+-------------+------------+--------------------------+--------------------------+
|      Leonard|  12-03-2019|                15-03-2019|                      null|
|          Raj|  15-03-2019|                17-04-2019|                12-03-2019|
|       Cooper|  17-04-2019|                20-04-2019|                15-03-2019|
|       Howard|  20-04-2019|                29-04-2019|                17-04-2019|
|        Penny|  29-04-2019|                        NA|                20-04-2019|
+-------------+------------+--------------------------+--------------------------+

Now let’s use dataset_2 and fetch the next and previous student’s marks within each department.

Here, we will be using the partition by clause to group the data based on department and calculate lead and lag on top of it.

spark.sql("""select *
          , lead(marks, 1, -1) over(partition by dept order by roll_no) as next_student_marks
          , lag(marks, 1, -1) over(partition by dept order by roll_no) as prev_student_marks
          from dataset_2""").show()
+------------+-------+----+----------+-----+------------------+------------------+
|Student_Name|Roll_No|Dept|Due_Amount|Marks|next_student_marks|prev_student_marks|
+------------+-------+----+----------+-----+------------------+------------------+
|     Goliath|      2|  EE|        32|   94|                83|                -1|
|        Ravi|      5|  EE|        82|   83|                80|                94|
|     Shivani|     12|  EE|        67|   80|                -1|                83|
|         Amy|      7|  EC|        63|   98|                83|                -1|
|     Johnson|      8|  EC|        25|   83|                -1|                98|
|       David|      1|  CS|        55|   98|                83|                -1|
|       Raven|      3|  CS|        40|   83|                99|                98|
|       Logan|      4|  CS|        70|   99|                -1|                83|
+------------+-------+----+----------+-----+------------------+------------------+
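
A common use of lag is to compare each row with the one before it. The sketch below computes the number of days between consecutive joining dates. It runs on Python's built-in sqlite3 module as a stand-in for Hive/Spark (an assumption; it needs SQLite 3.25 or newer). The yyyy-MM-dd sort key built with substr and the date arithmetic in Python are my additions for this illustration.

```python
import sqlite3
from datetime import datetime

# In-memory SQLite stands in for Hive/Spark here (assumes SQLite >= 3.25).
conn = sqlite3.connect(":memory:")
conn.execute("create table dataset_3 (employee_name text, joining_date text)")
conn.executemany("insert into dataset_3 values (?, ?)", [
    ("Leonard", "12-03-2019"), ("Raj", "15-03-2019"), ("Cooper", "17-04-2019"),
    ("Howard", "20-04-2019"), ("Penny", "29-04-2019"),
])

# dd-MM-yyyy strings do not sort chronologically in general, so the window
# is ordered by a yyyy-MM-dd key derived from the same column.
iso_key = (
    "substr(joining_date, 7, 4) || '-' || "
    "substr(joining_date, 4, 2) || '-' || "
    "substr(joining_date, 1, 2)"
)
rows = conn.execute(f"""
    select employee_name, joining_date,
           lag(joining_date) over(order by {iso_key}) as prev_joining_date
    from dataset_3
    order by {iso_key}
""").fetchall()


def days_between(previous, current):
    """Return the day difference between two dd-MM-yyyy strings."""
    fmt = "%d-%m-%Y"
    return (datetime.strptime(current, fmt) - datetime.strptime(previous, fmt)).days


# The first employee has no previous record, so the gap stays None.
gaps = [
    (name, None if prev is None else days_between(prev, current))
    for name, current, prev in rows
]
print(gaps)
```

The first employee has no previous row, so lag returns null and the gap is left empty. For the others, the gap is the difference between their joining date and the previous one.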

Conclusion

By now, you should be well acquainted with analytic functions in Hive/Spark SQL and able to write SQL queries using them based on your requirements.

I recommend trying out more such examples with sample datasets to gain more expertise.

Comments and feedback are welcome. Cheers!
