Spark Operations

Lesson objectives

In this lesson, we will cover the following topics:

  • Understand the two types of Spark operations: transformations and actions.
  • Learn about the immutability of Spark operations and its implications.
  • Explore examples of transformations and actions, including lazy evaluation and its benefits.

Spark Operations

  • Spark operations on distributed data can be classified into two types: transformations and actions.
  • Spark's distributed data structures (RDDs and DataFrames) are immutable: operations never modify data in place; they always return new datasets.

Immutable Objects

  • An object whose state cannot change after it has been constructed is called immutable (unchangeable).
  • The methods of an immutable object do not modify the state of the object.
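A quick illustration of the idea with a built-in immutable type (plain Python, no Spark needed): string methods return new objects instead of modifying the original.

# Python strings are immutable: upper() returns a NEW string
s = "spark"
t = s.upper()
print(s)   # spark -- the original object is unchanged
print(t)   # SPARK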

Immutable Objects

Figure 1: A Spark DataFrame is immutable, and you can't change its values.

Immutable Objects

Figure 2: Filtering a PySpark DataFrame based on age.

Spark Operations: Transformations

  • Transformations: operations that transform a Spark DataFrame into a new DataFrame without altering the original data.
  • Examples of Spark transformations: map(), select(), filter(), and drop().
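As a minimal sketch (assuming an active SparkSession named spark; the data and column names are illustrative), each of these calls returns a new DataFrame and leaves the original untouched:

# Minimal sketch, assuming an active SparkSession named `spark`
people = spark.createDataFrame(
    [("John", 28), ("Smith", 44), ("Adam", 65)],
    ["name", "age"]
)

names_only = people.select("name")         # new DataFrame with a single column
no_age = people.drop("age")                # new DataFrame without the age column
over_30 = people.filter(people.age > 30)   # new DataFrame with matching rows only

people.show()   # `people` still has both columns and all three rows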

Spark Transformations: What are Lazy Transformations?

  • In Spark, transformations are lazy.
  • This means computations are not executed immediately.
  • Spark builds a DAG (Directed Acyclic Graph) of transformations.
  • Transformation results are not computed right away; instead, they are recorded (remembered) as a lineage.

Spark Transformations: Benefits of Lazy Evaluation

  • Optimization: The recorded lineage lets Spark rearrange transformations, coalesce them, and group them into stages when it later builds the execution plan, producing a more efficient execution.
  • Resource Management: Spark computes only what an action actually requires, so tasks run efficiently and use fewer resources.
  • Fault Tolerance: The lineage makes it easier to recompute just the parts of the pipeline that fail.

Spark Transformations: Lazy Transformation

  • Consider a dataset with map and filter transformations.
  • Spark does not execute these transformations when they are defined.
  • Transformations are executed only when an action (such as collect() or count()) is called.
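A minimal sketch of this behavior (assuming an active SparkContext named sc): defining the map and filter only records them in the lineage; nothing runs until the count() action is called.

# Minimal sketch, assuming an active SparkContext named `sc`
rdd = sc.parallelize(range(1, 1001))

doubled = rdd.map(lambda x: x * 2)         # recorded in the lineage, not executed
large = doubled.filter(lambda x: x > 10)   # still not executed

# Only this action triggers the map -> filter pipeline
print(large.count())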

Lazy Transformations Example

Figure 3: Spark lazy transformations example.

Spark Operations: Actions

  • An action triggers the execution of all the transformations recorded in the lineage.
  • Actions are used either to compute a result that is returned to the Spark driver program or to write data to an external storage system.
  • Actions include operations such as count(), collect(), saveAsTextFile(), and take().

Examples of Spark Actions

  • collect(): Returns all the elements of the dataset to the driver program.
  • count(): Returns the number of elements in the dataset.
  • saveAsTextFile(path): Saves the dataset to a text file at the specified path.
  • take(n): Returns an array with the first n elements of the dataset.
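A minimal sketch putting these actions together (assuming an active SparkContext named sc; the output path is only an illustrative placeholder):

# Minimal sketch, assuming an active SparkContext named `sc`
rdd = sc.parallelize([10, 20, 30, 40, 50])

print(rdd.count())     # 5 -- number of elements
print(rdd.take(3))     # [10, 20, 30] -- first three elements
print(rdd.collect())   # [10, 20, 30, 40, 50] -- all elements returned to the driver

# Writes the dataset as text files under the given directory (illustrative path)
rdd.saveAsTextFile("/tmp/numbers_output")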

DEMO

Immutable RDDs

# Test immutable RDDs (PySpark)
numbers = [1, 2, 3, 4, 5]
numbers_rdd = sc.parallelize(numbers)
print(f"Original RDD ID: {numbers_rdd.id()}")

# Apply a transformation: multiply each number by 2
# map() returns a new RDD with a new ID; numbers_rdd itself is unchanged
transformed_rdd = numbers_rdd.map(lambda x: x * 2)
print(f"Transformed RDD ID: {transformed_rdd.id()}")

# Collect the results to trigger the computation
result = transformed_rdd.collect()
print(f"Transformed RDD result: {result}")

// Test immutable RDDs (Scala)
val numbers = List(1, 2, 3, 4, 5)
val numbersRdd = sc.parallelize(numbers)
println(s"Original RDD ID: ${numbersRdd.id}")

// numbersRdd = numbersRdd.map(x => x * 2) // compile error: reassignment to val

// Apply a transformation: multiply each number by 2
val transformedRdd = numbersRdd.map(x => x * 2)
println(s"Transformed RDD ID: ${transformedRdd.id}")

// Collect the results to trigger the computation
val result = transformedRdd.collect()
println(s"Transformed RDD result: ${result.mkString(", ")}")

Immutable RDD Example

# Create an RDD
data = [("John", 28), ("Smith", 44), ("Adam", 65), ("Henry", 23)]
rdd = sc.parallelize(data)

# Show the original RDD
print("Original RDD:")
for row in rdd.collect():
  print(row)
print(f"Original RDD ID: {rdd.id()}")

# Rebinding the name `rdd` does not modify the original RDD;
# filter() builds a new RDD, so the printed ID changes
rdd = rdd.filter(lambda x: x[1] > 30)
print(f"RDD ID after filter: {rdd.id()}")

# Filter rows where the age is greater than 30
filtered_rdd = rdd.filter(lambda x: x[1] > 30)
print(f"Transformed RDD ID: {filtered_rdd.id()}")

# Show the transformed RDD
print("Filtered RDD:")
for row in filtered_rdd.collect():
  print(row)

// Create an RDD (Scala)
val data = Seq(("John", 28), ("Smith", 44), ("Adam", 65), ("Henry", 23))
val rdd = sc.parallelize(data)

// Show the original RDD
println("Original RDD:")
rdd.collect().foreach(println)

// rdd = rdd.filter { case (name, age) => age > 30 } // compile error: reassignment to val

// Filter rows where the age is greater than 30
val filteredRdd = rdd.filter { case (name, age) => age > 30 }
println(s"Transformed RDD ID: ${filteredRdd.id}")

// Show the transformed RDD
println("Filtered RDD:")
filteredRdd.collect().foreach(println)

Spark Lazy Evaluation

# Create an RDD
rdd = sc.parallelize([
  ("John", 28),
  ("Smith", 44),
  ("Adam", 65),
  ("Henry", 23)
])

# Apply a map transformation: build a new RDD of (name, age, older_than_30) tuples
mapped_rdd = rdd.map(lambda x: (x[0], x[1], x[1] > 30))

# Filter the RDD to include only people older than 30
filtered_rdd = mapped_rdd.filter(lambda x: x[2])

# Convert the filtered RDD to a DataFrame
df = spark.createDataFrame(filtered_rdd, ["Name", "Age", "OlderThan30"])

# Select only the name and age columns
final_df = df.select("Name", "Age")

# Collect the results, which triggers the execution of all transformations
results = final_df.collect()
display(results)

Watch on YouTube

Watch on our Servers

You can download the video by opening the link and choosing "Save link as": Download Video

Download the code

You can download the Jupyter notebook, Databricks Notebook, or the Python source code using the following links: