Monday, 3 January 2022

Shuffle Operation

The shuffle is one of the most expensive operations in Spark because it involves a lot of disk I/O, serialization, and network calls.

Why we need a shuffle operation:

Some RDD transformations, such as groupByKey and join, require the data to be repartitioned.

For example:

groupByKey expects all the values for a given key to be on a single worker node, but some of them may sit on other worker nodes. In that case, the data for that key has to move from every worker node to a single node. This shuffling of data involves transferring it over the network.
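To make the "same key, same node" idea concrete, here is a minimal sketch in plain Scala (no Spark required) of how a hash-based partitioner can map each key to a target partition. The names PartitionSketch and partitionFor and the partition count of 3 are assumptions made for illustration. Spark's HashPartitioner is built on the same idea: a non-negative modulo of the key's hashCode.

```scala
object PartitionSketch {
  // Illustrative helper (hypothetical name): picks a partition for a key
  // using a non-negative modulo of its hashCode.
  def partitionFor(key: Any, numPartitions: Int): Int = {
    val raw = key.hashCode % numPartitions
    if (raw < 0) raw + numPartitions else raw
  }

  def main(args: Array[String]): Unit = {
    val input = List(("Scala", 123), ("R", 100), ("Scala", 199), ("Java", 555), ("Java", 77))
    val numPartitions = 3 // assumed value, chosen only for the example
    // Records with the same key always land in the same partition,
    // no matter which node they started on.
    input.foreach { case (k, v) =>
      println(s"($k, $v) -> partition ${partitionFor(k, numPartitions)}")
    }
  }
}
```

Because the target partition depends only on the key, both ("Scala", 123) and ("Scala", 199) are routed to the same place, which is why records may have to cross the network.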

import org.apache.spark._
object Shuffle {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local[*]", "shuffleOperation")
    val input = List(("Scala", 123), ("R", 100), ("Scala", 199), ("Java", 555), ("Java", 77))
    val rdd = sc.parallelize(input)
    val mapInput =
      rdd.groupByKey()
        .map(x => (x._1, (x._2.size, x._2.sum)))
        .collect()
    mapInput.foreach(println)
    // Keep the main function running so that we can inspect the Spark UI (default port 4040).
    scala.io.StdIn.readLine()
  }
}


[Figure: DAG for the above program]



The total time taken by this operation is 4 seconds.

In this case, all the data for a given key is moved to the respective worker node first, and only then is the groupByKey operation applied.

We can improve this by using reduceByKey:

import org.apache.spark._
object ShuffleUsingReduce {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local[*]", "shuffleOperation")
    val input = List(("Scala", 123), ("R", 100), ("Scala", 199), ("Java", 555), ("Java", 77))
    val rdd = sc.parallelize(input)
    val mapInput =
      rdd.map(x=> (x._1, (1, x._2)))
        .reduceByKey((x,y) => (x._1 + y._1, x._2 + y._2))
        .collect()
    mapInput.foreach(println)
    // Keep the main function running so that we can inspect the Spark UI (default port 4040).
    scala.io.StdIn.readLine()
  }
}


The total time taken by this operation is 3 seconds.

Before transferring the data for a key to the respective worker node, reduceByKey applies the reduce function locally within each partition, so the total amount of data transferred is less than with groupByKey.

In this case, each partition sends at most one combined record per distinct key to the respective node.
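The difference in transferred data can be sketched in plain Scala, without Spark, by simulating the two strategies over in-memory lists. This is only an illustration under stated assumptions: the object CombineSketch, its method names, and the split of the five records across two "nodes" are hypothetical and are not how Spark implements the shuffle internally.

```scala
object CombineSketch {
  type Record = (String, (Int, Int)) // key -> (count, sum)

  // Assumed layout: the five records from the example spread over two nodes.
  val partitions: List[List[(String, Int)]] = List(
    List(("Scala", 123), ("R", 100), ("Scala", 199)),
    List(("Java", 555), ("Java", 77))
  )

  // groupByKey style: every record crosses the network unchanged.
  def shuffledWithoutCombine(parts: List[List[(String, Int)]]): List[Record] =
    parts.flatten.map { case (k, v) => (k, (1, v)) }

  // reduceByKey style: each partition merges its own records per key first.
  def shuffledWithCombine(parts: List[List[(String, Int)]]): List[Record] =
    parts.flatMap { part =>
      part.groupBy(_._1).map { case (k, vs) => (k, (vs.size, vs.map(_._2).sum)) }.toList
    }

  // Reduce side: merge whatever arrived for each key.
  def reduceSide(records: List[Record]): Map[String, (Int, Int)] =
    records.groupBy(_._1).map { case (k, rs) =>
      (k, rs.map(_._2).reduce((x, y) => (x._1 + y._1, x._2 + y._2)))
    }

  def main(args: Array[String]): Unit = {
    val plain = shuffledWithoutCombine(partitions)
    val combined = shuffledWithCombine(partitions)
    println(s"records shuffled without local combine: ${plain.size}")
    println(s"records shuffled with local combine: ${combined.size}")
    println(reduceSide(combined))
  }
}
```

Both strategies give the same (count, sum) per key. With this assumed layout, the locally combined version moves 3 records instead of 5, and the gap grows as the number of duplicate keys per partition increases.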





