Semi-Structured Data in Spark (pyspark) - JSON

November 22, 2018

In this post we discuss how to read semi-structured data from different data sources and store it as a spark dataframe. The spark dataframe can in turn be used to perform aggregations and all sorts of data manipulations.

Introduction

Previously we saw how to create and work with Spark dataframes. In this post we discuss how to read semi-structured data from different data sources, store it as a Spark dataframe, and manipulate it further.

In addition, Spark can read semi-structured data and flatten it into a structure that can be stored as a structured table or a text file.

This post covers the complex datatypes in Spark, reading a JSON file, flattening arrays and arrays of structs, and saving the flattened data.

What are the Complex Datatypes in Spark

Before we dive into reading a JSON file in Spark, let’s refresh ourselves on the complex datatypes present in Spark.
There are 3 complex types in Spark:

  • Array
  • Struct
  • Map

    Array

    An Array in Spark is a list of homogeneous elements, i.e. elements that all share the same datatype.

    input_json = """
    {
      "numbers": [1, 2, 3, 4, 5, 6]
    }
    """
    adf = spark.read.json(sc.parallelize([input_json]))
    adf.printSchema()
    
    root
     |-- numbers: array (nullable = true)
     |    |-- element: long (containsNull = true)
    


    Let’s have a look at the data

    adf.show(truncate=False)
    
    +------------------+
    |numbers           |
    +------------------+
    |[1, 2, 3, 4, 5, 6]|
    +------------------+
    


    Flatten the Array using Explode
    What if you wish to display the elements in a more structured form, with each element in its own row?
    This is where the explode function comes in. As the name suggests, explode breaks the array into rows containing one element each. Below is a simple usage of the explode function on this array.

    from pyspark.sql.functions import explode
    adf.select(explode('numbers').alias('number')).show()
    
    +------+
    |number|
    +------+
    |     1|
    |     2|
    |     3|
    |     4|
    |     5|
    |     6|
    +------+
    


    Struct

    A Struct is a group of named fields, each with its own datatype, stored together under a single parent column.
    The elements inside a struct type can be accessed via the dot “.” notation as discussed below,

    input_json = """
    {
      "car_details": {
         "model": "Tesla S",
         "year": 2018
      }
    }
    """
    sdf = spark.read.json(sc.parallelize([input_json]))
    sdf.printSchema()
    
    root
     |-- car_details: struct (nullable = true)
     |    |-- model: string (nullable = true)
     |    |-- year: long (nullable = true)
    


    This is how the data looks.

    sdf.show()
    
    +---------------+
    |    car_details|
    +---------------+
    |[Tesla S, 2018]|
    +---------------+
    


    Methods to access the elements inside a struct type are shown below,

    sdf.select(sdf.car_details.model, sdf.car_details.year).show()
    
    +-----------------+----------------+
    |car_details.model|car_details.year|
    +-----------------+----------------+
    |          Tesla S|            2018|
    +-----------------+----------------+
    


    An alternative way to do the same is shown below. Note that the resulting column names drop the parent prefix,

    from pyspark.sql.functions import col
    sdf.select(col('car_details.model'), col('car_details.year')).show()
    
    +-------+----+
    |  model|year|
    +-------+----+
    |Tesla S|2018|
    +-------+----+
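
    The dotted paths used above (car_details.model, car_details.year) follow the nesting from the parent struct down to each field. As a plain-Python analogy, not Spark code, the sketch below flattens a nested dict into the same dotted names. The helper flatten_struct is hypothetical and only meant to illustrate the naming.

```python
import json


def flatten_struct(record, parent=""):
    """Flatten nested dicts into a single dict keyed by dotted paths."""
    flat = {}
    for key, value in record.items():
        path = f"{parent}.{key}" if parent else key
        if isinstance(value, dict):
            # Recurse into the nested struct, carrying the path so far.
            flat.update(flatten_struct(value, path))
        else:
            flat[path] = value
    return flat


record = json.loads('{"car_details": {"model": "Tesla S", "year": 2018}}')
flat = flatten_struct(record)
print(flat)  # {'car_details.model': 'Tesla S', 'car_details.year': 2018}
```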
    


    Map

    A Map is a collection of key-value pairs. It is similar to a dictionary in Python.

    Let’s see how map elements can be accessed from a JSON record,

    from pyspark.sql.types import StructType, MapType, StringType, IntegerType
    input_json = """
    {
      "Car": {
        "model_id": 835,
        "year": 2008
      }
    }
    """
    schema = StructType().add("Car", MapType(StringType(), IntegerType()))
    mdf = spark.read.json(sc.parallelize([input_json]), schema=schema)
    mdf.printSchema()
    
    root
     |-- Car: map (nullable = true)
     |    |-- key: string
     |    |-- value: integer (valueContainsNull = true)
    


    This is how the data looks when displayed,

    mdf.show(truncate=False)
    
    +-------------------------------+
    |Car                            |
    +-------------------------------+
    |[model_id -> 835, year -> 2008]|
    +-------------------------------+
    


    Individual elements can also be accessed by key, using dictionary-style indexing in Python, as shown below,

    mdf.select(mdf.Car['model_id'], mdf.Car['year']).show()
    
    +-------------+---------+
    |Car[model_id]|Car[year]|
    +-------------+---------+
    |          835|     2008|
    +-------------+---------+
    


    How to read a JSON file

    A JSON file can be read using the dataframe reader’s json method.

    Note: By default, Spark expects JSON data in the newline-delimited JSON Lines format, which means the JSON file must meet the 3 requirements below,

  • Each line of the file is a JSON record
  • The line separator must be ‘\n’ or ‘\r\n’
  • Data must be UTF-8 encoded

    A simple example of JSON Lines formatted data is shown below,

    {"id" : "1201", "name" : "satish", "age" : "25"}
    {"id" : "1202", "name" : "krishna", "age" : "28"}


    As seen above, each JSON record sits on its own line, terminated by a line separator.
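
    To see why this layout is convenient, here is a small plain-Python sketch (not Spark code) that parses two JSON Lines records with the standard json module. It only illustrates the format: each line is a complete JSON document on its own, while the file as a whole is not.

```python
import json

# Two records in JSON Lines form: one complete JSON object per line.
json_lines = (
    '{"id": "1201", "name": "satish", "age": "25"}\n'
    '{"id": "1202", "name": "krishna", "age": "28"}\n'
)

# Each non-empty line parses independently of the others.
records = [json.loads(line) for line in json_lines.splitlines() if line.strip()]

# The text as a whole is not a single valid JSON document.
try:
    json.loads(json_lines)
    whole_text_is_json = True
except json.JSONDecodeError:
    whole_text_is_json = False

print(len(records))         # 2
print(records[1]["name"])   # krishna
print(whole_text_is_json)   # False
```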

    Let’s take a sample JSON File consisting of data about the different Pokemon present in a Pokedex. The Dataset can be downloaded here, pokedex_dataset

    Before we read this file in Spark, let’s strip out the top-level ‘pokemon’ element using Python’s json package.

    We can use the json.load method to load the data, which can then be accessed the same way as a Python dict.

    import json
    with open('/path-to-file/pokedex.json', 'r') as f:
        vals = json.load(f)
    vals = vals['pokemon']
    

    This is how the first record looks now,

    vals[0]
    
    {'id': 1,
     'num': '001',
     'name': 'Bulbasaur',
     'img': 'http://www.serebii.net/pokemongo/pokemon/001.png',
     'type': ['Grass', 'Poison'],
     'height': '0.71 m',
     'weight': '6.9 kg',
     'candy': 'Bulbasaur Candy',
     'candy_count': 25,
     'egg': '2 km',
     'spawn_chance': 0.69,
     'avg_spawns': 69,
     'spawn_time': '20:00',
     'multipliers': [1.58],
     'weaknesses': ['Fire', 'Ice', 'Flying', 'Psychic'],
     'next_evolution': [{'num': '002', 'name': 'Ivysaur'},
      {'num': '003', 'name': 'Venusaur'}]}
    


    Now, let’s save this in a new file called pokedex_edited.json and proceed with our dataframe operations,

    with open('/path-to-file/pokedex_edited.json', 'w') as f:
        f.write(json.dumps(vals)) # json.dumps converts the list into a JSON String.
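
    As an alternative to writing one large JSON array, the records could be written in JSON Lines form, one object per line, which is the layout Spark’s JSON reader expects by default. The sketch below is a hedged illustration: the two shortened records standing in for vals and the temporary output path are assumptions, not part of the original dataset handling.

```python
import json
import os
import tempfile

# Stand-in for the list loaded from pokedex.json (hypothetical, shortened records).
vals = [
    {"id": 1, "name": "Bulbasaur", "type": ["Grass", "Poison"]},
    {"id": 2, "name": "Ivysaur", "type": ["Grass", "Poison"]},
]

# Hypothetical output location; replace with a real path.
out_path = os.path.join(tempfile.mkdtemp(), "pokedex_lines.json")

with open(out_path, "w", encoding="utf-8") as f:
    for record in vals:
        # json.dumps without indent never emits a raw newline,
        # so each record stays on exactly one line.
        f.write(json.dumps(record) + "\n")

# Read the file back to confirm there is one record per line.
with open(out_path, encoding="utf-8") as f:
    lines = f.read().splitlines()

print(len(lines))  # 2
```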
    


    The header element can also be stripped in Spark itself, so if the data volume is very large, the whole operation can be done there.

    This alternate method using Spark is discussed at the end of this post.

    Now, let’s read our data in Spark,

    df = spark.read.json('/path-to-file/pokedex_edited.json', multiLine=True)
    


    An alternative for the above dataframe reader method is shown below,

    df = spark.read.format('json').option('multiLine', 'True').load('/path-to-file/pokedex_edited.json')
    

    This is how a single record in the file looks, shown here as a Python dict with comments marking the complex types,

        {'id': 1,
         'num': '001',
         'name': 'Bulbasaur',
         'img': 'http://www.serebii.net/pokemongo/pokemon/001.png',
         'type': ['Grass', 'Poison'],  # Array Element
         'height': '0.71 m',
         'weight': '6.9 kg',
         'candy': 'Bulbasaur Candy',
         'candy_count': 25,
         'egg': '2 km',
         'spawn_chance': 0.69,
         'avg_spawns': 69,
         'spawn_time': '20:00',
         'multipliers': [1.58],
         'weaknesses': ['Fire', 'Ice', 'Flying', 'Psychic'],  # Array Element
         'next_evolution': [{'num': '002', 'name': 'Ivysaur'},  # Array of Struct Elements
          {'num': '003', 'name': 'Venusaur'}]}
    

    As you can see above, there are arrays and even arrays of structs present in the data.

    Spark infers this schema dynamically and parses the data retaining the complex datatypes, i.e. arrays, structs etc.

    Note: If you wish to maintain a static schema instead, it can be passed through the optional schema parameter as a pyspark.sql.types.StructType.

    However, when Spark infers the schema dynamically, the input column order isn’t maintained; the inferred columns come out in alphabetical order, as the schema printed below shows. If you want to keep the same order as the source, either pass a schema to the read.json method or select the columns from the generated df in your required order.
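
    One way to build such a column list is to collect the keys in the order they first appear in the source records. The sketch below does this in plain Python on two shortened, hypothetical records. The helper name source_column_order is an assumption, and the resulting list could then be passed to df.select(*columns).

```python
def source_column_order(records):
    """Return the keys in first-seen order across all records."""
    order = []
    seen = set()
    for record in records:
        for key in record:
            if key not in seen:
                seen.add(key)
                order.append(key)
    return order


# Shortened, hypothetical stand-ins for the records loaded from the source file.
vals = [
    {"id": 1, "num": "001", "name": "Bulbasaur", "next_evolution": []},
    {"id": 2, "num": "002", "name": "Ivysaur", "prev_evolution": []},
]

columns = source_column_order(vals)
print(columns)          # ['id', 'num', 'name', 'next_evolution', 'prev_evolution']
print(sorted(columns))  # ['id', 'name', 'next_evolution', 'num', 'prev_evolution']
```

    The second print shows the alphabetical order for comparison; the first is the source order that the select would restore.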

    Let’s print the Schema of our Dataframe.

    df.printSchema()
    
    root
     |-- avg_spawns: double (nullable = true)
     |-- candy: string (nullable = true)
     |-- candy_count: long (nullable = true)
     |-- egg: string (nullable = true)
     |-- height: string (nullable = true)
     |-- id: long (nullable = true)
     |-- img: string (nullable = true)
     |-- multipliers: array (nullable = true)
     |    |-- element: double (containsNull = true)
     |-- name: string (nullable = true)
     |-- next_evolution: array (nullable = true)
     |    |-- element: struct (containsNull = true)
     |    |    |-- name: string (nullable = true)
     |    |    |-- num: string (nullable = true)
     |-- num: string (nullable = true)
     |-- prev_evolution: array (nullable = true)
     |    |-- element: struct (containsNull = true)
     |    |    |-- name: string (nullable = true)
     |    |    |-- num: string (nullable = true)
     |-- spawn_chance: double (nullable = true)
     |-- spawn_time: string (nullable = true)
     |-- type: array (nullable = true)
     |    |-- element: string (containsNull = true)
     |-- weaknesses: array (nullable = true)
     |    |-- element: string (containsNull = true)
     |-- weight: string (nullable = true)
    


    This is how the data looks inside the dataframe.

    df.show(5,False)
    
    +----------+----------------+-----------+-----------+------+---+------------------------------------------------+-----------+----------+-------------------------------------+---+----------------------------------+------------+----------+---------------+----------------------------+--------+
    |avg_spawns|candy           |candy_count|egg        |height|id |img                                             |multipliers|name      |next_evolution                       |num|prev_evolution                    |spawn_chance|spawn_time|type           |weaknesses                  |weight  |
    +----------+----------------+-----------+-----------+------+---+------------------------------------------------+-----------+----------+-------------------------------------+---+----------------------------------+------------+----------+---------------+----------------------------+--------+
    |69.0      |Bulbasaur Candy |25         |2 km       |0.71 m|1  |http://www.serebii.net/pokemongo/pokemon/001.png|[1.58]     |Bulbasaur |[[Ivysaur, 002], [Venusaur, 003]]    |001|null                              |0.69        |20:00     |[Grass, Poison]|[Fire, Ice, Flying, Psychic]|6.9 kg  |
    |4.2       |Bulbasaur Candy |100        |Not in Eggs|0.99 m|2  |http://www.serebii.net/pokemongo/pokemon/002.png|[1.2, 1.6] |Ivysaur   |[[Venusaur, 003]]                    |002|[[Bulbasaur, 001]]                |0.042       |07:00     |[Grass, Poison]|[Fire, Ice, Flying, Psychic]|13.0 kg |
    |1.7       |Bulbasaur Candy |null       |Not in Eggs|2.01 m|3  |http://www.serebii.net/pokemongo/pokemon/003.png|null       |Venusaur  |null                                 |003|[[Bulbasaur, 001], [Ivysaur, 002]]|0.017       |11:30     |[Grass, Poison]|[Fire, Ice, Flying, Psychic]|100.0 kg|
    |25.3      |Charmander Candy|25         |2 km       |0.61 m|4  |http://www.serebii.net/pokemongo/pokemon/004.png|[1.65]     |Charmander|[[Charmeleon, 005], [Charizard, 006]]|004|null                              |0.253       |08:45     |[Fire]         |[Water, Ground, Rock]       |8.5 kg  |
    |1.2       |Charmander Candy|100        |Not in Eggs|1.09 m|5  |http://www.serebii.net/pokemongo/pokemon/005.png|[1.79]     |Charmeleon|[[Charizard, 006]]                   |005|[[Charmander, 004]]               |0.012       |19:00     |[Fire]         |[Water, Ground, Rock]       |19.0 kg |
    +----------+----------------+-----------+-----------+------+---+------------------------------------------------+-----------+----------+-------------------------------------+---+----------------------------------+------------+----------+---------------+----------------------------+--------+
    only showing top 5 rows
    


    There are 151 records in total in the JSON file, and the dataframe count yields the same result, as shown below.

    df.count()
    
    151
    


    Now let’s take an array column and flatten it into a structured form. For this example, we select the id, name and weaknesses columns.
    This is how the data looks before flattening.

    df.select('id', 'name', 'weaknesses').show(5,False)
    
    +---+----------+----------------------------+
    |id |name      |weaknesses                  |
    +---+----------+----------------------------+
    |1  |Bulbasaur |[Fire, Ice, Flying, Psychic]|
    |2  |Ivysaur   |[Fire, Ice, Flying, Psychic]|
    |3  |Venusaur  |[Fire, Ice, Flying, Psychic]|
    |4  |Charmander|[Water, Ground, Rock]       |
    |5  |Charmeleon|[Water, Ground, Rock]       |
    +---+----------+----------------------------+
    only showing top 5 rows
    


    How to Flatten the Data?

    Approach 1: Using Spark Dataframe

    The explode function can be used to break the array elements into individual rows, giving the data a more structured form.

    from pyspark.sql.functions import explode
    df.select('id', 'name', explode('weaknesses').alias('weakness')).show(10)
    
    +---+---------+--------+
    | id|     name|weakness|
    +---+---------+--------+
    |  1|Bulbasaur|    Fire|
    |  1|Bulbasaur|     Ice|
    |  1|Bulbasaur|  Flying|
    |  1|Bulbasaur| Psychic|
    |  2|  Ivysaur|    Fire|
    |  2|  Ivysaur|     Ice|
    |  2|  Ivysaur|  Flying|
    |  2|  Ivysaur| Psychic|
    |  3| Venusaur|    Fire|
    |  3| Venusaur|     Ice|
    +---+---------+--------+
    only showing top 10 rows
    


    Approach 2: Using Spark SQL

    LATERAL VIEW with explode can be used to flatten the data in Spark SQL.

    df.createOrReplaceTempView('pokedex')
    spark.sql("""select id,
    name,
    weakness
    from pokedex
    lateral view explode(weaknesses) tmp as weakness""").show(10)
    
    +---+---------+--------+
    | id|     name|weakness|
    +---+---------+--------+
    |  1|Bulbasaur|    Fire|
    |  1|Bulbasaur|     Ice|
    |  1|Bulbasaur|  Flying|
    |  1|Bulbasaur| Psychic|
    |  2|  Ivysaur|    Fire|
    |  2|  Ivysaur|     Ice|
    |  2|  Ivysaur|  Flying|
    |  2|  Ivysaur| Psychic|
    |  3| Venusaur|    Fire|
    |  3| Venusaur|     Ice|
    +---+---------+--------+
    only showing top 10 rows
    


    How to Flatten an Array of Struct Elements?

    Let’s pick the id, name, next_evolution and prev_evolution columns from the dataset. This is how the data looks before flattening,

    df.select('id', 'name', 'next_evolution', 'prev_evolution').show(5,False)
    
    +---+----------+-------------------------------------+----------------------------------+
    |id |name      |next_evolution                       |prev_evolution                    |
    +---+----------+-------------------------------------+----------------------------------+
    |1  |Bulbasaur |[[Ivysaur, 002], [Venusaur, 003]]    |null                              |
    |2  |Ivysaur   |[[Venusaur, 003]]                    |[[Bulbasaur, 001]]                |
    |3  |Venusaur  |null                                 |[[Bulbasaur, 001], [Ivysaur, 002]]|
    |4  |Charmander|[[Charmeleon, 005], [Charizard, 006]]|null                              |
    |5  |Charmeleon|[[Charizard, 006]]                   |[[Charmander, 004]]               |
    +---+----------+-------------------------------------+----------------------------------+
    only showing top 5 rows
    


    Now, let’s flatten the above data. Each of these two columns is an array whose elements are structs. So we can first use the explode function to break up the array and then use the dot “.” notation to pick the fields of each struct.

    df.select('id', 'name', 'next_evolution', 'prev_evolution').printSchema()
    
    root
     |-- id: long (nullable = true)
     |-- name: string (nullable = true)
     |-- next_evolution: array (nullable = true)
     |    |-- element: struct (containsNull = true)
     |    |    |-- name: string (nullable = true)
     |    |    |-- num: string (nullable = true)
     |-- prev_evolution: array (nullable = true)
     |    |-- element: struct (containsNull = true)
     |    |    |-- name: string (nullable = true)
     |    |    |-- num: string (nullable = true)
    


    Spark allows only a single explode call per select, so exploding multiple columns requires chaining multiple select calls.

    exp_df = df.select('id','name',explode('next_evolution').alias('next_evolution'), 'prev_evolution')\
    .select('id','name', 'next_evolution', explode('prev_evolution').alias('prev_evolution'))
    exp_df.show()
    
    +---+----------+-----------------+--------------------+
    | id|      name|   next_evolution|      prev_evolution|
    +---+----------+-----------------+--------------------+
    |  2|   Ivysaur|  [Venusaur, 003]|    [Bulbasaur, 001]|
    |  5|Charmeleon| [Charizard, 006]|   [Charmander, 004]|
    |  8| Wartortle| [Blastoise, 009]|     [Squirtle, 007]|
    | 11|   Metapod|[Butterfree, 012]|     [Caterpie, 010]|
    | 14|    Kakuna|  [Beedrill, 015]|       [Weedle, 013]|
    | 17| Pidgeotto|   [Pidgeot, 018]|       [Pidgey, 016]|
    | 30|  Nidorina| [Nidoqueen, 031]|[Nidoran(Female),...|
    | 33|  Nidorino|  [Nidoking, 034]|[Nidoran(Male), 032]|
    | 44|     Gloom| [Vileplume, 045]|       [Oddish, 043]|
    | 61| Poliwhirl| [Poliwrath, 062]|      [Poliwag, 060]|
    | 64|   Kadabra|  [Alakazam, 065]|         [Abra, 063]|
    | 67|   Machoke|   [Machamp, 068]|       [Machop, 066]|
    | 70|Weepinbell|[Victreebel, 071]|   [Bellsprout, 069]|
    | 75|  Graveler|     [Golem, 076]|      [Geodude, 074]|
    | 93|   Haunter|    [Gengar, 094]|       [Gastly, 092]|
    |148| Dragonair| [Dragonite, 149]|      [Dratini, 147]|
    +---+----------+-----------------+--------------------+
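
    Two things are worth noticing in the output above: rows with a null array on either side disappear, and a row with several elements on both sides would be multiplied, because the second explode runs once for every row produced by the first. The plain-Python sketch below models that behaviour. It is an illustration rather than Spark’s implementation; the helper name explode_both is hypothetical, the structs are simplified to plain strings, and the last sample row is made up to show the multiplication.

```python
from itertools import product


def explode_both(rows, first, second):
    """Model two chained explode calls on the columns `first` and `second`."""
    out = []
    for row in rows:
        # A null (None) or empty array yields no pairs, so the whole row is dropped.
        pairs = product(row[first] or [], row[second] or [])
        for a, b in pairs:
            out.append({**row, first: a, second: b})
    return out


rows = [
    {"id": 1, "next_evolution": ["Ivysaur", "Venusaur"], "prev_evolution": None},
    {"id": 2, "next_evolution": ["Venusaur"], "prev_evolution": ["Bulbasaur"]},
    # Hypothetical row with two elements on each side, to show the multiplication.
    {"id": 99, "next_evolution": ["A", "B"], "prev_evolution": ["C", "D"]},
]

result = explode_both(rows, "next_evolution", "prev_evolution")
print([r["id"] for r in result])  # [2, 99, 99, 99, 99]
```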
    


    Final Exploded Data looks like this,

    exp_df.select(
        'id', 'name',
        col('next_evolution.name').alias('next_name'), col('next_evolution.num').alias('next_num'),
        col('prev_evolution.name').alias('prev_name'), col('prev_evolution.num').alias('prev_num'),
    ).show(10)
    
    +---+----------+----------+--------+---------------+--------+
    | id|      name| next_name|next_num|      prev_name|prev_num|
    +---+----------+----------+--------+---------------+--------+
    |  2|   Ivysaur|  Venusaur|     003|      Bulbasaur|     001|
    |  5|Charmeleon| Charizard|     006|     Charmander|     004|
    |  8| Wartortle| Blastoise|     009|       Squirtle|     007|
    | 11|   Metapod|Butterfree|     012|       Caterpie|     010|
    | 14|    Kakuna|  Beedrill|     015|         Weedle|     013|
    | 17| Pidgeotto|   Pidgeot|     018|         Pidgey|     016|
    | 30|  Nidorina| Nidoqueen|     031|Nidoran(Female)|     029|
    | 33|  Nidorino|  Nidoking|     034|  Nidoran(Male)|     032|
    | 44|     Gloom| Vileplume|     045|         Oddish|     043|
    | 61| Poliwhirl| Poliwrath|     062|        Poliwag|     060|
    +---+----------+----------+--------+---------------+--------+
    only showing top 10 rows
    


    explode_outer

    Note: explode produces no output rows when the array is null or empty. Hence you might’ve noticed that some Pokemon are missing from the above flattened data. To keep those rows, with a null in place of the exploded value, use explode_outer. The example below illustrates the usage of explode_outer,

    from pyspark.sql.functions import explode_outer, col
    exp_out_df = df.select('id','name',explode_outer('next_evolution').alias('next_evolution'), 'prev_evolution')\
    .select('id','name', 'next_evolution', explode_outer('prev_evolution').alias('prev_evolution'))
    exp_out_df.show()
    
    +---+----------+-----------------+-----------------+
    | id|      name|   next_evolution|   prev_evolution|
    +---+----------+-----------------+-----------------+
    |  1| Bulbasaur|   [Ivysaur, 002]|             null|
    |  1| Bulbasaur|  [Venusaur, 003]|             null|
    |  2|   Ivysaur|  [Venusaur, 003]| [Bulbasaur, 001]|
    |  3|  Venusaur|             null| [Bulbasaur, 001]|
    |  3|  Venusaur|             null|   [Ivysaur, 002]|
    |  4|Charmander|[Charmeleon, 005]|             null|
    |  4|Charmander| [Charizard, 006]|             null|
    |  5|Charmeleon| [Charizard, 006]|[Charmander, 004]|
    |  6| Charizard|             null|[Charmander, 004]|
    |  6| Charizard|             null|[Charmeleon, 005]|
    |  7|  Squirtle| [Wartortle, 008]|             null|
    |  7|  Squirtle| [Blastoise, 009]|             null|
    |  8| Wartortle| [Blastoise, 009]|  [Squirtle, 007]|
    |  9| Blastoise|             null|  [Squirtle, 007]|
    |  9| Blastoise|             null| [Wartortle, 008]|
    | 10|  Caterpie|   [Metapod, 011]|             null|
    | 10|  Caterpie|[Butterfree, 012]|             null|
    | 11|   Metapod|[Butterfree, 012]|  [Caterpie, 010]|
    | 12|Butterfree|             null|  [Caterpie, 010]|
    | 12|Butterfree|             null|   [Metapod, 011]|
    +---+----------+-----------------+-----------------+
    only showing top 20 rows
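
    The difference between the two functions can be summed up in a few lines of plain Python. The sketch below is a model for illustration only, not Spark’s implementation; the helper explode_model and its outer flag are hypothetical, and the structs are simplified to plain strings.

```python
def explode_model(rows, column, outer=False):
    """Model explode (outer=False) and explode_outer (outer=True) on one column."""
    out = []
    for row in rows:
        elements = row[column] or []
        if not elements and outer:
            # explode_outer keeps the row and fills the column with null.
            elements = [None]
        for element in elements:
            out.append({**row, column: element})
    return out


rows = [
    {"name": "Bulbasaur", "prev_evolution": None},
    {"name": "Ivysaur", "prev_evolution": ["Bulbasaur"]},
]

inner = explode_model(rows, "prev_evolution")
outer = explode_model(rows, "prev_evolution", outer=True)

print([r["name"] for r in inner])  # ['Ivysaur']
print([r["name"] for r in outer])  # ['Bulbasaur', 'Ivysaur']
```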
    


    Flattening Array of Struct - Spark SQL - Simpler way

    The Spark SQL approach is a simpler and cleaner way to explode multiple arrays of structs and select the struct fields.
    Here, we use LATERAL VIEW OUTER with explode to pick all the elements, including the nulls.

    df.createOrReplaceTempView('pokedex')
    flat_df = spark.sql("""select id,
    name,
    a.name as next_name,
    a.num as next_num,
    b.name as prev_name,
    b.num as prev_num
    from pokedex
    lateral view outer explode(next_evolution) tmp1 as a
    lateral view outer explode(prev_evolution) tmp2 as b
    """)
    flat_df.show()
    
    +---+----------+----------+--------+----------+--------+
    | id|      name| next_name|next_num| prev_name|prev_num|
    +---+----------+----------+--------+----------+--------+
    |  1| Bulbasaur|   Ivysaur|     002|      null|    null|
    |  1| Bulbasaur|  Venusaur|     003|      null|    null|
    |  2|   Ivysaur|  Venusaur|     003| Bulbasaur|     001|
    |  3|  Venusaur|      null|    null| Bulbasaur|     001|
    |  3|  Venusaur|      null|    null|   Ivysaur|     002|
    |  4|Charmander|Charmeleon|     005|      null|    null|
    |  4|Charmander| Charizard|     006|      null|    null|
    |  5|Charmeleon| Charizard|     006|Charmander|     004|
    |  6| Charizard|      null|    null|Charmander|     004|
    |  6| Charizard|      null|    null|Charmeleon|     005|
    |  7|  Squirtle| Wartortle|     008|      null|    null|
    |  7|  Squirtle| Blastoise|     009|      null|    null|
    |  8| Wartortle| Blastoise|     009|  Squirtle|     007|
    |  9| Blastoise|      null|    null|  Squirtle|     007|
    |  9| Blastoise|      null|    null| Wartortle|     008|
    | 10|  Caterpie|   Metapod|     011|      null|    null|
    | 10|  Caterpie|Butterfree|     012|      null|    null|
    | 11|   Metapod|Butterfree|     012|  Caterpie|     010|
    | 12|Butterfree|      null|    null|  Caterpie|     010|
    | 12|Butterfree|      null|    null|   Metapod|     011|
    +---+----------+----------+--------+----------+--------+
    only showing top 20 rows
    


    How to Save the Flattened Data?

    You can save the above flattened data into any structured sink, such as a table, delimited text files or even parquet files. The example below writes the above dataframe as pipe-delimited text. Because of repartition(1) the output is a single part file, and the path given to save is the directory that contains it.

    flat_df.repartition(1).write.format('csv').option('delimiter', '|').save('/path-to-file/pokedex_flat.csv')
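
    Once written, the part file can be checked with Python’s csv module. The sketch below parses two pipe-delimited lines held in memory. The sample lines are an assumption about what the output would look like (in particular that null values appear as empty fields and that no header row is written), not output captured from a real run.

```python
import csv
import io

# Assumed sample of the written data; the real part file may differ.
sample = "1|Bulbasaur|Ivysaur|002||\n2|Ivysaur|Venusaur|003|Bulbasaur|001\n"

# No header row is assumed, so the column names are supplied explicitly.
columns = ["id", "name", "next_name", "next_num", "prev_name", "prev_num"]
reader = csv.DictReader(io.StringIO(sample), fieldnames=columns, delimiter="|")

# Turn empty fields back into None to recover the nulls.
rows = [{k: (v if v != "" else None) for k, v in row.items()} for row in reader]

print(rows[0]["prev_name"])  # None
print(rows[1]["prev_name"])  # Bulbasaur
```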
    


    The flattened data can also be saved into a Hive table, using the below method,

    flat_df.write.saveAsTable('pokedex')
    

    Alternate method to strip first few elements from JSON in Spark

    As mentioned earlier, the root element can also be removed from the JSON in Spark itself, using the steps below.
    Let’s read our JSON file first.

    df2 = spark.read.json('/path-to-file/pokedex.json', multiLine=True)
    df2.printSchema()
    
    root
     |-- pokemon: array (nullable = true)
     |    |-- element: struct (containsNull = true)
     |    |    |-- avg_spawns: double (nullable = true)
     |    |    |-- candy: string (nullable = true)
     |    |    |-- candy_count: long (nullable = true)
     |    |    |-- egg: string (nullable = true)
     |    |    |-- height: string (nullable = true)
     |    |    |-- id: long (nullable = true)
     |    |    |-- img: string (nullable = true)
     |    |    |-- multipliers: array (nullable = true)
     |    |    |    |-- element: double (containsNull = true)
     |    |    |-- name: string (nullable = true)
     |    |    |-- next_evolution: array (nullable = true)
     |    |    |    |-- element: struct (containsNull = true)
     |    |    |    |    |-- name: string (nullable = true)
     |    |    |    |    |-- num: string (nullable = true)
     |    |    |-- num: string (nullable = true)
     |    |    |-- prev_evolution: array (nullable = true)
     |    |    |    |-- element: struct (containsNull = true)
     |    |    |    |    |-- name: string (nullable = true)
     |    |    |    |    |-- num: string (nullable = true)
     |    |    |-- spawn_chance: double (nullable = true)
     |    |    |-- spawn_time: string (nullable = true)
     |    |    |-- type: array (nullable = true)
     |    |    |    |-- element: string (containsNull = true)
     |    |    |-- weaknesses: array (nullable = true)
     |    |    |    |-- element: string (containsNull = true)
     |    |    |-- weight: string (nullable = true)
    


    As you can see above, everything is nested under the root element ‘pokemon’; removing it makes our data flattening easier.
    Since this element is an array, a simple explode call on it breaks the array into individual rows, i.e. 151 rows are generated. After the explode, the pokemon column has a struct datatype, since each row now holds a single array element.

    df3 = df2.select(explode('pokemon').alias('pokemon'))
    df3.printSchema()
    
    root
     |-- pokemon: struct (nullable = true)
     |    |-- avg_spawns: double (nullable = true)
     |    |-- candy: string (nullable = true)
     |    |-- candy_count: long (nullable = true)
     |    |-- egg: string (nullable = true)
     |    |-- height: string (nullable = true)
     |    |-- id: long (nullable = true)
     |    |-- img: string (nullable = true)
     |    |-- multipliers: array (nullable = true)
     |    |    |-- element: double (containsNull = true)
     |    |-- name: string (nullable = true)
     |    |-- next_evolution: array (nullable = true)
     |    |    |-- element: struct (containsNull = true)
     |    |    |    |-- name: string (nullable = true)
     |    |    |    |-- num: string (nullable = true)
     |    |-- num: string (nullable = true)
     |    |-- prev_evolution: array (nullable = true)
     |    |    |-- element: struct (containsNull = true)
     |    |    |    |-- name: string (nullable = true)
     |    |    |    |-- num: string (nullable = true)
     |    |-- spawn_chance: double (nullable = true)
     |    |-- spawn_time: string (nullable = true)
     |    |-- type: array (nullable = true)
     |    |    |-- element: string (containsNull = true)
     |    |-- weaknesses: array (nullable = true)
     |    |    |-- element: string (containsNull = true)
     |    |-- weight: string (nullable = true)
    
    df3.count()
    
    151
    


    Now let’s strip this element off and get our desired schema. We can use the dot “.” notation to get the elements of the struct. Since we are going to pick all elements here, we can use a simple ‘*’ to do that.

    df3.select('pokemon.*').printSchema()
    
    root
     |-- avg_spawns: double (nullable = true)
     |-- candy: string (nullable = true)
     |-- candy_count: long (nullable = true)
     |-- egg: string (nullable = true)
     |-- height: string (nullable = true)
     |-- id: long (nullable = true)
     |-- img: string (nullable = true)
     |-- multipliers: array (nullable = true)
     |    |-- element: double (containsNull = true)
     |-- name: string (nullable = true)
     |-- next_evolution: array (nullable = true)
     |    |-- element: struct (containsNull = true)
     |    |    |-- name: string (nullable = true)
     |    |    |-- num: string (nullable = true)
     |-- num: string (nullable = true)
     |-- prev_evolution: array (nullable = true)
     |    |-- element: struct (containsNull = true)
     |    |    |-- name: string (nullable = true)
     |    |    |-- num: string (nullable = true)
     |-- spawn_chance: double (nullable = true)
     |-- spawn_time: string (nullable = true)
     |-- type: array (nullable = true)
     |    |-- element: string (containsNull = true)
     |-- weaknesses: array (nullable = true)
     |    |-- element: string (containsNull = true)
     |-- weight: string (nullable = true)
    

    Conclusion

    By now you should be familiar with the complex datatypes in Spark and how to perform operations on them.

    Hopefully this post also gave a clear understanding of JSON parsing and flattening in Spark using both the Dataframe API and Spark SQL. Please leave your comments or any feedback in the comment section below. Cheers!
