August 17, 2019
In a world where data is everything, transforming raw data into meaningful insights requires the use of SQL aggregate functions and SQL analytic functions. This post focuses on the analytic functions most commonly used in Hive and Spark SQL.
Introduction
SQL analytic functions (also known as Hive analytic functions or window functions) and SQL aggregate functions pack a lot of features, such as computing moving sums, cumulative sums, and averages. In addition, they are used to rank data, eliminate duplicates, and more.
This post covers the basics of SQL analytic and aggregate functions, with detailed examples for each function. Sample datasets are used throughout for illustration.
By the end of this post, you should be familiar with,
- Analytic Functions in Hive and Spark SQL
- SQL Aggregate Functions such as avg, count, min, max and sum
- The over() clause and why it is used
- How the computation changes with the window definition
- Frequently used Hive analytic functions such as row_number, rank, min, max, lead, lag, first_value, last_value and many more.
Table of Contents
- What is an Analytic Function?
- What is the Over clause?
- Row_Number
- Rank and Dense Rank
- Sum
- Cumulative Sum
- Average
- Count
- Min and Max
- First_Value and Last_Value
- Lead and Lag
- Conclusion
What is an Analytic Function?
SQL analytic functions, also known as window functions in Hive, are used to compute an aggregate value over a table or a set of rows based on a defined partition window.
These functions use an over() clause, which defines the window, along with an optional list of columns for ordering. This optional ordering defines the criteria for evaluating the function.
SQL analytic functions in Hive or Spark are typically used to compute cumulative sums, moving averages, next and previous records, and so on.
Examples of analytic functions include lead, lag, sum, min, max and more.
What is the Over clause?
Analytic functions and aggregate functions in SQL take an over clause, which defines the partitions and the ordering of the data within each partition.
The over clause defines a window, i.e. a user-specified set of rows within a query result set, and the aggregate function is calculated over that window.
Example: select max(marks) over(partition by dept order by dept) as max_marks from table
Note: All the SQL queries used in these examples can be executed in both Hive and Spark.
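If you prefer the DataFrame API over SQL strings, the same window can be expressed with pyspark.sql.Window. Below is a minimal sketch mirroring the query above; the dataframe students_df is hypothetical and is assumed to have dept and marks columns.
from pyspark.sql import Window, functions as F

# window partitioned by department; ordering is optional for max()
w = Window.partitionBy("dept")
students_df.withColumn("max_marks", F.max("marks").over(w)).show()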
Row_Number
Hive Row_Number (or SQL ROW_NUMBER) serves two main purposes:
- Generating a unique sequence number for all the rows or for a group of rows.
- Eliminating duplicate rows.
To illustrate the above points, let's create a simple dataset as a Spark dataframe and save it in a temporary view.
To explore more on how to create Spark dataframes and perform operations on them, click here.
df = spark.createDataFrame([('David',1, 'CS'),('Goliath',2, 'EE'), ('Goliath',2, 'EE'),
('Raven',3, 'CS'), ('David',1, 'CS'), ('Logan',4, 'CS'),
('Raven',3, 'CS')],['Student_Name', 'Roll_No', 'Dept'])
df.show()
df.createOrReplaceTempView('dataset')
+------------+-------+----+
|Student_Name|Roll_No|Dept|
+------------+-------+----+
| David| 1| CS|
| Goliath| 2| EE|
| Goliath| 2| EE|
| Raven| 3| CS|
| David| 1| CS|
| Logan| 4| CS|
| Raven| 3| CS|
+------------+-------+----+
The above dataset can also be created using plain Hive/Spark SQL as shown below,
spark.sql("drop table if exists dataset")
spark.sql("""create table dataset as
select 'David' as Student_Name, 1 as Roll_No, 'CS' as Dept union all
select 'Goliath',2, 'EE' union all
select 'Goliath',2, 'EE' union all
select 'Raven', 3, 'CS' union all
select 'David', 1, 'CS' union all
select 'Logan', 4, 'CS' union all
select 'Raven', 3, 'CS'""")
DataFrame[]
spark.sql("select * from dataset").show()
+------------+-------+----+
|Student_Name|Roll_No|Dept|
+------------+-------+----+
| David| 1| CS|
| Goliath| 2| EE|
| Goliath| 2| EE|
| Raven| 3| CS|
| David| 1| CS|
| Logan| 4| CS|
| Raven| 3| CS|
+------------+-------+----+
Syntax: row_number() over(partition by col(s)/expression(s) | order by col(s)/expression(s))
- #### Generating a Sequence Number
The Hive row_number function can be used to generate a unique sequence number for each row when no partition by clause is specified in the window.
spark.sql("select * ,row_number() over(order by roll_no) as row_num from dataset").show()
+------------+-------+----+-------+
|Student_Name|Roll_No|Dept|row_num|
+------------+-------+----+-------+
| David| 1| CS| 1|
| David| 1| CS| 2|
| Goliath| 2| EE| 3|
| Goliath| 2| EE| 4|
| Raven| 3| CS| 5|
| Raven| 3| CS| 6|
| Logan| 4| CS| 7|
+------------+-------+----+-------+
- #### Eliminating Duplicates using row_number
As seen in the above dataset, there are some redundant rows. Let's go ahead and eliminate this redundancy using row_number.
The point to keep in mind here is that row_number returns 1 for the first row in each partition, so any row numbered greater than 1 is a duplicate within the specified window.
Here, the partition by clause lists the group of columns to be checked for duplicates. Let's proceed with our example.
spark.sql("""select * ,row_number() over(partition by student_name, roll_no, dept order by roll_no desc)
as row_num from dataset""").show()
+------------+-------+----+-------+
|Student_Name|Roll_No|Dept|row_num|
+------------+-------+----+-------+
| David| 1| CS| 1|
| David| 1| CS| 2|
| Goliath| 2| EE| 1|
| Goliath| 2| EE| 2|
| Raven| 3| CS| 1|
| Raven| 3| CS| 2|
| Logan| 4| CS| 1|
+------------+-------+----+-------+
As seen above, the first occurrence of each distinct row gets a row number of 1. Now, we can keep just the rows where the row number equals 1.
spark.sql("""select * from (
select * ,row_number() over(partition by student_name, roll_no, dept order by roll_no desc)
as row_num from dataset
)a
where a.row_num = 1""").show()
+------------+-------+----+-------+
|Student_Name|Roll_No|Dept|row_num|
+------------+-------+----+-------+
| David| 1| CS| 1|
| Goliath| 2| EE| 1|
| Raven| 3| CS| 1|
| Logan| 4| CS| 1|
+------------+-------+----+-------+
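As a side note, when only de-duplication is needed and the row numbers themselves are not, the DataFrame API's dropDuplicates gives the same result without a window. A small sketch using the df dataframe created earlier:
# keep one row per (Student_Name, Roll_No, Dept) combination
df.dropDuplicates(['Student_Name', 'Roll_No', 'Dept']).show()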
Rank and Dense Rank
As the name suggests, Spark SQL rank or Hive rank is used to derive a ranking for a group of rows based on a specified column or expression.
Syntax: rank() over(partition by col(s)/expression(s) | order by col(s)/expression(s))
The difference between rank and dense_rank is that rank skips the next ranking number when the same rank is assigned to multiple rows, whereas dense_rank keeps the ranking sequence contiguous even when there are ties.
The above statement will make more sense after looking at the example below.
Let’s create a sample dataset for this illustration.
df2 = spark.createDataFrame([('David',1, 'CS', 55, 98),('Goliath',2, 'EE', 32, 94),
('Raven',3, 'CS', 40, 83), ('Logan',4, 'CS', 70, 99), ('Ravi',5, 'EE', 82, 83),
('Amy',7, 'EC', 63, 98), ('Johnson',8, 'EC', 25, 83),
('Shivani',12, 'EE', 67, 80)],['Student_Name', 'Roll_No', 'Dept', 'Due_Amount', 'Marks'])
df2.show()
df2.createOrReplaceTempView('dataset_2')
+------------+-------+----+----------+-----+
|Student_Name|Roll_No|Dept|Due_Amount|Marks|
+------------+-------+----+----------+-----+
| David| 1| CS| 55| 98|
| Goliath| 2| EE| 32| 94|
| Raven| 3| CS| 40| 83|
| Logan| 4| CS| 70| 99|
| Ravi| 5| EE| 82| 83|
| Amy| 7| EC| 63| 98|
| Johnson| 8| EC| 25| 83|
| Shivani| 12| EE| 67| 80|
+------------+-------+----+----------+-----+
Now, let’s generate a rank and dense rank on this dataset,
spark.sql("""select *
, rank() over(order by marks desc) as Rank
, dense_rank() over(order by marks desc) as Dense_Rank
from dataset_2""").show()
+------------+-------+----+----------+-----+----+----------+
|Student_Name|Roll_No|Dept|Due_Amount|Marks|Rank|Dense_Rank|
+------------+-------+----+----------+-----+----+----------+
| Logan| 4| CS| 70| 99| 1| 1|
| David| 1| CS| 55| 98| 2| 2|
| Amy| 7| EC| 63| 98| 2| 2|
| Goliath| 2| EE| 32| 94| 4| 3|
| Raven| 3| CS| 40| 83| 5| 4|
| Ravi| 5| EE| 82| 83| 5| 4|
| Johnson| 8| EC| 25| 83| 5| 4|
| Shivani| 12| EE| 67| 80| 8| 5|
+------------+-------+----+----------+-----+----+----------+
Now, let’s generate the rank and dense_rank of each student within their respective departments.
Here, an additional partition by clause would be introduced to calculate the rank with respect to each department.
spark.sql("""select *
, rank() over(partition by dept order by marks desc) as Rank
, dense_rank() over(partition by dept order by marks desc) as Dense_Rank
from dataset_2""").show()
+------------+-------+----+----------+-----+----+----------+
|Student_Name|Roll_No|Dept|Due_Amount|Marks|Rank|Dense_Rank|
+------------+-------+----+----------+-----+----+----------+
| Goliath| 2| EE| 32| 94| 1| 1|
| Ravi| 5| EE| 82| 83| 2| 2|
| Shivani| 12| EE| 67| 80| 3| 3|
| Amy| 7| EC| 63| 98| 1| 1|
| Johnson| 8| EC| 25| 83| 2| 2|
| Logan| 4| CS| 70| 99| 1| 1|
| David| 1| CS| 55| 98| 2| 2|
| Raven| 3| CS| 40| 83| 3| 3|
+------------+-------+----+----------+-----+----+----------+
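A common use of these ranking functions is picking the top N rows per group. As a sketch, the query below keeps the top 2 scorers of each department by filtering on the windowed dense_rank (with ties, more than 2 rows per department may be returned):
spark.sql("""select * from (
select * , dense_rank() over(partition by dept order by marks desc) as rnk
from dataset_2
)a
where a.rnk <= 2""").show()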
Sum
Sum is one of the SQL aggregate functions; it is used to calculate the sum of a column over all the rows of a table or over a defined set of rows.
Syntax: sum(column/expression) over(partition by col(s)/expression(s) | order by col(s)/expression(s))
Let's calculate the sum of the due amounts of the students in each department.
spark.sql("select dept, sum(due_amount) as total_due_amount from dataset_2 group by dept").show()
+----+----------------+
|dept|total_due_amount|
+----+----------------+
| EE| 181|
| EC| 88|
| CS| 165|
+----+----------------+
Now, let’s calculate the same sum and display it on each row. This can be achieved using the partition by clause.
spark.sql("""select *, sum(due_amount) over(partition by dept order by dept) as total_due_amount
from dataset_2""").show()
+------------+-------+----+----------+-----+----------------+
|Student_Name|Roll_No|Dept|Due_Amount|Marks|total_due_amount|
+------------+-------+----+----------+-----+----------------+
| Goliath| 2| EE| 32| 94| 181|
| Ravi| 5| EE| 82| 83| 181|
| Shivani| 12| EE| 67| 80| 181|
| Amy| 7| EC| 63| 98| 88|
| Johnson| 8| EC| 25| 83| 88|
| David| 1| CS| 55| 98| 165|
| Raven| 3| CS| 40| 83| 165|
| Logan| 4| CS| 70| 99| 165|
+------------+-------+----+----------+-----+----------------+
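The same per-department total can also be computed through the DataFrame API with a window partitioned by dept; a minimal sketch using the df2 dataframe created above:
from pyspark.sql import Window, functions as F

# sum over all rows sharing the same department
w = Window.partitionBy('dept')
df2.withColumn('total_due_amount', F.sum('due_amount').over(w)).show()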
Cumulative Sum
By changing the window definition, we can generate a cumulative sum using the sum function.
spark.sql("""select *, sum(due_amount) over(order by roll_no) as cumulative_due_amount
from dataset_2""").show()
+------------+-------+----+----------+-----+---------------------+
|Student_Name|Roll_No|Dept|Due_Amount|Marks|cumulative_due_amount|
+------------+-------+----+----------+-----+---------------------+
| David| 1| CS| 55| 98| 55|
| Goliath| 2| EE| 32| 94| 87|
| Raven| 3| CS| 40| 83| 127|
| Logan| 4| CS| 70| 99| 197|
| Ravi| 5| EE| 82| 83| 279|
| Amy| 7| EC| 63| 98| 342|
| Johnson| 8| EC| 25| 83| 367|
| Shivani| 12| EE| 67| 80| 434|
+------------+-------+----+----------+-----+---------------------+
Now, let’s generate a cumulative sum of the due amounts within each department.
spark.sql("""select *, sum(due_amount) over(partition by dept order by roll_no) as cumulative_due_amount
from dataset_2""").show()
+------------+-------+----+----------+-----+---------------------+
|Student_Name|Roll_No|Dept|Due_Amount|Marks|cumulative_due_amount|
+------------+-------+----+----------+-----+---------------------+
| Goliath| 2| EE| 32| 94| 32|
| Ravi| 5| EE| 82| 83| 114|
| Shivani| 12| EE| 67| 80| 181|
| Amy| 7| EC| 63| 98| 63|
| Johnson| 8| EC| 25| 83| 88|
| David| 1| CS| 55| 98| 55|
| Raven| 3| CS| 40| 83| 95|
| Logan| 4| CS| 70| 99| 165|
+------------+-------+----+----------+-----+---------------------+
Helpful Tip: Try changing the order by column from roll_no to dept to obtain a cumulative sum with respect to each department. Try similar combinations to explore this further.
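When an order by is present and no frame is specified, the engine defaults to a window frame that ends at the current row, which is what produces the running total above. The frame can also be written out explicitly; a sketch that should match the first cumulative sum (the ROWS BETWEEN clause is covered in more detail in the last_value section below):
spark.sql("""select *, sum(due_amount) over(order by roll_no
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) as cumulative_due_amount
from dataset_2""").show()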
Average
This SQL aggregate function is used to calculate the average of a column over all the rows of a table or over a defined set of rows.
Syntax: avg(column/expression) over(partition by col(s)/expression(s) | order by col(s)/expression(s))
Similar to the above example, let’s calculate the average marks with respect to each department
spark.sql("""select *, cast(avg(marks) over(partition by dept order by dept) as decimal(10,3)) as average_marks
from dataset_2""").show()
+------------+-------+----+----------+-----+-------------+
|Student_Name|Roll_No|Dept|Due_Amount|Marks|average_marks|
+------------+-------+----+----------+-----+-------------+
| Goliath| 2| EE| 32| 94| 85.667|
| Ravi| 5| EE| 82| 83| 85.667|
| Shivani| 12| EE| 67| 80| 85.667|
| Amy| 7| EC| 63| 98| 90.500|
| Johnson| 8| EC| 25| 83| 90.500|
| David| 1| CS| 55| 98| 93.333|
| Raven| 3| CS| 40| 83| 93.333|
| Logan| 4| CS| 70| 99| 93.333|
+------------+-------+----+----------+-----+-------------+
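The moving averages mentioned in the introduction use the same avg function with a sliding frame. As a quick sketch, the query below computes a 3-row moving average of marks ordered by roll_no (the current row plus one row before and one after):
spark.sql("""select *, cast(avg(marks) over(order by roll_no
ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as decimal(10,3)) as moving_avg_marks
from dataset_2""").show()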
Count
Count is one of the SQL aggregate functions in Hive and Spark; it is used to calculate the number of rows in a table or in a defined set of rows.
Syntax: count(column/expression) over(partition by col(s)/expression(s) | order by col(s)/expression(s))
Similar to the above example, let's calculate the count of students in each department.
spark.sql("""select *, count(roll_no) over(partition by dept order by dept) as student_count
from dataset_2""").show()
+------------+-------+----+----------+-----+-------------+
|Student_Name|Roll_No|Dept|Due_Amount|Marks|student_count|
+------------+-------+----+----------+-----+-------------+
| Goliath| 2| EE| 32| 94| 3|
| Ravi| 5| EE| 82| 83| 3|
| Shivani| 12| EE| 67| 80| 3|
| Amy| 7| EC| 63| 98| 2|
| Johnson| 8| EC| 25| 83| 2|
| David| 1| CS| 55| 98| 3|
| Raven| 3| CS| 40| 83| 3|
| Logan| 4| CS| 70| 99| 3|
+------------+-------+----+----------+-----+-------------+
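In Spark SQL, an empty over() is also allowed; it makes the window span the whole result set, which is handy for attaching a grand total to every row. A small sketch:
spark.sql("select *, count(*) over() as total_students from dataset_2").show()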
Min and Max
As the names suggest, min and max are SQL aggregate functions used to compute the minimum and maximum values from the entire data or from a defined set of rows.
Syntax: min(column/expression) over(partition by col(s)/expression(s) | order by col(s)/expression(s))
Let’s start off by calculating the minimum and maximum marks from each department.
spark.sql("""select dept
, min(marks) as min_marks
, max(marks) as max_marks
from dataset_2
group by dept""").show()
+----+---------+---------+
|dept|min_marks|max_marks|
+----+---------+---------+
| EE| 80| 94|
| EC| 83| 98|
| CS| 83| 99|
+----+---------+---------+
Now, we can utilize the window function to display the above result on each row.
spark.sql("""select *
, min(marks) over(partition by dept order by dept) as min_marks
, max(marks) over(partition by dept order by dept) as max_marks
from dataset_2""").show()
+------------+-------+----+----------+-----+---------+---------+
|Student_Name|Roll_No|Dept|Due_Amount|Marks|min_marks|max_marks|
+------------+-------+----+----------+-----+---------+---------+
| Goliath| 2| EE| 32| 94| 80| 94|
| Ravi| 5| EE| 82| 83| 80| 94|
| Shivani| 12| EE| 67| 80| 80| 94|
| Amy| 7| EC| 63| 98| 83| 98|
| Johnson| 8| EC| 25| 83| 83| 98|
| David| 1| CS| 55| 98| 83| 99|
| Raven| 3| CS| 40| 83| 83| 99|
| Logan| 4| CS| 70| 99| 83| 99|
+------------+-------+----+----------+-----+---------+---------+
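A common pattern with a windowed max is to filter back against it, for instance to fetch the top scorer of each department without a self-join. A sketch:
spark.sql("""select * from (
select *, max(marks) over(partition by dept) as max_marks
from dataset_2
)a
where a.marks = a.max_marks""").show()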
First_Value and Last_Value
first_value and last_value in Hive or Spark SQL are used to obtain the first and last values from all the rows in a table or from a specified window.
Syntax: first_value(column/expression) over(partition by col(s)/expression(s) | order by col(s)/expression(s))
Syntax: last_value(column/expression) over(partition by col(s)/expression(s) | order by col(s)/expression(s))
First_Value
For this example, let’s compute the first value of marks from a group of students belonging to a department. Within each department, the data is sorted in descending order of due_amount.
spark.sql("""select *
, first_value(marks) over(partition by dept order by due_amount desc) as first_value_marks
, last_value(marks) over(partition by dept order by due_amount desc) as last_value_marks
from dataset_2""").show()
+------------+-------+----+----------+-----+-----------------+----------------+
|Student_Name|Roll_No|Dept|Due_Amount|Marks|first_value_marks|last_value_marks|
+------------+-------+----+----------+-----+-----------------+----------------+
| Ravi| 5| EE| 82| 83| 83| 83|
| Shivani| 12| EE| 67| 80| 83| 80|
| Goliath| 2| EE| 32| 94| 83| 94|
| Amy| 7| EC| 63| 98| 98| 98|
| Johnson| 8| EC| 25| 83| 98| 83|
| Logan| 4| CS| 70| 99| 99| 99|
| David| 1| CS| 55| 98| 99| 98|
| Raven| 3| CS| 40| 83| 99| 83|
+------------+-------+----+----------+-----+-----------------+----------------+
Last_Value
The last_value function behaves a bit differently from its counterpart, first_value.
When an order by clause is included in the window, last_value returns the last value "up to the current record", so the value keeps changing for every row.
The example below demonstrates this clearly:
spark.sql("""select *
, last_value(marks) over(partition by dept order by due_amount desc) as last_value_marks
from dataset_2""").show()
+------------+-------+----+----------+-----+----------------+
|Student_Name|Roll_No|Dept|Due_Amount|Marks|last_value_marks|
+------------+-------+----+----------+-----+----------------+
| Ravi| 5| EE| 82| 83| 83|
| Shivani| 12| EE| 67| 80| 80|
| Goliath| 2| EE| 32| 94| 94|
| Amy| 7| EC| 63| 98| 98|
| Johnson| 8| EC| 25| 83| 83|
| Logan| 4| CS| 70| 99| 99|
| David| 1| CS| 55| 98| 98|
| Raven| 3| CS| 40| 83| 83|
+------------+-------+----+----------+-----+----------------+
To maintain a static value across the group or partition, an additional clause must be added,
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
The above clause tells the last_value function to compute the last value over the entire partition of rows, as spelled out by the "unbounded preceding and unbounded following" phrase.
Now, let’s go ahead with an example to view this behaviour.
spark.sql("""select *
, last_value(marks) over(partition by dept order by due_amount desc
ROWS BETWEEN UNBOUNDED PRECEDING AND
UNBOUNDED FOLLOWING) as last_value_marks
from dataset_2""").show()
+------------+-------+----+----------+-----+----------------+
|Student_Name|Roll_No|Dept|Due_Amount|Marks|last_value_marks|
+------------+-------+----+----------+-----+----------------+
| Goliath| 2| EE| 32| 94| 80|
| Ravi| 5| EE| 82| 83| 80|
| Shivani| 12| EE| 67| 80| 80|
| Amy| 7| EC| 63| 98| 83|
| Johnson| 8| EC| 25| 83| 83|
| David| 1| CS| 55| 98| 99|
| Raven| 3| CS| 40| 83| 99|
| Logan| 4| CS| 70| 99| 99|
+------------+-------+----+----------+-----+----------------+
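The same behaviour can be reproduced through the DataFrame API, where the frame is set with rowsBetween; a minimal sketch on df2 with the same descending due_amount ordering:
from pyspark.sql import Window, functions as F

# frame spanning the whole partition, so last() returns a static value per department
w = (Window.partitionBy('dept')
     .orderBy(F.desc('due_amount'))
     .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))
df2.withColumn('last_value_marks', F.last('marks').over(w)).show()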
Lead and Lag
Lead and lag are among the most commonly used analytic functions in SQL. As the names suggest, lead and lag are used to fetch the next and previous record, respectively, from a group of rows.
Syntax: lead(column/expression, offset, default_value) over(partition by col(s)/expression(s) | order by col(s)/expression(s))
Note: offset and default_value are optional. By default, offset is 1 and default_value is null.
Before we proceed with our example, let’s create a sample dataset.
ldf = spark.createDataFrame([('Leonard','12-03-2019'),('Raj','15-03-2019'), ('Cooper','17-04-2019'),
('Howard','20-04-2019'), ('Penny','29-04-2019')],
['Employee_Name', 'Joining_Date'])
ldf.createOrReplaceTempView('dataset_3')
ldf.show()
+-------------+------------+
|Employee_Name|Joining_Date|
+-------------+------------+
| Leonard| 12-03-2019|
| Raj| 15-03-2019|
| Cooper| 17-04-2019|
| Howard| 20-04-2019|
| Penny| 29-04-2019|
+-------------+------------+
Now let’s obtain the next and previous employee’s joining date from this given dataset. Let’s consider the data to be sorted based on joining date.
spark.sql("""select *
, lead(joining_date, 1, 'NA') over(order by joining_date) as next_employee_joining_date
, lag(joining_date) over(order by joining_date) as prev_employee_joining_date
from dataset_3""").show()
+-------------+------------+--------------------------+--------------------------+
|Employee_Name|Joining_Date|next_employee_joining_date|prev_employee_joining_date|
+-------------+------------+--------------------------+--------------------------+
| Leonard| 12-03-2019| 15-03-2019| null|
| Raj| 15-03-2019| 17-04-2019| 12-03-2019|
| Cooper| 17-04-2019| 20-04-2019| 15-03-2019|
| Howard| 20-04-2019| 29-04-2019| 17-04-2019|
| Penny| 29-04-2019| NA| 20-04-2019|
+-------------+------------+--------------------------+--------------------------+
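For completeness, the same lead and lag can be expressed with the DataFrame API on the ldf dataframe; a sketch:
from pyspark.sql import Window, functions as F

# order the whole dataset by joining date, then look one row ahead and one row behind
w = Window.orderBy('joining_date')
(ldf
 .withColumn('next_employee_joining_date', F.lead('joining_date', 1, 'NA').over(w))
 .withColumn('prev_employee_joining_date', F.lag('joining_date').over(w))
 .show())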
Now let’s use dataset_2 and fetch the next student’s marks within each department.
Here, we will be using the partition by clause to group the data based on department and calculate lead and lag on top of it.
spark.sql("""select *
, lead(marks, 1, -1) over(partition by dept order by roll_no) as next_student_marks
, lag(marks, 1, -1) over(partition by dept order by roll_no) as prev_student_marks
from dataset_2""").show()
+------------+-------+----+----------+-----+------------------+------------------+
|Student_Name|Roll_No|Dept|Due_Amount|Marks|next_student_marks|prev_student_marks|
+------------+-------+----+----------+-----+------------------+------------------+
| Goliath| 2| EE| 32| 94| 83| -1|
| Ravi| 5| EE| 82| 83| 80| 94|
| Shivani| 12| EE| 67| 80| -1| 83|
| Amy| 7| EC| 63| 98| 83| -1|
| Johnson| 8| EC| 25| 83| -1| 98|
| David| 1| CS| 55| 98| 83| -1|
| Raven| 3| CS| 40| 83| 99| 98|
| Logan| 4| CS| 70| 99| -1| 83|
+------------+-------+----+----------+-----+------------------+------------------+
Conclusion
By now, you should be well acquainted with analytic and aggregate functions in Hive and Spark SQL, and able to write SQL queries using them based on your requirements.
I recommend trying out more such examples on sample datasets to build further expertise.
Comments and feedback are welcome. Cheers!