Demo: RDD Operations Part 1

Lesson objectives

In this lesson, we will explain the following topics:

  • Demonstrate the Spark RDD APIs map, flatMap, filter, reduce, groupBy, groupByKey, reduceByKey, join, cogroup, and distinct for data transformation, extraction, organization, and reduction.
  • Apply these operations for efficient data processing and aggregation.
  • Show how to navigate and use the Spark documentation effectively.

DEMO

Transformations and Actions

Map Function

# 1. map
# Setup (assumed: the original notebook presumably defines `sc` and `rdd` in an earlier cell)
from pyspark import SparkContext

sc = SparkContext.getOrCreate()
rdd = sc.parallelize([1, 2, 3, 4, 5])  # hypothetical sample data

print("#####  1. map ###")
print("Description: Return a new RDD by applying a function to all elements of this RDD.")

# 01 Example: Multiply each element by 2
simple_map = rdd.map(lambda x: x * 2).collect()
print("01 map example (multiply by 2):", simple_map)

# 02 Example: Count the words in each sentence
sentences = ["Hello world", "Apache Spark", "RDD transformations Wide Vs Narrow Spark"]
# "Hello world".split(" ") -> ["Hello", "world"], so its word count is 2
sentence_rdd = sc.parallelize(sentences)
words_map = sentence_rdd.map(lambda sentence: len(sentence.split(" "))).collect()
print("02 map example (word count per sentence):", words_map)
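
To see what `map` does without a cluster, here is a small plain-Python model. It is an illustration, not Spark's implementation. It assumes the data is already split into partitions, represented as a list of lists, and applies the function to every element of every partition independently. The output always has exactly one element per input element. The helper names `model_map` and `model_collect` and the two-partition layout are hypothetical.

```python
# Plain-Python model of RDD.map (illustrative only; Spark applies f per partition on executors).
from typing import Callable, List, TypeVar

T = TypeVar("T")
U = TypeVar("U")


def model_map(partitions: List[List[T]], f: Callable[[T], U]) -> List[List[U]]:
    """Apply f to every element, keeping the partition layout unchanged."""
    return [[f(x) for x in part] for part in partitions]


def model_collect(partitions: List[List[T]]) -> List[T]:
    """Concatenate partitions in order, similar to what collect() returns."""
    return [x for part in partitions for x in part]


# Hypothetical split of the example sentences into two partitions.
sentence_partitions = [["Hello world", "Apache Spark"],
                       ["RDD transformations Wide Vs Narrow Spark"]]

word_counts = model_collect(model_map(sentence_partitions, lambda s: len(s.split(" "))))
print(word_counts)  # [2, 2, 6]

# map is one-in, one-out: to get the length of *each word*, f must return a list per sentence.
word_lengths = model_collect(model_map(sentence_partitions,
                                       lambda s: [len(w) for w in s.split(" ")]))
print(word_lengths)  # [[5, 5], [6, 5], [3, 15, 4, 2, 6, 5]]
```

Because each element is processed on its own, `map` is a narrow transformation: no data has to move between partitions.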

Filter Function

# 2. filter
print("\n#####  2. filter ###")
print("Description: Return a new RDD containing only the elements that satisfy a predicate.")

# 01 Example: Keep only the even numbers
simple_filter = rdd.filter(lambda x: x % 2 == 0).collect()
print("01 filter example (even numbers):", simple_filter)

# 02 Example: Keep only the sentences containing the word 'Spark'
words_filter = sentence_rdd.filter(lambda sentence: "Spark" in sentence).collect()
print("02 filter example (sentences with 'Spark'):", words_filter)
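
`filter` is also a narrow transformation: each partition is filtered on its own and nothing is moved. This means a selective filter can leave partitions empty or unbalanced. In Spark, `coalesce` is commonly used afterwards to reduce the partition count. The plain-Python sketch below models this. The partition layout of the numbers 1 to 10 is a hypothetical one, chosen to make the effect visible.

```python
# Plain-Python model of RDD.filter (illustrative only).
from typing import Callable, List, TypeVar

T = TypeVar("T")


def model_filter(partitions: List[List[T]], pred: Callable[[T], bool]) -> List[List[T]]:
    """Keep elements for which pred is True; each partition is filtered independently."""
    return [[x for x in part if pred(x)] for part in partitions]


# Hypothetical layout: the numbers 1..10 spread over 3 partitions.
number_partitions = [[1, 3, 5], [2, 4, 7, 9], [6, 8, 10]]

evens = model_filter(number_partitions, lambda x: x % 2 == 0)
print(evens)                    # [[], [2, 4], [6, 8, 10]]
print([len(p) for p in evens])  # [0, 2, 3] -> one partition is now empty
```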

FlatMap Function

# 3. flatMap
print("\n#####  3. flatMap ###")
print("Description: Return a new RDD by applying a function to all elements of this RDD and then flattening the results.")

# 01 Example: Split sentences into words (map vs flatMap)
# map returns one list per sentence, i.e. a list of lists
sentences_mapped = sentence_rdd.map(lambda sentence: sentence.split(" ")).collect()
print("01 map result (one list per sentence):", sentences_mapped)

# flatMap flattens those lists into a single list of words
simple_flatMap = sentence_rdd.flatMap(lambda sentence: sentence.split(" ")).collect()
print("01 flatMap example (split sentences into words):", simple_flatMap)

# 02 Example: Flatten a list of lists
nested_lists = [[1, 2, 3], [4, 5], [6, 7, 8, 9]]
nested_rdd = sc.parallelize(nested_lists)
flatten_list = nested_rdd.flatMap(lambda x: x).collect()
print("02 flatMap example (flatten list of lists):", flatten_list)
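
The difference between `map` and `flatMap` can be reproduced in plain Python. `flatMap` applies the function and then flattens one level of the returned iterables, which `itertools.chain.from_iterable` also does. This is a local illustration only, using the same sentences as above.

```python
# Plain-Python model contrasting map and flatMap (illustrative only).
from itertools import chain

sentences = ["Hello world", "Apache Spark", "RDD transformations Wide Vs Narrow Spark"]

# map: one output element per input element (here, one list per sentence).
mapped = [s.split(" ") for s in sentences]

# flatMap: apply the function, then flatten one level of the returned iterables.
flat_mapped = list(chain.from_iterable(s.split(" ") for s in sentences))

print(len(mapped))       # 3  (one list per sentence)
print(len(flat_mapped))  # 10 (one element per word)
print(flat_mapped[:4])   # ['Hello', 'world', 'Apache', 'Spark']

# Only one level is flattened: lists nested inside the returned lists stay nested.
print(list(chain.from_iterable([[[1, 2]], [[3]]])))  # [[1, 2], [3]]
```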

Reduce Function

# 4. reduce
print("\n#####  4. reduce ###")
print("Description: Reduces the elements of this RDD using the specified commutative and associative binary operator.")

# 01 Example: Sum of elements
simple_reduce = rdd.reduce(lambda x, y: x + y)
print("01 reduce example (sum of elements):", simple_reduce)

# 02 Example: Find the longest word in a list of words
words = ["cat", "elephant", "rat", "hippopotamus"]
words_rdd = sc.parallelize(words)
words_rdd_reduced = words_rdd.reduce(lambda x, y: x if len(x) > len(y) else y)
print("02 reduce example (longest word):", words_rdd_reduced)
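
The description above requires a commutative and associative operator. The reason is that Spark first reduces each partition locally and then combines the per-partition results, so the grouping of operations depends on how the data is partitioned. The plain-Python model below is an illustration, not Spark's code. The two partition layouts are hypothetical. Addition gives the same answer for both layouts, but subtraction does not.

```python
# Plain-Python model of RDD.reduce (illustrative only): reduce each partition
# locally, then combine the per-partition results.
from functools import reduce
from operator import add, sub
from typing import Callable, List, TypeVar

T = TypeVar("T")


def model_reduce(partitions: List[List[T]], op: Callable[[T, T], T]) -> T:
    partials = [reduce(op, part) for part in partitions if part]  # skip empty partitions
    if not partials:
        raise ValueError("cannot reduce an empty dataset")  # Spark also errors on an empty RDD
    return reduce(op, partials)


layout_a = [[1, 2, 3, 4, 5]]      # everything in one partition
layout_b = [[1, 2], [3, 4], [5]]  # same data, three partitions

print(model_reduce(layout_a, add), model_reduce(layout_b, add))  # 15 15
print(model_reduce(layout_a, sub), model_reduce(layout_b, sub))  # -13 -5
```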

groupByKey Function

# 5. groupByKey
print("\n#####  5. groupByKey ###")
print("Description: Group the values for each key in the RDD into a single sequence.")

# 01 Example: Group the letters by their numeric key
pairs = [(1, 'a'), (1, 'ali'), (2, 'b'), (3, 'c'), (4, 'd'), (5, 'e')]
pairs_rdd = sc.parallelize(pairs)
simple_groupByKey = pairs_rdd.groupByKey().mapValues(list).collect()
print("01 groupByKey example (group values by key):", simple_groupByKey)

# 02 Example: Group the numbers recorded for each word
words_pairs = [("cat", 1), ("car", 2), ("dog", 3), ("deer", 4), ("elephant", 5), ("elephant", 20)]
words_rdd = sc.parallelize(words_pairs)
# mapValues(list) converts the grouped values (which are iterable) into lists.
words_grouped = words_rdd.groupByKey().mapValues(list).collect()
print("02 groupByKey example (group values by word):", words_grouped)
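
Grouping words by their starting letter is a job for `groupBy`, which computes the key from each element with a function. `groupByKey` instead uses the key already present in each pair. The sketch below models both in plain Python. The helper names are hypothetical, and the PySpark call in the comment is shown only for orientation. In Spark the grouped values come back as an iterable (hence `mapValues(list)`), and the order of keys in the result is not guaranteed.

```python
# Plain-Python model of RDD.groupByKey and RDD.groupBy (illustrative only).
from collections import defaultdict
from typing import Callable, Dict, Hashable, Iterable, List, Tuple, TypeVar

K = TypeVar("K", bound=Hashable)
V = TypeVar("V")


def model_group_by_key(pairs: Iterable[Tuple[K, V]]) -> Dict[K, List[V]]:
    """groupByKey: collect every value that shares a key into one list."""
    groups: Dict[K, List[V]] = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return dict(groups)


def model_group_by(items: Iterable[V], key_func: Callable[[V], K]) -> Dict[K, List[V]]:
    """groupBy: compute a key for each element, then group as groupByKey does."""
    return model_group_by_key((key_func(item), item) for item in items)


words_pairs = [("cat", 1), ("car", 2), ("dog", 3), ("deer", 4), ("elephant", 5), ("elephant", 20)]
print(model_group_by_key(words_pairs))
# {'cat': [1], 'car': [2], 'dog': [3], 'deer': [4], 'elephant': [5, 20]}

# Group the words themselves by starting letter (in PySpark: rdd.groupBy(lambda w: w[0])).
words = ["cat", "car", "dog", "deer", "elephant"]
print(model_group_by(words, lambda w: w[0]))
# {'c': ['cat', 'car'], 'd': ['dog', 'deer'], 'e': ['elephant']}
```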

reduceByKey Function

# 6. reduceByKey
print("\n#####  6. reduceByKey ###")
print("Description: Merge the values for each key using an associative and commutative reduce function.")
pairs = [(1, 'a'), (1, '_a'), (2, 'b'), (2, '_b'), (3, 'c'), (4, 'd'), (5, 'e')]
pairs_rdd = sc.parallelize(pairs)

# 01 Example: Combine values with the same key
# For strings, + concatenates ('a' + '_a' -> 'a_a'). Concatenation is associative but not
# commutative, so the order of the pieces within each result is not guaranteed in general.
simple_reduceByKey = pairs_rdd.reduceByKey(lambda x, y: x + y).collect()
print("01 reduceByKey example (concatenate values by key):", simple_reduceByKey)

# 02 Example: Count the occurrences of each word in a list
word_list = ["cat", "cat", "dog", "elephant", "dog", "dog"]
word_pairs_rdd = sc.parallelize(word_list).map(lambda word: (word, 1))
word_counts = word_pairs_rdd.reduceByKey(lambda x, y: x + y).collect()
print("02 reduceByKey example (word count):", word_counts)
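
`reduceByKey` can merge values for the same key inside each partition before the shuffle (a map-side combine). As a result, it usually moves fewer records than `groupByKey` followed by a sum. This is also why the function must be associative and commutative: partial results from different partitions are merged later in a different grouping. The plain-Python sketch below counts the records that would cross the shuffle in each approach. The two-partition layout of the word-count pairs is an assumption for illustration.

```python
# Plain-Python model of reduceByKey's map-side combine (illustrative only).
from operator import add
from typing import Callable, Dict, List, Tuple

Pair = Tuple[str, int]

# Hypothetical layout of the word-count pairs across two partitions.
partitions: List[List[Pair]] = [
    [("cat", 1), ("cat", 1), ("dog", 1)],
    [("elephant", 1), ("dog", 1), ("dog", 1)],
]


def merge_by_key(records: List[Pair], op: Callable[[int, int], int]) -> Dict[str, int]:
    """Fold all values that share a key with op, keeping first-seen key order."""
    merged: Dict[str, int] = {}
    for key, value in records:
        merged[key] = op(merged[key], value) if key in merged else value
    return merged


# groupByKey-style: every original record crosses the shuffle.
shuffled_group = [pair for part in partitions for pair in part]

# reduceByKey-style: each partition is combined first, so only one
# partial result per key per partition crosses the shuffle.
shuffled_reduce = [pair for part in partitions
                   for pair in merge_by_key(part, add).items()]

print(len(shuffled_group), len(shuffled_reduce))  # 6 4
print(merge_by_key(shuffled_reduce, add))         # {'cat': 2, 'dog': 3, 'elephant': 1}
```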

join Function

# 7. join
print("\n#####  7. join ###")
print("Description: Perform an inner join of this RDD and another one.")

# 01 Example: Join two RDDs by key
fruits = sc.parallelize([(1, "apple"), (2, "banana")])
colors = sc.parallelize([(1, "red"), (2, "yellow")])
fruits_color_join = fruits.join(colors).collect()
print("01 join example (join two RDDs):", fruits_color_join)

# 02 Example: Join employee data with department data
# Inner join: Joe (key 3) has no matching department, so he is dropped from the result.
employees = sc.parallelize([(1, "John"), (2, "Jane"), (3, "Joe")])
departments = sc.parallelize([(1, "HR"), (2, "Finance")])
employees_department_join = employees.join(departments).collect()
print("02 join example (employee-department join):", employees_department_join)
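
Because `join` is an inner join, employees without a department disappear from the result. PySpark also provides `leftOuterJoin`, which keeps unmatched keys from the left RDD and pairs them with `None`. The plain-Python sketch below models both operations on the same data. The helper names are hypothetical, and the code is an illustration rather than Spark's implementation. When a key has several values on each side, each join emits one pair per combination.

```python
# Plain-Python model of RDD.join (inner) and RDD.leftOuterJoin (illustrative only).
from collections import defaultdict
from typing import Dict, List, Optional, Tuple

Pairs = List[Tuple[int, str]]

employees: Pairs = [(1, "John"), (2, "Jane"), (3, "Joe")]
departments: Pairs = [(1, "HR"), (2, "Finance")]


def index_by_key(pairs: Pairs) -> Dict[int, List[str]]:
    """Collect all values for each key (roughly what the shuffle achieves before a join)."""
    index: Dict[int, List[str]] = defaultdict(list)
    for key, value in pairs:
        index[key].append(value)
    return dict(index)


def model_join(left: Pairs, right: Pairs) -> List[Tuple[int, Tuple[str, str]]]:
    """Inner join: one output per matching (left value, right value) combination."""
    right_index = index_by_key(right)
    return [(k, (lv, rv)) for k, lv in left for rv in right_index.get(k, [])]


def model_left_outer_join(left: Pairs, right: Pairs) -> List[Tuple[int, Tuple[str, Optional[str]]]]:
    """Left outer join: left keys with no match are kept, paired with None."""
    right_index = index_by_key(right)
    return [(k, (lv, rv)) for k, lv in left for rv in right_index.get(k, [None])]


print(model_join(employees, departments))
# [(1, ('John', 'HR')), (2, ('Jane', 'Finance'))]
print(model_left_outer_join(employees, departments))
# [(1, ('John', 'HR')), (2, ('Jane', 'Finance')), (3, ('Joe', None))]
```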

cogroup Function

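TableA:

id  value
1   apple
2   banana
3   orange

TableB:

id  color
1   red
2   yellow

Result of cogroup (each key maps to a pair of lists: values from TableA, values from TableB):

id  value       color
1   [apple]     [red]
2   [banana]    [yellow]
3   [orange]    []

Key 3 has no match in TableB, so its color list is empty rather than NULL.

# 8. cogroup
# cogroup groups data from two RDDs by key. For every key present in either RDD it returns
# (key, (values_from_first_rdd, values_from_second_rdd)). Each side is an iterable, which is
# empty when that RDD has no value for the key; mapValues below converts both to lists.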
print("\n#####  8. cogroup ###")
print("Description: Group data from two RDDs sharing the same key.")

# 01 Example: Cogroup two RDDs
fruits_rdd = sc.parallelize([(1, "apple"), (2, "banana"), (3, "orange")])
colors_rdd = sc.parallelize([(1, "red"), (2, "yellow")])
cogrouped_fruits_colors = fruits_rdd.cogroup(colors_rdd).mapValues(lambda x: (list(x[0]), list(x[1]))).collect()
print("01 cogroup example (group two RDDs):", cogrouped_fruits_colors)
# 02 Example: Cogroup sales data with target data
sales_rdd = sc.parallelize([("store1", 100), ("store2", 200)])
targets_rdd = sc.parallelize([("store1", 150), ("store3", 250)])
cogrouped_sales_targets = sales_rdd.cogroup(targets_rdd).mapValues(lambda x: (list(x[0]), list(x[1]))).collect()
print("02 cogroup example (sales-targets cogroup):", cogrouped_sales_targets)
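
In the sales example, every key from either RDD appears exactly once in the result. A store with no value on one side gets an empty list on that side. The plain-Python model below reproduces this behavior. It is an illustration only, and the helper name `model_cogroup` is hypothetical. It also shows that an inner join can be derived from a cogroup result by pairing every left value with every right value.

```python
# Plain-Python model of RDD.cogroup (illustrative only).
from typing import Dict, List, Tuple

Pairs = List[Tuple[str, int]]


def model_cogroup(left: Pairs, right: Pairs) -> Dict[str, Tuple[List[int], List[int]]]:
    """Map every key from either side to (values from left, values from right)."""
    result: Dict[str, Tuple[List[int], List[int]]] = {}
    for key, value in left:
        result.setdefault(key, ([], []))[0].append(value)
    for key, value in right:
        result.setdefault(key, ([], []))[1].append(value)
    return result


sales: Pairs = [("store1", 100), ("store2", 200)]
targets: Pairs = [("store1", 150), ("store3", 250)]

grouped = model_cogroup(sales, targets)
print(grouped)
# {'store1': ([100], [150]), 'store2': ([200], []), 'store3': ([], [250])}

# An inner join keeps only keys with values on both sides.
inner = [(k, (a, b)) for k, (left_vals, right_vals) in grouped.items()
         for a in left_vals for b in right_vals]
print(inner)  # [('store1', (100, 150))]
```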

distinct Function

# 9. distinct
print("\n#####  9. distinct ###")
print("Description: Return a new RDD containing the distinct elements in this RDD.")

# 01 Example: Unique words from a list of words (the order of the result is not guaranteed)
words = ["cat", "dog", "cat", "elephant", "dog"]
words_rdd = sc.parallelize(words)
unique_words = words_rdd.distinct().collect()
print("01 distinct example (unique words):", unique_words)
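
`distinct` needs a shuffle, because duplicates of the same value can sit in different partitions. PySpark's own `distinct` is written roughly as `map(lambda x: (x, None))`, then `reduceByKey`, then keeping only the keys. The plain-Python sketch below mirrors that pipeline step by step. It is an illustration, not the library code. The partition layout is hypothetical, and the order of the result in Spark is not guaranteed.

```python
# Plain-Python model of RDD.distinct as map -> reduceByKey -> keys (illustrative only).
from typing import Dict, Hashable, List, TypeVar

T = TypeVar("T", bound=Hashable)


def model_distinct(partitions: List[List[T]]) -> List[T]:
    # Step 1 (map): turn each element into an (element, None) pair.
    pairs = [(x, None) for part in partitions for x in part]
    # Step 2 (reduceByKey): keep one entry per key, so duplicates from any partition collapse.
    reduced: Dict[T, None] = {}
    for key, value in pairs:
        reduced.setdefault(key, value)
    # Step 3 (map to key): drop the dummy value.
    return list(reduced)


# Hypothetical split: duplicates of "cat" and "dog" sit in different partitions.
word_partitions = [["cat", "dog"], ["cat", "elephant", "dog"]]
print(model_distinct(word_partitions))  # ['cat', 'dog', 'elephant']
```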
