Chapter 4: Data Types & Schema

Chapter Learning Objectives

  • The common data types in Spark.

  • How to inspect, define, save, and load schemas.

Chapter Outline

import pyspark
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

from IPython.display import display_html
import pandas as pd
import numpy as np

# Render several pandas data frames next to each other in the notebook,
# separated by `space` blocks of non-breaking spaces.
def display_side_by_side(*args, space):
    html_str = ''
    for df in args:
        html_str += df.to_html(index=False)
        html_str += "\xa0\xa0\xa0" * space
    display_html(html_str.replace('table', 'table style="display:inline"'), raw=True)

space = "\xa0" * 10

Common data types in Spark

Numeric types

| Name        | Description                                                 | Example     |
|-------------|-------------------------------------------------------------|-------------|
| IntegerType | Represents 4-byte signed integer numbers.                   | 64          |
| LongType    | Represents 8-byte signed integer numbers.                   | 1000        |
| FloatType   | Represents 4-byte single-precision floating point numbers.  | 1000.45     |
| DoubleType  | Represents 8-byte double-precision floating point numbers.  | 10000000.45 |

String type

| Name       | Description                          | Example |
|------------|--------------------------------------|---------|
| StringType | Represents character string values.  | "tony"  |

Boolean type

| Name        | Description                | Example |
|-------------|----------------------------|---------|
| BooleanType | Represents boolean values. | true    |

Datetime type

| Name          | Description                                                                                                                                        | Example                       |
|---------------|----------------------------------------------------------------------------------------------------------------------------------------------------|-------------------------------|
| TimestampType | Represents values with year, month, day, hour, minute, and second fields, in the session local time zone. A timestamp value represents an absolute point in time. | 2019-11-03 05:30:00 UTC-05:00 |
| DateType      | Represents values with year, month, and day fields, without a time zone.                                                                           | 2019-11-03                    |

Complex types

| Name       | Definition                                     | Description                                                                                                                                                                             | Example                                        |
|------------|------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|------------------------------------------------|
| ArrayType  | ArrayType(elementType, containsNull)           | Represents a sequence of elements of type elementType. containsNull indicates whether elements of an ArrayType value can be null.                                                        | ["tony", "kent", "mike"]                       |
| MapType    | MapType(keyType, valueType, valueContainsNull) | Represents a set of key-value pairs. Keys have type keyType and values have type valueType. Keys are not allowed to be null; valueContainsNull indicates whether values can be.          | {"name": "tony"}                               |
| StructType | StructType(fields)                             | Represents values with the structure described by a sequence of StructFields. StructField(name, dataType, nullable) describes one field: its name, its data type, and whether its values can be null. | {"name": "tony", "age": 30, "city": "seattle"} |
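The constructors above live in pyspark.sql.types. A minimal sketch (the column names here are illustrative, not from the examples that follow) showing one column of each complex type:

from pyspark.sql.types import (StructType, StructField, StringType,
                               IntegerType, ArrayType, MapType)

complex_schema = StructType([
    # a sequence of strings; elements may be null
    StructField("friends", ArrayType(StringType(), True)),
    # string keys mapped to integer values; values may be null
    StructField("scores", MapType(StringType(), IntegerType(), True)),
    # a nested record with two named fields
    StructField("address", StructType([
        StructField("city", StringType()),
        StructField("zipcode", IntegerType()),
    ])),
])

df_complex = spark.createDataFrame(
    [(["tony", "kent", "mike"], {"math": 90}, ("seattle", 98101))],
    schema=complex_schema)
df_complex.printSchema()

Sections 2b and 2c below work through fuller versions of the same idea.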

1a. How to get the data type of a column/data frame?

Let's first look at the syntax.

Syntax

pyspark.sql.DataFrame.dtypes

Returns all column names and their data types as a list.


Input: Spark data frame

df_mul = spark.createDataFrame([('John', 60, True, 1.7, '1960-01-01'),
                                ('Tony', 30, False, 1.8, '1990-01-01'),
                                ('Mike', 40, True, 1.65, '1980-01-01')],
                               ['name', 'age', 'smoker', 'height', 'birthdate'])
df_mul.show()
+----+---+------+------+----------+
|name|age|smoker|height| birthdate|
+----+---+------+------+----------+
|John| 60|  true|   1.7|1960-01-01|
|Tony| 30| false|   1.8|1990-01-01|
|Mike| 40|  true|  1.65|1980-01-01|
+----+---+------+------+----------+

Output: Spark data frame column types

df_mul.dtypes
[('name', 'string'),
 ('age', 'bigint'),
 ('smoker', 'boolean'),
 ('height', 'double'),
 ('birthdate', 'string')]
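dtypes covers the whole frame; to look up a single column, two small sketches using standard DataFrame APIs (exact reprs vary by Spark version):

# dtypes is a list of (name, type) pairs, so a dict lookup isolates one column
dict(df_mul.dtypes)['age']       # 'bigint'

# the schema object carries the richer type object per field
df_mul.schema['age'].dataType    # LongType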

Summary:

print("Input                                        ",            "Output")
display_side_by_side(df_mul.toPandas(),pd.DataFrame([str(df_mul.dtypes[0:6])],columns=["."]),space=1)
Input:

| name | age | smoker | height | birthdate  |
|------|-----|--------|--------|------------|
| John | 60  | True   | 1.70   | 1960-01-01 |
| Tony | 30  | False  | 1.80   | 1990-01-01 |
| Mike | 40  | True   | 1.65   | 1980-01-01 |

Output:

[('name', 'string'), ('age', 'bigint'), ('smoker', 'boolean'), ('height', 'double'), ('birthdate', 'string')]

1b. How to change the data type of a column?

Let's first look at the syntax.

Syntax

pyspark.sql.Column.cast

Converts the column into type dataType.


Input: Spark data frame with a column "age" of integer (bigint) type

df_mul = spark.createDataFrame([('John', 60, True, 1.7),
                                ('Tony', 30, False, 1.8),
                                ('Mike', 40, True, 1.65)],
                               ['name', 'age', 'smoker', 'height'])
df_mul.show()
+----+---+------+------+
|name|age|smoker|height|
+----+---+------+------+
|John| 60|  true|   1.7|
|Tony| 30| false|   1.8|
|Mike| 40|  true|  1.65|
+----+---+------+------+
df_mul.dtypes
[('name', 'string'),
 ('age', 'bigint'),
 ('smoker', 'boolean'),
 ('height', 'double')]

Output: Spark data frame with the column "age" cast to string type

df_cast = df_mul.select("name",df_mul.age.cast("string").alias('age'), "smoker", "height")
df_cast.show()
+----+---+------+------+
|name|age|smoker|height|
+----+---+------+------+
|John| 60|  true|   1.7|
|Tony| 30| false|   1.8|
|Mike| 40|  true|  1.65|
+----+---+------+------+
df_cast.dtypes
[('name', 'string'),
 ('age', 'string'),
 ('smoker', 'boolean'),
 ('height', 'double')]
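The same cast can also be written with withColumn and pyspark.sql.functions.col, which keeps the other columns without re-listing them; a sketch reusing df_mul from above:

from pyspark.sql.functions import col

# replace age in place with its string-typed version
df_cast2 = df_mul.withColumn("age", col("age").cast("string"))
df_cast2.dtypes
# [('name', 'string'), ('age', 'string'), ('smoker', 'boolean'), ('height', 'double')]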

Summary:

print("Input                                                       ",            "Output")
display_side_by_side(df_mul.toPandas(),df_cast.toPandas(),space=25)
print("Data types                                                      ",            "Data types")
display_side_by_side(pd.DataFrame([str(df_mul.dtypes)],columns=["."]),pd.DataFrame([str(df_cast.dtypes)],columns=["."]),space=5)
Input:

| name | age | smoker | height |
|------|-----|--------|--------|
| John | 60  | True   | 1.70   |
| Tony | 30  | False  | 1.80   |
| Mike | 40  | True   | 1.65   |

Output (same values; only the type of age has changed):

| name | age | smoker | height |
|------|-----|--------|--------|
| John | 60  | True   | 1.70   |
| Tony | 30  | False  | 1.80   |
| Mike | 40  | True   | 1.65   |

Data types (input):  [('name', 'string'), ('age', 'bigint'), ('smoker', 'boolean'), ('height', 'double')]
Data types (output): [('name', 'string'), ('age', 'string'), ('smoker', 'boolean'), ('height', 'double')]

What is a schema?

A schema is the description of the structure of your data.

2a. How to get the schema of a data frame?

Let's first look at the syntax.

Syntax

pyspark.sql.DataFrame.schema

Returns the schema of this DataFrame as a pyspark.sql.types.StructType.


Input: Spark data frame

df_mul = spark.createDataFrame([('John', 'Seattle', 60, True, 1.7, '1960-01-01'),
                                ('Tony', 'Cupertino', 30, False, 1.8, '1990-01-01'),
                                ('Mike', 'New York', 40, True, 1.65, '1980-01-01')],
                               ['name', 'city', 'age', 'smoker', 'height', 'birthdate'])
df_mul.show()
+----+---------+---+------+------+----------+
|name|     city|age|smoker|height| birthdate|
+----+---------+---+------+------+----------+
|John|  Seattle| 60|  true|   1.7|1960-01-01|
|Tony|Cupertino| 30| false|   1.8|1990-01-01|
|Mike| New York| 40|  true|  1.65|1980-01-01|
+----+---------+---+------+------+----------+

Output: Schema of the Spark data frame

df_mul.schema
StructType(List(StructField(name,StringType,true),StructField(city,StringType,true),StructField(age,LongType,true),StructField(smoker,BooleanType,true),StructField(height,DoubleType,true),StructField(birthdate,StringType,true)))

How to print the schema in a tree format?

df_mul.printSchema()
root
 |-- name: string (nullable = true)
 |-- city: string (nullable = true)
 |-- age: long (nullable = true)
 |-- smoker: boolean (nullable = true)
 |-- height: double (nullable = true)
 |-- birthdate: string (nullable = true)
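If the tree is too verbose, the same schema also has a compact one-line rendering:

df_mul.schema.simpleString()
# 'struct<name:string,city:string,age:bigint,smoker:boolean,height:double,birthdate:string>'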

2b. How to define a schema?

Let's first look at the syntax.

Syntax

pyspark.sql.types.StructType(fields)

Represents the schema as a sequence of StructField objects.

pyspark.sql.types.StructField(name, dataType, nullable)

Represents one field of a StructType: its name, its data type, and whether its values can be null.

Example#1

from pyspark.sql.types import *

schema1 = StructType([
    StructField("name", StringType(), True),
    StructField("city", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("smoker", BooleanType(), True),
    StructField("height", FloatType(), True),
    StructField("birthdate", StringType(), True),
])
schema1
StructType(List(StructField(name,StringType,true),StructField(city,StringType,true),StructField(age,IntegerType,true),StructField(smoker,BooleanType,true),StructField(height,FloatType,true),StructField(birthdate,StringType,true)))
schema1.fieldNames()
['name', 'city', 'age', 'smoker', 'height', 'birthdate']
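As an alternative to building StructType objects by hand, createDataFrame also accepts a DDL-formatted schema string (supported in Spark 2.3+; treat the exact version as an assumption). A sketch equivalent to schema1:

ddl = "name string, city string, age int, smoker boolean, height float, birthdate string"
df_ddl = spark.createDataFrame(
    [('John', 'Seattle', 60, True, 1.7, '1960-01-01')], schema=ddl)
df_ddl.printSchema()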

Example#2

schema2 = StructType([
    StructField("name", StringType()),
    StructField("weight", LongType()),
    StructField("smoker", BooleanType()),
    StructField("height", DoubleType()),
    StructField("birthdate", StringType()),
    StructField("phone_nos", MapType(StringType(), LongType(), True), True),
    StructField("favorite_colors", ArrayType(StringType(), True), True),
    StructField("address", StructType([
        StructField("houseno", IntegerType(), True),
        StructField("street", StringType(), True),
        StructField("city", StringType(), True),
        StructField("zipcode", IntegerType(), True),
    ])),
])
print(schema2)
StructType(List(StructField(name,StringType,true),StructField(weight,LongType,true),StructField(smoker,BooleanType,true),StructField(height,DoubleType,true),StructField(birthdate,StringType,true),StructField(phone_nos,MapType(StringType,LongType,true),true),StructField(favorite_colors,ArrayType(StringType,true),true),StructField(address,StructType(List(StructField(houseno,IntegerType,true),StructField(street,StringType,true),StructField(city,StringType,true),StructField(zipcode,IntegerType,true))),true)))
schema2.fieldNames()
['name',
 'weight',
 'smoker',
 'height',
 'birthdate',
 'phone_nos',
 'favorite_colors',
 'address']

2c. How to use the schema?

from pyspark.sql.types import *
from pyspark.sql import functions as func
schema = StructType([
    StructField("name", StringType()),
    StructField("weight", LongType()),
    StructField("smoker", BooleanType()),
    StructField("height", DoubleType()),
    StructField("birthdate", StringType()),
    StructField("phone_nos", MapType(StringType(), LongType(), True), True),
    StructField("favorite_colors", ArrayType(StringType(), True), True),
    StructField("address", StructType([
        StructField("houseno", IntegerType(), True),
        StructField("street", StringType(), True),
        StructField("city", StringType(), True),
        StructField("zipcode", IntegerType(), True),
    ])),
])

df = spark.createDataFrame([
    ["john", 180, True, 1.7, '1960-01-01', {'office': 123456789, 'home': 223456789}, ["blue", "red"], (100, 'street1', 'city1', 12345)],
    ["tony", 180, True, 1.8, '1990-01-01', {'office': 223456789, 'home': 323456789}, ["green", "purple"], (200, 'street2', 'city2', 22345)],
    ["mike", 180, True, 1.65, '1980-01-01', {'office': 323456789, 'home': 423456789}, ["yellow", "orange"], (300, 'street3', 'city3', 32345)],
], schema=schema)
df.printSchema()
df.toPandas()
root
 |-- name: string (nullable = true)
 |-- weight: long (nullable = true)
 |-- smoker: boolean (nullable = true)
 |-- height: double (nullable = true)
 |-- birthdate: string (nullable = true)
 |-- phone_nos: map (nullable = true)
 |    |-- key: string
 |    |-- value: long (valueContainsNull = true)
 |-- favorite_colors: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- address: struct (nullable = true)
 |    |-- houseno: integer (nullable = true)
 |    |-- street: string (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- zipcode: integer (nullable = true)
| name | weight | smoker | height | birthdate  | phone_nos                                | favorite_colors  | address                      |
|------|--------|--------|--------|------------|------------------------------------------|------------------|------------------------------|
| john | 180    | True   | 1.70   | 1960-01-01 | {'office': 123456789, 'home': 223456789} | [blue, red]      | (100, street1, city1, 12345) |
| tony | 180    | True   | 1.80   | 1990-01-01 | {'office': 223456789, 'home': 323456789} | [green, purple]  | (200, street2, city2, 22345) |
| mike | 180    | True   | 1.65   | 1980-01-01 | {'office': 323456789, 'home': 423456789} | [yellow, orange] | (300, street3, city3, 32345) |
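Once the schema is in place, the nested columns can be queried directly; a short sketch against the df built above:

df.select(
    "name",
    func.col("address.city"),          # a field inside the StructType column
    func.col("phone_nos")["home"],     # a key lookup in the MapType column
    func.col("favorite_colors")[0],    # an element of the ArrayType column
).show()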

2d. How to save the schema?

df = spark.createDataFrame([
    ["john", 180, True, 1.7, '1960-01-01', {'office': 123456789, 'home': 223456789}, ["blue", "red"], (100, 'street1', 'city1', 12345)],
    ["tony", 180, True, 1.8, '1990-01-01', {'office': 223456789, 'home': 323456789}, ["green", "purple"], (200, 'street2', 'city2', 22345)],
    ["mike", 180, True, 1.65, '1980-01-01', {'office': 323456789, 'home': 423456789}, ["yellow", "orange"], (300, 'street3', 'city3', 32345)],
], schema=schema)
df.printSchema()
root
 |-- name: string (nullable = true)
 |-- weight: long (nullable = true)
 |-- smoker: boolean (nullable = true)
 |-- height: double (nullable = true)
 |-- birthdate: string (nullable = true)
 |-- phone_nos: map (nullable = true)
 |    |-- key: string
 |    |-- value: long (valueContainsNull = true)
 |-- favorite_colors: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- address: struct (nullable = true)
 |    |-- houseno: integer (nullable = true)
 |    |-- street: string (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- zipcode: integer (nullable = true)
df.schema
StructType(List(StructField(name,StringType,true),StructField(weight,LongType,true),StructField(smoker,BooleanType,true),StructField(height,DoubleType,true),StructField(birthdate,StringType,true),StructField(phone_nos,MapType(StringType,LongType,true),true),StructField(favorite_colors,ArrayType(StringType,true),true),StructField(address,StructType(List(StructField(houseno,IntegerType,true),StructField(street,StringType,true),StructField(city,StringType,true),StructField(zipcode,IntegerType,true))),true)))
# allow overwriting an existing output directory
spark.conf.set("spark.hadoop.validateOutputSpecs", "false")
# a StructType iterates over its StructFields, so they can be distributed as an RDD
rdd_schema = spark.sparkContext.parallelize(df.schema)
#rdd_schema.coalesce(1).saveAsPickleFile("data/text/schema_file")
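Pickling works, but a StructType also serializes itself to JSON, which is easier to inspect and version-control; a sketch, with data/schema.json as a hypothetical local path:

import json

# StructType.json() returns a JSON string describing every field
with open("data/schema.json", "w") as f:   # hypothetical path
    f.write(df.schema.json())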

2e. How to load the saved schema?

# read the pickled StructFields back and rebuild the StructType from them
schema_rdd = spark.sparkContext.pickleFile("data/text/schema_file")
schema = StructType(schema_rdd.collect())
print(schema)
StructType(List(StructField(name,StringType,true),StructField(weight,LongType,true),StructField(smoker,BooleanType,true),StructField(height,DoubleType,true),StructField(birthdate,StringType,true),StructField(phone_nos,MapType(StringType,LongType,true),true),StructField(favorite_colors,ArrayType(StringType,true),true),StructField(address,StructType(List(StructField(houseno,IntegerType,true),StructField(street,StringType,true),StructField(city,StringType,true),StructField(zipcode,IntegerType,true))),true)))
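The JSON file written in the earlier sketch can likewise be turned back into a StructType with StructType.fromJson (the path is again hypothetical):

import json
from pyspark.sql.types import StructType

with open("data/schema.json") as f:        # hypothetical path
    schema = StructType.fromJson(json.load(f))
print(schema)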

2f. How to get the names of all fields in the schema?

df_mul = spark.createDataFrame([('John', 'Seattle', 60, True, 1.7, '1960-01-01'),
                                ('Tony', 'Cupertino', 30, False, 1.8, '1990-01-01'),
                                ('Mike', 'New York', 40, True, 1.65, '1980-01-01')],
                               ['name', 'city', 'age', 'smoker', 'height', 'birthdate'])
df_mul.show()
+----+---------+---+------+------+----------+
|name|     city|age|smoker|height| birthdate|
+----+---------+---+------+------+----------+
|John|  Seattle| 60|  true|   1.7|1960-01-01|
|Tony|Cupertino| 30| false|   1.8|1990-01-01|
|Mike| New York| 40|  true|  1.65|1980-01-01|
+----+---------+---+------+------+----------+
df_mul.schema.fieldNames()
['name', 'city', 'age', 'smoker', 'height', 'birthdate']

Summary:

print("input                                    ",            "output")
display_side_by_side(df_mul.toPandas(),pd.DataFrame([[df_mul.schema.fieldNames()]],columns=["."]),space=2)
Input:

| name | city      | age | smoker | height | birthdate  |
|------|-----------|-----|--------|--------|------------|
| John | Seattle   | 60  | True   | 1.70   | 1960-01-01 |
| Tony | Cupertino | 30  | False  | 1.80   | 1990-01-01 |
| Mike | New York  | 40  | True   | 1.65   | 1980-01-01 |

Output:

[name, city, age, smoker, height, birthdate]
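Beyond fieldNames(), the schema's fields attribute exposes each StructField, so name, type, and nullability can be walked together; a short closing sketch:

# print one line per field: its name, data type, and nullability
for field in df_mul.schema.fields:
    print(field.name, field.dataType, field.nullable)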