{
"cells": [
{
"cell_type": "markdown",
"id": "postal-marketing",
"metadata": {},
"source": [
"```{figure} ../images/banner.png\n",
"---\n",
"align: center\n",
"name: banner\n",
"---\n",
"```"
]
},
{
"cell_type": "markdown",
"id": "nervous-tension",
"metadata": {},
"source": [
""
]
},
{
"cell_type": "markdown",
"id": "lesbian-discussion",
"metadata": {},
"source": [
"# Chapter 2: DataFrames"
]
},
{
"cell_type": "markdown",
"id": "arctic-sympathy",
"metadata": {},
"source": [
"## Learning Objectives"
]
},
{
"cell_type": "markdown",
"id": "closing-program",
"metadata": {},
"source": [
"- Understand the basics of Spark DataFrames.\n",
"- Learn the syntax for creating a Spark DataFrame from different data sources.\n",
"- Perform basic DataFrame operations."
]
},
{
"cell_type": "markdown",
"id": "violent-suicide",
"metadata": {},
"source": [
"## Chapter Outline\n",
"\n",
"- [1. What is a Spark DataFrame?](#1)\n",
"- [2. Creating a Spark DataFrame](#2)\n",
"    - [2a. from RDD](#3)\n",
"    - [2b. from List](#4)\n",
"    - [2c. from pandas DataFrame](#5)\n",
"- [3. Basic DataFrame operations](#6)\n",
"    - [3a. Selecting columns](#7)\n",
"    - [3b. Renaming a column](#8)\n",
"    - [3c. Adding a column](#9)\n",
"    - [3d. Deleting a column](#10)\n",
"    - [3e. Changing the data type of a column](#11)\n",
"    - [3f. Filtering the data](#12)"
]
},
{
"cell_type": "code",
"execution_count": 66,
"id": "appropriate-worst",
"metadata": {
"hide_input": true,
"tags": [
"hide_input"
]
},
"outputs": [],
"source": [
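"from IPython.display import display_html\n",
"import pandas as pd\n",
"import numpy as np\n",
"\n",
"def display_side_by_side(*args):\n",
"    \"\"\"Render several pandas DataFrames next to each other as inline HTML tables.\"\"\"\n",
"    html_str = ''\n",
"    for df in args:\n",
"        html_str += df.to_html(index=False)\n",
"        # non-breaking spaces keep a gap between consecutive tables\n",
"        html_str += \"\\xa0\\xa0\\xa0\" * 10\n",
"    # make every table inline so the browser lays them out side by side\n",
"    display_html(html_str.replace('table', 'table style=\"display:inline\"'), raw=True)\n",
"\n",
"space = \"\\xa0\" * 10"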
]
},
{
"cell_type": "markdown",
"id": "express-criticism",
"metadata": {},
"source": [
""
]
},
{
"cell_type": "markdown",
"id": "dramatic-spare",
"metadata": {},
"source": [
"## 1. What is a Spark DataFrame?\n",
"A DataFrame represents a table of data with rows and columns. A simple analogy is a spreadsheet with named columns.\n",
"\n",
"A Spark DataFrame is a distributed collection of data organized into named columns. It can be constructed from a wide array of sources, such as structured data files, tables in Hive, external databases, existing RDDs, Python lists, or pandas DataFrames.\n",
"\n",
"- Immutable: once a DataFrame is created it cannot be changed. A transformation does not modify the DataFrame it is applied to; it returns a new DataFrame.\n",
"- Lazy evaluation: a transformation is not executed until an action (such as `show()`, `count()` or `collect()`) requests a result.\n",
"- Distributed: the rows of a DataFrame are partitioned across the nodes of the cluster and processed in parallel.\n",
"\n",
"All transformations in Spark are lazy, in that they do not compute their results right away. Instead, they just remember the transformations applied to some base dataset (e.g. a file). The transformations are only computed when an action requires a result to be returned to the driver program. This design enables Spark to run more efficiently.\n",
"\n",
"Advantages of lazy evaluation:\n",
"\n",
"- Optimization: because Spark sees the whole chain of transformations before running it, the query optimizer can rearrange and combine steps (for example, applying a filter as early as possible) before any data is processed.\n",
"- Fewer round trips: work is sent to the cluster only when an action needs a result, which saves round trips between the driver and the cluster.\n",
"- Less computation: transformations whose results no action ever asks for are never computed.\n",
"\n",
"### Why are DataFrames useful?\n",
"\n",
"- DataFrames are designed for processing large collections of structured or semi-structured data. Observations in a Spark DataFrame are organised under named columns, which lets Apache Spark understand the schema of the DataFrame and use it to optimize the execution plan of queries.\n",
"- Because the data is distributed across a cluster, DataFrames scale to very large datasets (up to petabytes on large clusters).\n",
"- DataFrames support a wide range of data formats and sources, such as CSV, JSON, Parquet, ORC, Hive tables and JDBC databases.\n",
"```{figure} img/chapter2/0.png\n",
"---\n",
"align: center\n",
"---\n",
"```"
]
},
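{
"cell_type": "markdown",
"id": "lazy-evaluation-note",
"metadata": {},
"source": [
"The short sketch below illustrates lazy evaluation. It assumes a local PySpark installation, and the DataFrame `people` with its columns is made up for this example. The `filter` and `select` calls only record a plan (printed by `explain()`); no Spark job runs until the `count()` action asks for a result."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "lazy-evaluation-sketch",
"metadata": {},
"outputs": [],
"source": [
"from pyspark.sql import SparkSession\n",
"\n",
"# Reuse the active SparkSession, or start a local one if none exists\n",
"spark = SparkSession.builder.getOrCreate()\n",
"\n",
"# Hypothetical example data\n",
"people = spark.createDataFrame([('John', 60), ('Tony', 30), ('Mike', 40)], ['name', 'age'])\n",
"\n",
"# Transformations: these only build up a query plan, nothing is computed yet\n",
"older = people.filter(people.age > 35).select('name')\n",
"\n",
"# explain() prints the physical plan Spark has recorded so far\n",
"older.explain()\n",
"\n",
"# count() is an action, so this is the line that actually triggers a job\n",
"n_older = older.count()\n",
"print(n_older)"
]
},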
{
"cell_type": "markdown",
"id": "grateful-letters",
"metadata": {},
"source": [
""
]
},
{
"cell_type": "markdown",
"id": "studied-arrow",
"metadata": {},
"source": [
"## 2. Creating a Spark DataFrame\n",
"\n",
"Let's first look at the syntax of `SparkSession.createDataFrame`:\n",
"\n",
"```{admonition} Syntax\n",
"createDataFrame(data, schema=None, samplingRatio=None, verifySchema=True)\n",
"\n",
"Parameters:\n",
"\n",
"data – an RDD, a list, or a pandas.DataFrame.\n",
"\n",
"schema – a pyspark.sql.types.DataType, a datatype string, or a list of column names; default is None, in which case the schema is inferred from the data.\n",
"\n",
"samplingRatio – the fraction of rows sampled to infer the schema; if None, the first few rows are used.\n",
"\n",
"verifySchema – verify the data types of every row against the schema (True by default).\n",
"```"
]
},
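{
"cell_type": "markdown",
"id": "schema-forms-note",
"metadata": {},
"source": [
"As a sketch of the `schema` parameter (assuming a local PySpark installation; the rows and column names are only examples), the same data can be given a list of column names, a DDL-formatted datatype string, or an explicit `StructType`. With only names, Spark still infers the types, so Python integers become `bigint`; the other two forms fix \"age\" to `int`."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "schema-forms-sketch",
"metadata": {},
"outputs": [],
"source": [
"from pyspark.sql import SparkSession\n",
"from pyspark.sql.types import StructType, StructField, StringType, IntegerType\n",
"\n",
"spark = SparkSession.builder.getOrCreate()\n",
"rows = [('John', 'Seattle', 60), ('Tony', 'Cupertino', 30)]\n",
"\n",
"# 1) list of column names: the types are inferred from the data\n",
"df_names = spark.createDataFrame(rows, ['name', 'city', 'age'])\n",
"\n",
"# 2) DDL-formatted datatype string\n",
"df_ddl = spark.createDataFrame(rows, 'name string, city string, age int')\n",
"\n",
"# 3) explicit StructType (the third StructField argument is 'nullable')\n",
"schema = StructType([\n",
"    StructField('name', StringType(), True),\n",
"    StructField('city', StringType(), True),\n",
"    StructField('age', IntegerType(), True),\n",
"])\n",
"df_struct = spark.createDataFrame(rows, schema)\n",
"\n",
"print(df_names.dtypes)\n",
"print(df_ddl.dtypes)\n",
"print(df_struct.dtypes)"
]
},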
{
"cell_type": "markdown",
"id": "seven-haven",
"metadata": {},
"source": [
""
]
},
{
"cell_type": "markdown",
"id": "local-latest",
"metadata": {},
"source": [
"### 2a. from RDD\n",
"\n",
"\n",
"```{figure} img/chapter2/1.png\n",
"---\n",
"align: center\n",
"---\n",
"```\n",
"\n",
"What is an RDD?\n",
"\n",
"Resilient Distributed Datasets (RDDs)\n",
"\n",
"At a high level, every Spark application consists of a driver program that runs the user’s main function and executes various parallel operations on a cluster. \n",
"\n",
"The main abstraction Spark provides is a resilient distributed dataset (RDD), which is a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel. \n",
"\n",
"RDDs are created by starting with a file in the Hadoop file system (or any other Hadoop-supported file system), or an existing collection in the driver program (in PySpark, for example, a Python list), and transforming it. \n",
"\n",
"Users may also ask Spark to persist an RDD in memory, allowing it to be reused efficiently across parallel operations. Finally, RDDs automatically recover from node failures.\n",
"\n",
"How to create an RDD?"
]
},
{
"cell_type": "code",
"execution_count": 67,
"id": "stopped-violin",
"metadata": {},
"outputs": [],
"source": [
"import pyspark\n",
"from pyspark.sql import SparkSession\n",
"spark = SparkSession \\\n",
" .builder \\\n",
" .appName(\"Python Spark SQL basic example\") \\\n",
" .config(\"spark.some.config.option\", \"some-value\") \\\n",
" .getOrCreate()\n",
"# parallelize() distributes a local Python collection across the cluster as an RDD\n",
"rdd = spark.sparkContext.parallelize([('John', 'Seattle', 60, True, 1.7, '1960-01-01'),\n",
"                                      ('Tony', 'Cupertino', 30, False, 1.8, '1990-01-01'),\n",
"                                      ('Mike', 'New York', 40, True, 1.65, '1980-01-01')])\n",
"# collect() is an action: it returns the RDD's elements to the driver as a Python list\n",
"rdd_spark = rdd.collect()\n"
]
},
{
"cell_type": "code",
"execution_count": 68,
"id": "found-auditor",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[('John', 'Seattle', 60, True, 1.7, '1960-01-01'), ('Tony', 'Cupertino', 30, False, 1.8, '1990-01-01'), ('Mike', 'New York', 40, True, 1.65, '1980-01-01')]\n"
]
}
],
"source": [
"print(rdd_spark)"
]
},
{
"cell_type": "code",
"execution_count": 69,
"id": "surprising-fraud",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
" 0\n",
"0 [(John, Seattle, 60, True, 1.7, 1960..."
]
},
"execution_count": 69,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"pd.set_option('display.max_colwidth', 40)\n",
"pd.DataFrame([[rdd_spark]])"
]
},
{
"cell_type": "markdown",
"id": "devoted-saturday",
"metadata": {},
"source": [
"Creating a Spark DataFrame from the RDD. No column names are given, so Spark names the columns `_1`, `_2`, ..., `_6`:"
]
},
{
"cell_type": "code",
"execution_count": 70,
"id": "limited-planning",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+----+---------+---+-----+----+----------+\n",
"| _1| _2| _3| _4| _5| _6|\n",
"+----+---------+---+-----+----+----------+\n",
"|John| Seattle| 60| true| 1.7|1960-01-01|\n",
"|Tony|Cupertino| 30|false| 1.8|1990-01-01|\n",
"|Mike| New York| 40| true|1.65|1980-01-01|\n",
"+----+---------+---+-----+----+----------+\n",
"\n"
]
}
],
"source": [
"df = spark.createDataFrame(rdd)\n",
"df.show()"
]
},
{
"cell_type": "code",
"execution_count": 71,
"id": "annual-holly",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Input Output\n"
]
},
{
"data": {
"text/html": [
"<table style=\"display:inline\" border=\"1\" class=\"dataframe\">\n",
"  <thead>\n",
"    <tr style=\"text-align: right;\"><th>0</th></tr>\n",
"  </thead>\n",
"  <tbody>\n",
"    <tr><td>[(John, Seattle, 60, True, 1.7, 1960-01-01), (Tony, Cupertino, 30, False, 1.8, 1990-01-01), (Mike, New York, 40, True, 1.65, 1980-01-01)]</td></tr>\n",
"  </tbody>\n",
"</table>\n",
"<table style=\"display:inline\" border=\"1\" class=\"dataframe\">\n",
"  <thead>\n",
"    <tr style=\"text-align: right;\"><th>_1</th><th>_2</th><th>_3</th><th>_4</th><th>_5</th><th>_6</th></tr>\n",
"  </thead>\n",
"  <tbody>\n",
"    <tr><td>John</td><td>Seattle</td><td>60</td><td>True</td><td>1.70</td><td>1960-01-01</td></tr>\n",
"    <tr><td>Tony</td><td>Cupertino</td><td>30</td><td>False</td><td>1.80</td><td>1990-01-01</td></tr>\n",
"    <tr><td>Mike</td><td>New York</td><td>40</td><td>True</td><td>1.65</td><td>1980-01-01</td></tr>\n",
"  </tbody>\n",
"</table>"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"pd.set_option('display.max_colwidth', 20)\n",
"print(\"Input \", \"Output\")\n",
"display_side_by_side(pd.DataFrame([[rdd_spark]]),df.toPandas())"
]
},
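{
"cell_type": "markdown",
"id": "rdd-column-names-note",
"metadata": {},
"source": [
"Without a schema the columns are named `_1` to `_6`. A minimal sketch, assuming a local PySpark installation and a smaller made-up RDD (`rdd_people` is a name chosen for this example), of two ways to give the columns names: pass a list of names to `createDataFrame`, or call `toDF` on the RDD, which PySpark makes available once a SparkSession exists."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "rdd-column-names-sketch",
"metadata": {},
"outputs": [],
"source": [
"from pyspark.sql import SparkSession\n",
"\n",
"spark = SparkSession.builder.getOrCreate()\n",
"rdd_people = spark.sparkContext.parallelize([('John', 'Seattle', 60),\n",
"                                             ('Tony', 'Cupertino', 30),\n",
"                                             ('Mike', 'New York', 40)])\n",
"\n",
"# Option 1: pass the column names as the schema argument\n",
"df_named = spark.createDataFrame(rdd_people, ['name', 'city', 'age'])\n",
"\n",
"# Option 2: call toDF() on the RDD with the column names\n",
"df_todf = rdd_people.toDF(['name', 'city', 'age'])\n",
"\n",
"df_named.show()\n",
"print(df_todf.columns)"
]
},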
{
"cell_type": "markdown",
"id": "colonial-giant",
"metadata": {},
"source": [
""
]
},
{
"cell_type": "markdown",
"id": "developmental-secretariat",
"metadata": {},
"source": [
"### 2b. from List"
]
},
{
"cell_type": "markdown",
"id": "flexible-outside",
"metadata": {},
"source": [
"\n",
"```{figure} img/chapter2/2.png\n",
"---\n",
"align: center\n",
"---\n",
"```"
]
},
{
"cell_type": "code",
"execution_count": 72,
"id": "lyric-liability",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+----+---------+---+-----+----+----------+\n",
"| _1| _2| _3| _4| _5| _6|\n",
"+----+---------+---+-----+----+----------+\n",
"|John| Seattle| 60| true| 1.7|1960-01-01|\n",
"|Tony|Cupertino| 30|false| 1.8|1990-01-01|\n",
"|Mike| New York| 40| true|1.65|1980-01-01|\n",
"+----+---------+---+-----+----+----------+\n",
"\n"
]
}
],
"source": [
"alist = [('John', 'Seattle', 60, True, 1.7, '1960-01-01'), \n",
"('Tony', 'Cupertino', 30, False, 1.8, '1990-01-01'), \n",
"('Mike', 'New York', 40, True, 1.65, '1980-01-01')]\n",
"spark.createDataFrame(alist).show()"
]
},
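{
"cell_type": "markdown",
"id": "row-objects-note",
"metadata": {},
"source": [
"A list can also hold `Row` objects, which carry their own field names, so the columns get meaningful names without a separate schema. A minimal sketch, assuming a local PySpark installation; `Person` is a hypothetical Row class defined only for this example."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "row-objects-sketch",
"metadata": {},
"outputs": [],
"source": [
"from pyspark.sql import Row, SparkSession\n",
"\n",
"spark = SparkSession.builder.getOrCreate()\n",
"\n",
"# Row('name', 'city', 'age') creates a Row 'class' with these field names, in this order\n",
"Person = Row('name', 'city', 'age')\n",
"people_rows = [Person('John', 'Seattle', 60),\n",
"               Person('Tony', 'Cupertino', 30),\n",
"               Person('Mike', 'New York', 40)]\n",
"\n",
"df_rows = spark.createDataFrame(people_rows)\n",
"df_rows.printSchema()"
]
},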
{
"cell_type": "markdown",
"id": "starting-basket",
"metadata": {},
"source": [
""
]
},
{
"cell_type": "markdown",
"id": "bigger-accuracy",
"metadata": {},
"source": [
"### 2c. from pandas dataframe"
]
},
{
"cell_type": "markdown",
"id": "beautiful-parts",
"metadata": {},
"source": [
"\n",
"```{figure} img/chapter2/3.png\n",
"---\n",
"align: center\n",
"---\n",
"```"
]
},
{
"cell_type": "markdown",
"id": "crucial-binary",
"metadata": {},
"source": [
"How to create a pandas DataFrame?"
]
},
{
"cell_type": "code",
"execution_count": 73,
"id": "cubic-services",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
" name city age smoker height birthdate\n",
"0 John Seattle 60 True 1.70 1960-01-01\n",
"1 Tony Cupertino 30 False 1.80 1990-01-01\n",
"2 Mike New York 40 True 1.65 1980-01-01"
]
},
"execution_count": 73,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"import pandas as pd\n",
"df_pd = pd.DataFrame([('John', 'Seattle', 60, True, 1.7, '1960-01-01'), \n",
"('Tony', 'Cupertino', 30, False, 1.8, '1990-01-01'), \n",
"('Mike', 'New York', 40, True, 1.65, '1980-01-01')],columns=[\"name\",\"city\",\"age\",\"smoker\",\"height\", \"birthdate\"])\n",
"df_pd"
]
},
{
"cell_type": "markdown",
"id": "formed-occasions",
"metadata": {},
"source": [
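"How to create a Spark DataFrame from a pandas DataFrame? The pandas column names become the Spark column names, and the column types are inferred from the pandas dtypes:"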
]
},
{
"cell_type": "code",
"execution_count": 74,
"id": "neutral-decimal",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+----+---------+---+------+------+----------+\n",
"|name| city|age|smoker|height| birthdate|\n",
"+----+---------+---+------+------+----------+\n",
"|John| Seattle| 60| true| 1.7|1960-01-01|\n",
"|Tony|Cupertino| 30| false| 1.8|1990-01-01|\n",
"|Mike| New York| 40| true| 1.65|1980-01-01|\n",
"+----+---------+---+------+------+----------+\n",
"\n"
]
}
],
"source": [
"spark.createDataFrame(df_pd).show()"
]
},
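{
"cell_type": "markdown",
"id": "pandas-roundtrip-note",
"metadata": {},
"source": [
"The conversion also works in the other direction with `toPandas()`, which collects every row to the driver, so it is only suitable for data that fits in the driver's memory. A minimal round-trip sketch, assuming local PySpark and pandas installations and made-up data; the pandas `int64` and `float64` columns are inferred as Spark `bigint` and `double`."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "pandas-roundtrip-sketch",
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd\n",
"from pyspark.sql import SparkSession\n",
"\n",
"spark = SparkSession.builder.getOrCreate()\n",
"\n",
"pdf = pd.DataFrame({'name': ['John', 'Tony'], 'age': [60, 30], 'height': [1.7, 1.8]})\n",
"\n",
"# pandas -> Spark: column names come from pandas, types are inferred from the data\n",
"sdf = spark.createDataFrame(pdf)\n",
"print(sdf.dtypes)\n",
"\n",
"# Spark -> pandas: toPandas() collects all rows to the driver\n",
"back = sdf.toPandas()\n",
"print(back)"
]
},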
{
"cell_type": "markdown",
"id": "signed-fruit",
"metadata": {},
"source": [
""
]
},
{
"cell_type": "markdown",
"id": "ignored-audit",
"metadata": {},
"source": [
"## 3. Basic DataFrame operations"
]
},
{
"cell_type": "markdown",
"id": "configured-samoa",
"metadata": {},
"source": [
""
]
},
{
"cell_type": "markdown",
"id": "regional-cheese",
"metadata": {},
"source": [
""
]
},
{
"cell_type": "markdown",
"id": "aggregate-husband",
"metadata": {},
"source": [
"### 3a. How to select columns?"
]
},
{
"cell_type": "markdown",
"id": "exact-discussion",
"metadata": {},
"source": [
"\n",
"```{figure} img/chapter2/4.png\n",
"---\n",
"align: center\n",
"---\n",
"```"
]
},
{
"cell_type": "markdown",
"id": "piano-custom",
"metadata": {},
"source": [
"Input: Spark dataframe"
]
},
{
"cell_type": "code",
"execution_count": 75,
"id": "separated-newspaper",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+----+---------+---+------+------+----------+\n",
"|name| city|age|smoker|height| birthdate|\n",
"+----+---------+---+------+------+----------+\n",
"|John| Seattle| 60| true| 1.7|1960-01-01|\n",
"|Tony|Cupertino| 30| false| 1.8|1990-01-01|\n",
"|Mike| New York| 40| true| 1.65|1980-01-01|\n",
"+----+---------+---+------+------+----------+\n",
"\n"
]
}
],
"source": [
"df = spark.createDataFrame([('John', 'Seattle', 60, True, 1.7, '1960-01-01'), \n",
"('Tony', 'Cupertino', 30, False, 1.8, '1990-01-01'), \n",
"('Mike', 'New York', 40, True, 1.65, '1980-01-01')],['name', 'city', 'age', 'smoker','height', 'birthdate'])\n",
"df.show()"
]
},
{
"cell_type": "markdown",
"id": "fixed-homeless",
"metadata": {},
"source": [
"Output: spark dataframe with selected columns"
]
},
{
"cell_type": "markdown",
"id": "transsexual-behavior",
"metadata": {},
"source": [
"Method #1: pass the column names as strings"
]
},
{
"cell_type": "code",
"execution_count": 76,
"id": "visible-plane",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+----+---------+---+\n",
"|name| city|age|\n",
"+----+---------+---+\n",
"|John| Seattle| 60|\n",
"|Tony|Cupertino| 30|\n",
"|Mike| New York| 40|\n",
"+----+---------+---+\n",
"\n"
]
}
],
"source": [
"df_out = df.select(\"name\",\"city\",\"age\")\n",
"df_out.show()"
]
},
{
"cell_type": "markdown",
"id": "lyric-lewis",
"metadata": {},
"source": [
"Method #2: refer to the columns as attributes of the DataFrame (`df.name`)"
]
},
{
"cell_type": "code",
"execution_count": 77,
"id": "smooth-berry",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+----+---------+---+\n",
"|name| city|age|\n",
"+----+---------+---+\n",
"|John| Seattle| 60|\n",
"|Tony|Cupertino| 30|\n",
"|Mike| New York| 40|\n",
"+----+---------+---+\n",
"\n"
]
}
],
"source": [
"df.select(df.name,df.city,df.age).show()"
]
},
{
"cell_type": "markdown",
"id": "critical-disability",
"metadata": {},
"source": [
"Method #3: build column expressions with `pyspark.sql.functions.col`"
]
},
{
"cell_type": "code",
"execution_count": 78,
"id": "pressed-arena",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+----+---------+---+\n",
"|name| city|age|\n",
"+----+---------+---+\n",
"|John| Seattle| 60|\n",
"|Tony|Cupertino| 30|\n",
"|Mike| New York| 40|\n",
"+----+---------+---+\n",
"\n"
]
}
],
"source": [
"from pyspark.sql.functions import col\n",
"df.select(col(\"name\"),col(\"city\"),col(\"age\")).show()"
]
},
{
"cell_type": "markdown",
"id": "comprehensive-accountability",
"metadata": {},
"source": [
" Summary"
]
},
{
"cell_type": "code",
"execution_count": 79,
"id": "flush-hudson",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Input Output\n"
]
},
{
"data": {
"text/html": [
"<table style=\"display:inline\" border=\"1\" class=\"dataframe\">\n",
"  <thead>\n",
"    <tr style=\"text-align: right;\"><th>name</th><th>city</th><th>age</th><th>smoker</th><th>height</th><th>birthdate</th></tr>\n",
"  </thead>\n",
"  <tbody>\n",
"    <tr><td>John</td><td>Seattle</td><td>60</td><td>True</td><td>1.70</td><td>1960-01-01</td></tr>\n",
"    <tr><td>Tony</td><td>Cupertino</td><td>30</td><td>False</td><td>1.80</td><td>1990-01-01</td></tr>\n",
"    <tr><td>Mike</td><td>New York</td><td>40</td><td>True</td><td>1.65</td><td>1980-01-01</td></tr>\n",
"  </tbody>\n",
"</table>\n",
"<table style=\"display:inline\" border=\"1\" class=\"dataframe\">\n",
"  <thead>\n",
"    <tr style=\"text-align: right;\"><th>name</th><th>city</th><th>age</th></tr>\n",
"  </thead>\n",
"  <tbody>\n",
"    <tr><td>John</td><td>Seattle</td><td>60</td></tr>\n",
"    <tr><td>Tony</td><td>Cupertino</td><td>30</td></tr>\n",
"    <tr><td>Mike</td><td>New York</td><td>40</td></tr>\n",
"  </tbody>\n",
"</table>"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"print(\"Input \", \"Output\")\n",
"display_side_by_side(df.toPandas(),df_out.toPandas())"
]
},
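{
"cell_type": "markdown",
"id": "select-variants-note",
"metadata": {},
"source": [
"`select` also accepts a Python list of column names, which is handy when the columns are computed, and columns can be referenced with bracket notation such as `df['name']` (useful for names that are not valid Python identifiers). A short sketch, assuming a local PySpark installation; `df_sel` mirrors the example data above."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "select-variants-sketch",
"metadata": {},
"outputs": [],
"source": [
"from pyspark.sql import SparkSession\n",
"\n",
"spark = SparkSession.builder.getOrCreate()\n",
"df_sel = spark.createDataFrame(\n",
"    [('John', 'Seattle', 60, True, 1.7, '1960-01-01'),\n",
"     ('Tony', 'Cupertino', 30, False, 1.8, '1990-01-01'),\n",
"     ('Mike', 'New York', 40, True, 1.65, '1980-01-01')],\n",
"    ['name', 'city', 'age', 'smoker', 'height', 'birthdate'])\n",
"\n",
"wanted = ['name', 'city', 'age']\n",
"\n",
"by_list = df_sel.select(wanted)                # a list of column names\n",
"by_slice = df_sel.select(df_sel.columns[:3])   # the first three columns\n",
"by_bracket = df_sel.select(df_sel['name'], df_sel['city'], df_sel['age'])  # bracket notation\n",
"\n",
"by_list.show()"
]
},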
{
"cell_type": "markdown",
"id": "durable-smoke",
"metadata": {},
"source": [
""
]
},
{
"cell_type": "markdown",
"id": "incredible-welcome",
"metadata": {},
"source": [
"### 3b. How to rename columns?"
]
},
{
"cell_type": "markdown",
"id": "respected-muslim",
"metadata": {},
"source": [
"\n",
"```{figure} img/chapter2/5.png\n",
"---\n",
"align: center\n",
"---\n",
"```"
]
},
{
"cell_type": "markdown",
"id": "periodic-transport",
"metadata": {},
"source": [
"Input: Spark DataFrame with a column \"age\""
]
},
{
"cell_type": "code",
"execution_count": 80,
"id": "dedicated-fantasy",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+----+---------+---+------+------+----------+\n",
"|name| city|age|smoker|height| birthdate|\n",
"+----+---------+---+------+------+----------+\n",
"|John| Seattle| 60| true| 1.7|1960-01-01|\n",
"|Tony|Cupertino| 30| false| 1.8|1990-01-01|\n",
"|Mike| New York| 40| true| 1.65|1980-01-01|\n",
"+----+---------+---+------+------+----------+\n",
"\n"
]
}
],
"source": [
"df = spark.createDataFrame([('John', 'Seattle', 60, True, 1.7, '1960-01-01'), \n",
"('Tony', 'Cupertino', 30, False, 1.8, '1990-01-01'), \n",
"('Mike', 'New York', 40, True, 1.65, '1980-01-01')],['name', 'city', 'age', 'smoker','height', 'birthdate'])\n",
"df.show()"
]
},
{
"cell_type": "markdown",
"id": "severe-lithuania",
"metadata": {},
"source": [
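"Output: Spark DataFrame with column \"age\" renamed to \"age_in_years\". The first cell below uses `select` with `alias`, which renames the selected columns (here also \"name\" and \"city\") but drops every column that is not selected; the second uses `withColumnRenamed`, which renames one column and keeps all the others."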
]
},
{
"cell_type": "code",
"execution_count": 81,
"id": "north-alaska",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+---------+---------+------------+\n",
"|firstname|birthcity|age_in_years|\n",
"+---------+---------+------------+\n",
"| John| Seattle| 60|\n",
"| Tony|Cupertino| 30|\n",
"| Mike| New York| 40|\n",
"+---------+---------+------------+\n",
"\n"
]
}
],
"source": [
"df.select(col(\"name\").alias(\"firstname\"),df.city.alias(\"birthcity\"),df.age.alias(\"age_in_years\")).show()"
]
},
{
"cell_type": "code",
"execution_count": 82,
"id": "silver-vatican",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+----+---------+------------+------+------+----------+\n",
"|name| city|age_in_years|smoker|height| birthdate|\n",
"+----+---------+------------+------+------+----------+\n",
"|John| Seattle| 60| true| 1.7|1960-01-01|\n",
"|Tony|Cupertino| 30| false| 1.8|1990-01-01|\n",
"|Mike| New York| 40| true| 1.65|1980-01-01|\n",
"+----+---------+------------+------+------+----------+\n",
"\n"
]
}
],
"source": [
"df_rename = df.withColumnRenamed(\"age\",\"age_in_years\")\n",
"df_rename.show()"
]
},
{
"cell_type": "code",
"execution_count": 83,
"id": "intensive-kelly",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Input Output\n"
]
},
{
"data": {
"text/html": [
"<table style=\"display:inline\" border=\"1\" class=\"dataframe\">\n",
"  <thead>\n",
"    <tr style=\"text-align: right;\"><th>name</th><th>city</th><th>age</th><th>smoker</th><th>height</th><th>birthdate</th></tr>\n",
"  </thead>\n",
"  <tbody>\n",
"    <tr><td>John</td><td>Seattle</td><td>60</td><td>True</td><td>1.70</td><td>1960-01-01</td></tr>\n",
"    <tr><td>Tony</td><td>Cupertino</td><td>30</td><td>False</td><td>1.80</td><td>1990-01-01</td></tr>\n",
"    <tr><td>Mike</td><td>New York</td><td>40</td><td>True</td><td>1.65</td><td>1980-01-01</td></tr>\n",
"  </tbody>\n",
"</table>\n",
"<table style=\"display:inline\" border=\"1\" class=\"dataframe\">\n",
"  <thead>\n",
"    <tr style=\"text-align: right;\"><th>name</th><th>city</th><th>age_in_years</th><th>smoker</th><th>height</th><th>birthdate</th></tr>\n",
"  </thead>\n",
"  <tbody>\n",
"    <tr><td>John</td><td>Seattle</td><td>60</td><td>True</td><td>1.70</td><td>1960-01-01</td></tr>\n",
"    <tr><td>Tony</td><td>Cupertino</td><td>30</td><td>False</td><td>1.80</td><td>1990-01-01</td></tr>\n",
"    <tr><td>Mike</td><td>New York</td><td>40</td><td>True</td><td>1.65</td><td>1980-01-01</td></tr>\n",
"  </tbody>\n",
"</table>"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"print(\"Input \", \"Output\")\n",
"display_side_by_side(df.toPandas(),df_rename.toPandas())"
]
},
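{
"cell_type": "markdown",
"id": "rename-many-note",
"metadata": {},
"source": [
"To rename several columns while keeping all the others, `withColumnRenamed` can be chained, for example in a loop over a mapping; `toDF` instead replaces every column name at once and needs exactly one new name per column, in order. A sketch assuming a local PySpark installation; the `renames` mapping is just an example."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "rename-many-sketch",
"metadata": {},
"outputs": [],
"source": [
"from pyspark.sql import SparkSession\n",
"\n",
"spark = SparkSession.builder.getOrCreate()\n",
"df_ren = spark.createDataFrame(\n",
"    [('John', 'Seattle', 60, True, 1.7, '1960-01-01'),\n",
"     ('Tony', 'Cupertino', 30, False, 1.8, '1990-01-01')],\n",
"    ['name', 'city', 'age', 'smoker', 'height', 'birthdate'])\n",
"\n",
"# Example mapping of old -> new column names\n",
"renames = {'name': 'firstname', 'city': 'birthcity', 'age': 'age_in_years'}\n",
"\n",
"# Each withColumnRenamed call returns a new DataFrame; the other columns are kept\n",
"renamed = df_ren\n",
"for old, new in renames.items():\n",
"    renamed = renamed.withColumnRenamed(old, new)\n",
"\n",
"# toDF replaces all column names positionally\n",
"renamed_all = df_ren.toDF('firstname', 'birthcity', 'age_in_years', 'smoker', 'height', 'birthdate')\n",
"\n",
"print(renamed.columns)"
]
},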
{
"cell_type": "markdown",
"id": "ignored-clerk",
"metadata": {},
"source": [
""
]
},
{
"cell_type": "markdown",
"id": "dated-prescription",
"metadata": {},
"source": [
"### 3c. How to add new columns?"
]
},
{
"cell_type": "markdown",
"id": "activated-consultation",
"metadata": {},
"source": [
"\n",
"```{figure} img/chapter2/6.png\n",
"---\n",
"align: center\n",
"---\n",
"```"
]
},
{
"cell_type": "markdown",
"id": "separated-contest",
"metadata": {},
"source": [
"Input: Spark dataframe"
]
},
{
"cell_type": "code",
"execution_count": 84,
"id": "chronic-typing",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+----+---------+---+------+------+----------+\n",
"|name| city|age|smoker|height| birthdate|\n",
"+----+---------+---+------+------+----------+\n",
"|John| Seattle| 60| true| 1.7|1960-01-01|\n",
"|Tony|Cupertino| 30| false| 1.8|1990-01-01|\n",
"|Mike| New York| 40| true| 1.65|1980-01-01|\n",
"+----+---------+---+------+------+----------+\n",
"\n"
]
}
],
"source": [
"df = spark.createDataFrame([('John', 'Seattle', 60, True, 1.7, '1960-01-01'), \n",
"('Tony', 'Cupertino', 30, False, 1.8, '1990-01-01'), \n",
"('Mike', 'New York', 40, True, 1.65, '1980-01-01')],['name', 'city', 'age', 'smoker','height', 'birthdate'])\n",
"df.show()"
]
},
{
"cell_type": "markdown",
"id": "written-furniture",
"metadata": {},
"source": [
"Output: spark dataframe with a new column added"
]
},
{
"cell_type": "markdown",
"id": "published-dressing",
"metadata": {},
"source": [
"Method #1: `withColumn` returns a new DataFrame with the extra column"
]
},
{
"cell_type": "code",
"execution_count": 85,
"id": "bottom-salmon",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+----+---------+---+------+------+----------+-------------+\n",
"|name| city|age|smoker|height| birthdate|ageplusheight|\n",
"+----+---------+---+------+------+----------+-------------+\n",
"|John| Seattle| 60| true| 1.7|1960-01-01| 61.7|\n",
"|Tony|Cupertino| 30| false| 1.8|1990-01-01| 31.8|\n",
"|Mike| New York| 40| true| 1.65|1980-01-01| 41.65|\n",
"+----+---------+---+------+------+----------+-------------+\n",
"\n"
]
}
],
"source": [
"df.withColumn(\"ageplusheight\", df.age+df.height).show()"
]
},
{
"cell_type": "markdown",
"id": "middle-video",
"metadata": {},
"source": [
"Method #2: select all existing columns (`\"*\"`) plus a new aliased expression"
]
},
{
"cell_type": "code",
"execution_count": 86,
"id": "removable-porter",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+----+---------+---+------+------+----------+-------------+\n",
"|name| city|age|smoker|height| birthdate|ageplusheight|\n",
"+----+---------+---+------+------+----------+-------------+\n",
"|John| Seattle| 60| true| 1.7|1960-01-01| 61.7|\n",
"|Tony|Cupertino| 30| false| 1.8|1990-01-01| 31.8|\n",
"|Mike| New York| 40| true| 1.65|1980-01-01| 41.65|\n",
"+----+---------+---+------+------+----------+-------------+\n",
"\n"
]
}
],
"source": [
"df.select(\"*\",(df.age+df.height).alias(\"ageplusheight\")).show()"
]
},
{
"cell_type": "markdown",
"id": "ranking-petroleum",
"metadata": {},
"source": [
"Method #3: add a constant column with `lit`"
]
},
{
"cell_type": "code",
"execution_count": 87,
"id": "fitted-theater",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+----+---------+---+------+------+----------+----+\n",
"|name| city|age|smoker|height| birthdate|male|\n",
"+----+---------+---+------+------+----------+----+\n",
"|John| Seattle| 60| true| 1.7|1960-01-01|true|\n",
"|Tony|Cupertino| 30| false| 1.8|1990-01-01|true|\n",
"|Mike| New York| 40| true| 1.65|1980-01-01|true|\n",
"+----+---------+---+------+------+----------+----+\n",
"\n"
]
}
],
"source": [
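"from pyspark.sql.functions import lit\n",
"# lit() wraps a Python literal as a column; True gives a boolean column (lit(\"true\") would give a string column)\n",
"df.select(\"*\", lit(True).alias(\"male\")).show()"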
]
},
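{
"cell_type": "markdown",
"id": "conditional-column-note",
"metadata": {},
"source": [
"A new column can also depend on a condition. As a sketch (assuming a local PySpark installation; the `age_group` column and its labels are made up for illustration), `when(...).otherwise(...)` from `pyspark.sql.functions` builds a column whose value depends on each row's age."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "conditional-column-sketch",
"metadata": {},
"outputs": [],
"source": [
"from pyspark.sql import SparkSession\n",
"from pyspark.sql.functions import col, when\n",
"\n",
"spark = SparkSession.builder.getOrCreate()\n",
"df_people = spark.createDataFrame(\n",
"    [('John', 60), ('Tony', 30), ('Mike', 40)], ['name', 'age'])\n",
"\n",
"# Rows with age >= 40 get '40+', every other row gets 'under 40'\n",
"df_grouped = df_people.withColumn(\n",
"    'age_group', when(col('age') >= 40, '40+').otherwise('under 40'))\n",
"df_grouped.show()"
]
},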
{
"cell_type": "markdown",
"id": "quality-singer",
"metadata": {},
"source": [
""
]
},
{
"cell_type": "markdown",
"id": "improving-gibson",
"metadata": {},
"source": [
"### 3d. How to delete columns?"
]
},
{
"cell_type": "markdown",
"id": "pleased-diesel",
"metadata": {},
"source": [
"\n",
"```{figure} img/chapter2/7.png\n",
"---\n",
"align: center\n",
"---\n",
"```"
]
},
{
"cell_type": "markdown",
"id": "related-parade",
"metadata": {},
"source": [
"Input: Spark dataframe"
]
},
{
"cell_type": "code",
"execution_count": 88,
"id": "caring-estimate",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+----+---------+---+------+------+----------+\n",
"|name| city|age|smoker|height| birthdate|\n",
"+----+---------+---+------+------+----------+\n",
"|John| Seattle| 60| true| 1.7|1960-01-01|\n",
"|Tony|Cupertino| 30| false| 1.8|1990-01-01|\n",
"|Mike| New York| 40| true| 1.65|1980-01-01|\n",
"+----+---------+---+------+------+----------+\n",
"\n"
]
}
],
"source": [
"df = spark.createDataFrame([('John', 'Seattle', 60, True, 1.7, '1960-01-01'), \n",
"('Tony', 'Cupertino', 30, False, 1.8, '1990-01-01'), \n",
"('Mike', 'New York', 40, True, 1.65, '1980-01-01')],['name', 'city', 'age', 'smoker','height', 'birthdate'])\n",
"df.show()"
]
},
{
"cell_type": "markdown",
"id": "automatic-scout",
"metadata": {},
"source": [
"Output: Spark DataFrame with one or more columns deleted"
]
},
{
"cell_type": "markdown",
"id": "civil-middle",
"metadata": {},
"source": [
"dropping a single column"
]
},
{
"cell_type": "code",
"execution_count": 89,
"id": "behavioral-dylan",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+----+---+------+------+----------+\n",
"|name|age|smoker|height| birthdate|\n",
"+----+---+------+------+----------+\n",
"|John| 60| true| 1.7|1960-01-01|\n",
"|Tony| 30| false| 1.8|1990-01-01|\n",
"|Mike| 40| true| 1.65|1980-01-01|\n",
"+----+---+------+------+----------+\n",
"\n"
]
}
],
"source": [
"df.drop('city').show()"
]
},
{
"cell_type": "markdown",
"id": "yellow-procurement",
"metadata": {},
"source": [
"dropping multiple columns"
]
},
{
"cell_type": "code",
"execution_count": 90,
"id": "blind-presentation",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+----+---+------+------+\n",
"|name|age|smoker|height|\n",
"+----+---+------+------+\n",
"|John| 60| true| 1.7|\n",
"|Tony| 30| false| 1.8|\n",
"|Mike| 40| true| 1.65|\n",
"+----+---+------+------+\n",
"\n"
]
}
],
"source": [
"df.drop('city','birthdate').show()"
]
},
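{
"cell_type": "markdown",
"id": "drop-list-note",
"metadata": {},
"source": [
"When the columns to remove are kept in a Python list, the list can be unpacked into `drop`. Dropping a column name that does not exist is a no-op rather than an error, which is worth keeping in mind because a typo silently leaves the column in place. A sketch assuming a local PySpark installation; `to_drop` is an example list."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "drop-list-sketch",
"metadata": {},
"outputs": [],
"source": [
"from pyspark.sql import SparkSession\n",
"\n",
"spark = SparkSession.builder.getOrCreate()\n",
"df_d = spark.createDataFrame(\n",
"    [('John', 'Seattle', 60, True, 1.7, '1960-01-01')],\n",
"    ['name', 'city', 'age', 'smoker', 'height', 'birthdate'])\n",
"\n",
"to_drop = ['city', 'birthdate']\n",
"df_small = df_d.drop(*to_drop)         # unpack the list into separate arguments\n",
"\n",
"df_same = df_d.drop('no_such_column')  # unknown name: nothing is dropped\n",
"\n",
"print(df_small.columns)\n",
"print(df_same.columns)"
]
},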
{
"cell_type": "markdown",
"id": "congressional-south",
"metadata": {},
"source": [
""
]
},
{
"cell_type": "markdown",
"id": "chubby-source",
"metadata": {},
"source": [
"### 3e. How to change the data type of columns?"
]
},
{
"cell_type": "markdown",
"id": "electric-summer",
"metadata": {},
"source": [
"\n",
"```{figure} img/chapter2/8.png\n",
"---\n",
"align: center\n",
"---\n",
"```"
]
},
{
"cell_type": "markdown",
"id": "defined-education",
"metadata": {},
"source": [
"Input: Spark dataframe with column \"age\" of string type"
]
},
{
"cell_type": "code",
"execution_count": 91,
"id": "extreme-living",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+----+---------+---+------+------+----------+\n",
"|name| city|age|smoker|height| birthdate|\n",
"+----+---------+---+------+------+----------+\n",
"|John| Seattle| 60| true| 1.7|1960-01-01|\n",
"|Tony|Cupertino| 30| false| 1.8|1990-01-01|\n",
"|Mike| New York| 40| true| 1.65|1980-01-01|\n",
"+----+---------+---+------+------+----------+\n",
"\n",
"Data types:\n"
]
},
{
"data": {
"text/plain": [
"[('name', 'string'),\n",
" ('city', 'string'),\n",
" ('age', 'string'),\n",
" ('smoker', 'boolean'),\n",
" ('height', 'double'),\n",
" ('birthdate', 'string')]"
]
},
"execution_count": 91,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df = spark.createDataFrame([('John', 'Seattle', \"60\", True, 1.7, '1960-01-01'), \n",
"('Tony', 'Cupertino', \"30\", False, 1.8, '1990-01-01'), \n",
"('Mike', 'New York', \"40\", True, 1.65, '1980-01-01')],['name', 'city', 'age', 'smoker','height', 'birthdate'])\n",
"df.show()\n",
"print(\"Data types:\")\n",
"df.dtypes"
]
},
{
"cell_type": "markdown",
"id": "operational-electronics",
"metadata": {},
"source": [
"Output: Spark DataFrame with a new column \"age_inttype\" that holds \"age\" cast to an integer (the original \"age\" column is still a string)"
]
},
{
"cell_type": "code",
"execution_count": 92,
"id": "filled-catering",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+----+---------+---+------+------+----------+-----------+\n",
"|name| city|age|smoker|height| birthdate|age_inttype|\n",
"+----+---------+---+------+------+----------+-----------+\n",
"|John| Seattle| 60| true| 1.7|1960-01-01| 60|\n",
"|Tony|Cupertino| 30| false| 1.8|1990-01-01| 30|\n",
"|Mike| New York| 40| true| 1.65|1980-01-01| 40|\n",
"+----+---------+---+------+------+----------+-----------+\n",
"\n"
]
},
{
"data": {
"text/plain": [
"[('name', 'string'),\n",
" ('city', 'string'),\n",
" ('age', 'string'),\n",
" ('smoker', 'boolean'),\n",
" ('height', 'double'),\n",
" ('birthdate', 'string'),\n",
" ('age_inttype', 'int')]"
]
},
"execution_count": 92,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# cast() returns a new column; the original \"age\" column stays a string\n",
"df_cast = df.select(\"*\", df.age.cast(\"int\").alias(\"age_inttype\"))\n",
"df_cast.show()\n",
"df_cast.dtypes"
]
},
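{
"cell_type": "markdown",
"id": "cast-in-place-note",
"metadata": {},
"source": [
"To convert a column in place instead of adding a second one, pass the existing name to `withColumn`. The sketch below (assuming a local PySpark installation and a smaller made-up DataFrame) casts \"age\" with an `IntegerType` object, which is equivalent to `cast(\"int\")`, and parses the \"birthdate\" strings into a date column with `to_date`, using the `yyyy-MM-dd` pattern that matches the example data."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "cast-in-place-sketch",
"metadata": {},
"outputs": [],
"source": [
"from pyspark.sql import SparkSession\n",
"from pyspark.sql.functions import col, to_date\n",
"from pyspark.sql.types import IntegerType\n",
"\n",
"spark = SparkSession.builder.getOrCreate()\n",
"df_str = spark.createDataFrame(\n",
"    [('John', '60', '1960-01-01'), ('Tony', '30', '1990-01-01')],\n",
"    ['name', 'age', 'birthdate'])\n",
"\n",
"# Reusing an existing column name replaces that column instead of adding a new one\n",
"df_typed = (df_str\n",
"            .withColumn('age', col('age').cast(IntegerType()))\n",
"            .withColumn('birthdate', to_date(col('birthdate'), 'yyyy-MM-dd')))\n",
"\n",
"print(df_typed.dtypes)"
]
},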
{
"cell_type": "markdown",
"id": "gentle-trademark",
"metadata": {},
"source": [
""
]
},
{
"cell_type": "markdown",
"id": "hungarian-address",
"metadata": {},
"source": [
"### 3f. How to filter the data?"
]
},
{
"cell_type": "markdown",
"id": "organized-malpractice",
"metadata": {},
"source": [
"\n",
"```{figure} img/chapter2/9.png\n",
"---\n",
"align: center\n",
"---\n",
"```"
]
},
{
"cell_type": "markdown",
"id": "driven-dryer",
"metadata": {},
"source": [
"Input: Spark dataframe"
]
},
{
"cell_type": "code",
"execution_count": 93,
"id": "meaning-fancy",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+----+---------+---+------+------+----------+\n",
"|name| city|age|smoker|height| birthdate|\n",
"+----+---------+---+------+------+----------+\n",
"|John| Seattle| 60| true| 1.7|1960-01-01|\n",
"|Tony|Cupertino| 30| false| 1.8|1990-01-01|\n",
"|Mike| New York| 40| true| 1.65|1980-01-01|\n",
"+----+---------+---+------+------+----------+\n",
"\n",
"Data types:\n"
]
},
{
"data": {
"text/plain": [
"[('name', 'string'),\n",
" ('city', 'string'),\n",
" ('age', 'string'),\n",
" ('smoker', 'boolean'),\n",
" ('height', 'double'),\n",
" ('birthdate', 'string')]"
]
},
"execution_count": 93,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df = spark.createDataFrame([('John', 'Seattle', \"60\", True, 1.7, '1960-01-01'), \n",
"('Tony', 'Cupertino', \"30\", False, 1.8, '1990-01-01'), \n",
"('Mike', 'New York', \"40\", True, 1.65, '1980-01-01')],['name', 'city', 'age', 'smoker','height', 'birthdate'])\n",
"df.show()\n",
"print(\"Data types:\")\n",
"df.dtypes"
]
},
{
"cell_type": "markdown",
"id": "damaged-amsterdam",
"metadata": {},
"source": [
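"Output: Spark DataFrame containing the people whose age is more than 39. Note that \"age\" is stored as a string in this DataFrame, so Spark applies an implicit type conversion for the comparison with 39; casting it explicitly with `cast(\"int\")`, as in section 3e, makes the numeric comparison unambiguous."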
]
},
{
"cell_type": "code",
"execution_count": 94,
"id": "simple-henry",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+----+--------+---+------+------+----------+\n",
"|name| city|age|smoker|height| birthdate|\n",
"+----+--------+---+------+------+----------+\n",
"|John| Seattle| 60| true| 1.7|1960-01-01|\n",
"|Mike|New York| 40| true| 1.65|1980-01-01|\n",
"+----+--------+---+------+------+----------+\n",
"\n"
]
}
],
"source": [
"df.filter(\"age > 39\").show()"
]
},
{
"cell_type": "code",
"execution_count": 95,
"id": "separate-catering",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+----+--------+---+------+------+----------+\n",
"|name| city|age|smoker|height| birthdate|\n",
"+----+--------+---+------+------+----------+\n",
"|John| Seattle| 60| true| 1.7|1960-01-01|\n",
"|Mike|New York| 40| true| 1.65|1980-01-01|\n",
"+----+--------+---+------+------+----------+\n",
"\n"
]
}
],
"source": [
"df.filter(df.age > 39).show()"
]
}
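,
{
"cell_type": "markdown",
"id": "combined-filter-note",
"metadata": {},
"source": [
"Conditions can be combined with `&` (and), `|` (or) and `~` (not); each condition needs its own parentheses because of Python operator precedence. `where` is an alias of `filter`, and `isin` tests membership in a set of values. A sketch assuming a local PySpark installation and made-up data; \"age\" is cast to an integer explicitly because it is stored as a string, as in the example above."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "combined-filter-sketch",
"metadata": {},
"outputs": [],
"source": [
"from pyspark.sql import SparkSession\n",
"from pyspark.sql.functions import col\n",
"\n",
"spark = SparkSession.builder.getOrCreate()\n",
"df_f = spark.createDataFrame(\n",
"    [('John', 'Seattle', '60', True), ('Tony', 'Cupertino', '30', False),\n",
"     ('Mike', 'New York', '40', True)],\n",
"    ['name', 'city', 'age', 'smoker'])\n",
"\n",
"# age above 35 AND smoker; the parentheses around the comparison are required\n",
"older_smokers = df_f.filter((col('age').cast('int') > 35) & col('smoker'))\n",
"\n",
"# where() is an alias of filter(); isin() keeps rows whose city is in the given values\n",
"selected_cities = df_f.where(col('city').isin('Seattle', 'New York'))\n",
"\n",
"# ~ negates a condition\n",
"not_seattle = df_f.filter(~(col('city') == 'Seattle'))\n",
"\n",
"older_smokers.show()"
]
}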
],
"metadata": {
"celltoolbar": "Tags",
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.5"
}
},
"nbformat": 4,
"nbformat_minor": 5
}