Demo: Joining RDDs
Lesson objectives
In this lesson, we will explain the following topics:
- Demonstrate the process of joining RDDs in Spark.
- Learn about the different types of joins supported by Spark (a short sketch of the join types follows this list).
- Explore practical examples of RDD joins and their applications in data processing.
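The hands-on demo below performs an inner join only. As a minimal sketch on toy data (not the TPC-H files used later), the main pair-RDD join types look like this:

# Minimal sketch of pair-RDD join types on toy data (not part of the demo dataset)
left = sc.parallelize([(1, "a"), (2, "b"), (3, "c")])
right = sc.parallelize([(1, "x"), (2, "y"), (4, "z")])
print(left.join(right).collect())            # inner join: keys present in both RDDs
print(left.leftOuterJoin(right).collect())   # all left keys; missing right values become None
print(left.rightOuterJoin(right).collect())  # all right keys; missing left values become None
print(left.fullOuterJoin(right).collect())   # all keys from either side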
DEMO
Join RDDs
Browse Databricks datasets
#https://docs.databricks.com/en/discover/databricks-datasets.html
display(dbutils.fs.ls('/databricks-datasets'))
TPC-H Data Analysis
display(dbutils.fs.ls("/databricks-datasets/tpch/data-001/"))
Analyze the Part Dataset
# Read input part dataset as RDD[String]
part_input = sc.textFile("/databricks-datasets/tpch/data-001/part/")
part_input.take(10)
# Split the input data
part_input_splitted = part_input.map(lambda p: p.split('|'))
part_input_splitted.take(10)
Apply simple filtering for malformed records
The PART table has 9 columns (DDL below, from https://github.com/oracle/heatwave-tpch/blob/main/TPCH/create_tables.sql). Because each line in the TPC-H data files ends with a trailing '|', a well-formed record splits into exactly 10 fields (PART_SIZE below); any record with a different field count is treated as malformed. A quick check of this assumption is sketched after the DDL.
CREATE TABLE PART (
    P_PARTKEY     INTEGER NOT NULL,
    P_NAME        VARCHAR(55) NOT NULL,
    P_MFGR        CHAR(25) NOT NULL,
    P_BRAND       CHAR(10) NOT NULL,
    P_TYPE        VARCHAR(25) NOT NULL,
    P_SIZE        INTEGER NOT NULL,
    P_CONTAINER   CHAR(10) NOT NULL,
    P_RETAILPRICE DECIMAL(15,2) NOT NULL,
    P_COMMENT     VARCHAR(23) NOT NULL,
    PRIMARY KEY (P_PARTKEY));

CREATE TABLE PARTSUPP (
    PS_PARTKEY    INTEGER NOT NULL,
    PS_SUPPKEY    INTEGER NOT NULL,
    PS_AVAILQTY   INTEGER NOT NULL,
    PS_SUPPLYCOST DECIMAL(15,2) NOT NULL,
    PS_COMMENT    VARCHAR(199) NOT NULL,
    PRIMARY KEY (PS_PARTKEY, PS_SUPPKEY));
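A quick sanity check (a sketch, not part of the original demo) can confirm the field-count assumption by looking at the distribution of field counts produced by the '|' split:

# Distribution of field counts after splitting on '|'
# (a well-formed PART record should yield exactly 10 fields)
print(part_input_splitted.map(lambda arr: len(arr)).countByValue())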
PART_SIZE = 10
# Keep only the part records that split into exactly PART_SIZE fields
part_mapped = part_input_splitted \
    .filter(lambda arr: len(arr) == PART_SIZE)
part_mapped.take(10)
# Catch the rejected records that don't match the expected number of fields
part_rejected = part_input_splitted.filter(lambda arr: len(arr) != PART_SIZE)
#### Any better way to filter? One option is sketched after this cell.
part_rejected.take(10)
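One possible answer to the question above, sketched with a hypothetical tag_part_record helper (not part of the original demo): classify each record once, cache the tagged RDD, and derive both the accepted and rejected RDDs from it, so the validation rule lives in a single place.

# Sketch: classify each record once, then split into accepted/rejected RDDs.
# tag_part_record is a hypothetical helper introduced here for illustration.
def tag_part_record(arr, expected_size=PART_SIZE):
    return ("ok" if len(arr) == expected_size else "rejected", arr)

tagged = part_input_splitted.map(tag_part_record).cache()
part_ok = tagged.filter(lambda t: t[0] == "ok").map(lambda t: t[1])
part_bad = tagged.filter(lambda t: t[0] == "rejected").map(lambda t: t[1])
print(part_ok.count(), part_bad.count())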
# Display parsed and rejected records count
print(f"Number of rejected records = {part_rejected.count()}")
print(f"Number of parsed records = {part_mapped.count()}")
class Part:
    def __init__(self, p_partkey, p_name, p_mfgr, p_brand, p_type, p_size, p_container, p_retailprice, p_comment):
        self.p_partkey = p_partkey
        self.p_name = p_name
        self.p_mfgr = p_mfgr
        self.p_brand = p_brand
        self.p_type = p_type
        self.p_size = p_size
        self.p_container = p_container
        self.p_retailprice = p_retailprice
        self.p_comment = p_comment

    def __repr__(self):
        return f"Part({self.p_partkey}, {self.p_name}, {self.p_mfgr}, {self.p_brand}, {self.p_type}, {self.p_size}, {self.p_container}, {self.p_retailprice}, {self.p_comment})"
part_transformed = part_mapped.map(lambda arr: (int(arr[0]), Part(int(arr[0]), arr[1], arr[2], arr[3], arr[4], int(arr[5]), arr[6], float(arr[7]), arr[8])))
part_transformed.take(10)
# RDD [(Int, Part)]
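Note that the int and float conversions above raise a ValueError if a field is empty or non-numeric. A more defensive variant, sketched here with a hypothetical parse_part helper (not the demo's approach), returns None for such rows so they can be filtered out:

# Sketch: guard the numeric conversions; rows that fail to parse become None and are dropped
def parse_part(arr):
    try:
        return (int(arr[0]), Part(int(arr[0]), arr[1], arr[2], arr[3], arr[4],
                                  int(arr[5]), arr[6], float(arr[7]), arr[8]))
    except (ValueError, IndexError):
        return None

part_transformed_safe = part_mapped.map(parse_part).filter(lambda rec: rec is not None)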
Read and Parse the PartSupp Dataset
class PartSupp:
    def __init__(self, ps_partkey, ps_suppkey, ps_availqty, ps_supplycost, ps_comment):
        self.ps_partkey = ps_partkey
        self.ps_suppkey = ps_suppkey
        self.ps_availqty = ps_availqty
        self.ps_supplycost = ps_supplycost
        self.ps_comment = ps_comment

    def __repr__(self):
        return f"PartSupp({self.ps_partkey}, {self.ps_suppkey}, {self.ps_availqty}, {self.ps_supplycost}, {self.ps_comment})"
# Read input partSupp dataset as RDD[String]
partsupp_input = sc.textFile("/databricks-datasets/tpch/data-001/partsupp/")
partsupp_input.take(10)
# Split the input data
partsupp_input_splitted = partsupp_input.map(lambda p: p.split('|'))
partsupp_input_splitted.take(10)
PARTSUPP_SIZE = 6
# Filter partSupp records by field count and parse them into RDD[(partKey, PartSupp)]
partsupp_mapped = partsupp_input_splitted \
    .filter(lambda arr: len(arr) == PARTSUPP_SIZE) \
    .map(lambda arr: (int(arr[0]), PartSupp(int(arr[0]), int(arr[1]), int(arr[2]), float(arr[3]), arr[4])))
partsupp_mapped.take(10)
# RDD [ (integer , PartSupp)]
# Catch the rejected records that don't match the expected number of fields
partsupp_rejected = partsupp_input_splitted.filter(lambda arr: len(arr) != PARTSUPP_SIZE)
# Display parsed and rejected records count
print(f"Number of rejected records = {partsupp_rejected.count()}")
print(f"Number of parsed records = {partsupp_mapped.count()}")
Join the Part and PartSupp Datasets
# PartRDD -> RDD[ (integer, Part)]
# PartSuppRDD -> RDD [ (integer, PartSupp)]
# JoinedRDD -> RDD [(integer,(Part,PartSupp)) ]
# Perform inner join on part and partsupp datasets
part_joined_partsupp = part_transformed.join(partsupp_mapped)
# Take the first 10 elements of the joined RDD and print them
# for record in part_joined_partsupp.take(10):
# print(record)
# Print the count of joined records
print(f"Number of joined records = {part_joined_partsupp.count()}")
Watch on YouTube
Watch on our Servers
You can download the video by right-clicking the link and choosing "Save link as": Download Video
Download the code
You can download the Jupyter notebook, Databricks Notebook, or the Python source code using the following links: