Chapter 13 : User Defined Functions(UDFs)

Chapter Learning Objectives

  • What is User Defined Function and how to use it?.

Chapter Outline

import pyspark
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
from IPython.display import display_html
import pandas as pd 
import numpy as np
def display_side_by_side(*args):
    html_str=''
    for df in args:
        html_str+=df.to_html(index=False)
        html_str+= "\xa0\xa0\xa0"*10
    display_html(html_str.replace('table','table style="display:inline"'),raw=True)
space = "\xa0" * 10
import panel as pn

css = """
div.special_table + table, th, td {
  border: 3px solid orange;
}
"""
pn.extension(raw_css=[css])

1. What is User Defined Function?

User-Defined Functions (UDFs) are user-programmable routines that act on one row. Note: UDF’s are the most expensive operations hence use them only you have no choice and when essential

1a. How to create User-defined functions ?

Method:1

def squared(s):
  return s * s
spark.udf.register("squared", squared)
<function __main__.squared(s)>

You can optionally set the return type of your UDF. The default return type is StringType.

from pyspark.sql.types import LongType
def squared(s):
  return s * s
spark.udf.register("squaredWithPython", squared, LongType())
<function __main__.squared(s)>

Method:2

from pyspark.sql.functions import udf
@udf("long")
def squared(s):
  return s * s

Method:3

from pyspark.sql.types import IntegerType
squared = udf(lambda x: x*x, IntegerType())

1b. How to use UDF in data frames?

Input: Spark dataframe

df = spark.createDataFrame([(1,),(2,),(3,)],["value"])
df.show()
+-----+
|value|
+-----+
|    1|
|    2|
|    3|
+-----+

Use:1 Output : Spark dataframe containing transformed column using UDF

df.select(squared(df.value).alias("squared")).show()
+-------+
|squared|
+-------+
|      1|
|      4|
|      9|
+-------+

Use:2 Output : Spark dataframe containing transformed column using UDF

df.withColumn("squared", squared("value")).show()
+-----+-------+
|value|squared|
+-----+-------+
|    1|      1|
|    2|      4|
|    3|      9|
+-----+-------+
df.select(squared(df.value).alias("squared")).show()
+-------+
|squared|
+-------+
|      1|
|      4|
|      9|
+-------+

Evaluation order and null checking

Spark DataFrame API does not guarantee the order of evaluation of subexpressions. In particular, the inputs of an operator or function are not necessarily evaluated left-to-right or in any other fixed order.

df = spark.createDataFrame([(1,),(2,),(3,),(None,)],["value"])
df.show()
+-----+
|value|
+-----+
|    1|
|    2|
|    3|
| null|
+-----+

Below code will fail if you execute

df.select(squared(df.value).alias(“squared”)).show() # fail

from pyspark.sql.types import LongType
def squared(s):
    if s is not None:
        return s * s
spark.udf.register("squaredWithPython", squared, LongType())
<function __main__.squared(s)>
df.select(squared(df.value).alias("squared")).show()
+-------+
|squared|
+-------+
|      1|
|      4|
|      9|
|   null|
+-------+