Spark – Accumulators and Broadcast Variables

Introduction

As part of this blog post we will cover

  • Accumulators - which can be used as counters
    • for data quality
    • for unit testing
  • Broadcast Variables - similar to a distributed cache
    • A variable is broadcast to all the executors
    • Useful for map-side joins or lookups

While accumulators are used to maintain global counters for an application, broadcast variables can be used to tune the performance of applications that rely on lookups or map-side joins.

Introduction to accumulators

In this topic we will see details about accumulators

  • It is important to perform some counts as part of the application for
    • unit testing
    • data quality
  • These counters cannot be plain global variables in the program - tasks run on the executors and only receive copies of driver variables, so their updates never make it back to the driver
  • Instead we need to use an accumulator, which is managed by Spark
  • An accumulator is defined in the driver, passed to the executors, and the updates from all the executor tasks are merged into one global value (see the sketch below)
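
As a minimal sketch of this difference (using the classic sc.accumulator API used throughout this post; inputPath and the variable names are only illustrative):

// inputPath points to the retail_db data, as in the programs later in this post
val ordersRDD = sc.textFile(inputPath + "/orders")

// A plain variable lives in the driver. Each executor task only gets a copy,
// so increments made inside RDD functions never reach the driver.
var plainCounter = 0
ordersRDD.foreach(rec => plainCounter += 1)
println(plainCounter) // still 0 on the driver

// An accumulator is managed by Spark. Each task's increments are sent back
// to the driver and merged into one global value.
val recordCountAccum = sc.accumulator(0, "record count")
ordersRDD.foreach(rec => recordCountAccum += 1)
println(recordCountAccum.value)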

We will see

  • how accumulators are implemented
  • issues with accumulators

Implementation of accumulators

Let us see how accumulators can be implemented in one of our applications

  • Take our program AvgRevenueDaily
  • Add as many accumulators as you need
    • Create a variable such as val ordersCompletedAccum = sc.accumulator(0, "ordersCompleted count")
    • Update the appropriate RDD function to increment the accumulator (see the sample code below)
  • Compile the program
  • Build the jar and ship it to the remote cluster
  • Run it on the remote cluster
  • Open the Spark History Server and review the stages and tasks in which the accumulators are updated
  • You will see the counters as part of the History Server UI

Sample code

package retail

import org.apache.spark.SparkContext, org.apache.spark.SparkConf
import com.typesafe.config._
import org.apache.hadoop.fs._

object AvgRevenueDailyAccumulators {
  def main(args: Array[String]) {
    val appConf = ConfigFactory.load()
    val conf = new SparkConf().
      setAppName("Average Revenue - Daily - Accumulators").
      setMaster(appConf.getConfig(args(2)).getString("deploymentMaster"))
    val sc = new SparkContext(conf)
    val inputPath = args(0)
    val outputPath = args(1)

    val fs = FileSystem.get(sc.hadoopConfiguration)
    val inputPathExists = fs.exists(new Path(inputPath))
    val outputPathExists = fs.exists(new Path(outputPath))

    if (!inputPathExists) {
      println("Input Path does not exists")
      return
    }

    if (outputPathExists) {
      fs.delete(new Path(outputPath), true)
    }

    val ordersRDD = sc.textFile(inputPath + "/orders")
    val orderItemsRDD = sc.textFile(inputPath + "/order_items")

    // Named accumulators show up in the Spark UI / History Server
    val ordersCompletedAccum = sc.accumulator(0, "ordersCompleted count")
    val ordersFilterInvokedAccum = sc.accumulator(0, "orders filter invoked count")
    val ordersCompleted = ordersRDD.
      filter(rec => {
        // Count every record the filter sees and, separately,
        // the records in COMPLETE status
        ordersFilterInvokedAccum += 1
        if (rec.split(",")(3) == "COMPLETE") {
          ordersCompletedAccum += 1
        }
        rec.split(",")(3) == "COMPLETE"
      })

    val ordersAccum = sc.accumulator(0, "orders count")
    val orders = ordersCompleted.
      map(rec => {
        ordersAccum += 1
        (rec.split(",")(0).toInt, rec.split(",")(1))
      })

    val orderItemsMapAccum = sc.accumulator(0, "orderItemsMap count")
    val orderItemsMap = orderItemsRDD.
      map(rec => {
        orderItemsMapAccum += 1
        (rec.split(",")(1).toInt, rec.split(",")(4).toFloat)
      })

    val orderItemsValuesAccum = sc.accumulator(0, "reduceByKey values count")
    val orderItems = orderItemsMap.
      reduceByKey((acc, value) => {
        orderItemsValuesAccum += 1
        acc + value
      })

    val ordersJoin = orders.join(orderItems)

    val ordersJoinMap = ordersJoin.map(rec => (rec._2._1, rec._2._2))

    val revenuePerDay = ordersJoinMap.aggregateByKey((0.0, 0))(
      (acc, value) => (acc._1 + value, acc._2 + 1),
      (total1, total2) => (total1._1 + total2._1, total1._2 + total2._2))

    val averageRevenuePerDay = revenuePerDay.
      map(rec => (rec._1, BigDecimal(rec._2._1 / rec._2._2).
        setScale(2, BigDecimal.RoundingMode.HALF_UP).toFloat))

    val averageRevenuePerDaySorted = averageRevenuePerDay.
      sortByKey()

    averageRevenuePerDaySorted.
      map(rec => rec._1 + "," + rec._2).
      saveAsTextFile(outputPath)

  }
}

Issues with accumulators

Here are some of the known issues with accumulators

  • Because of lazy evaluation, the counters are not populated until the tasks are actually executed by an action
  • For accumulator updates performed inside actions, Spark guarantees that each task's update is applied only once
  • For updates performed inside transformations there is no such guarantee - if a task or stage is re-executed, the same update can be applied more than once and the counts can be inflated
  • Our sample code updates the accumulators inside transformations (filter, map, reduceByKey), so those values should be treated as approximate (see the sketch below for a more dependable pattern)
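
As a rough sketch of that more dependable pattern (reusing ordersRDD from the sample code above; the accumulator names are only illustrative):

// Updated inside a transformation: if tasks or stages are re-executed
// (failures, speculative execution, recomputation of the RDD),
// the same records may be counted more than once.
val filterInvokedAccum = sc.accumulator(0, "filter invoked count")
val ordersCompleted = ordersRDD.filter(rec => {
  filterInvokedAccum += 1
  rec.split(",")(3) == "COMPLETE"
})

// Updated inside an action: Spark applies each task's update only once,
// so this count stays consistent even when tasks are retried.
val ordersCompletedAccum = sc.accumulator(0, "ordersCompleted count")
ordersCompleted.foreach(rec => ordersCompletedAccum += 1)
println(ordersCompletedAccum.value)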

Introduction to broadcast variables

As part of this topic we will see details about Broadcast Variables

  • At times we need to pass (broadcast) some information to all the executors
  • This can be done using broadcast variables
  • A broadcast variable can be of a primitive type or it can be a collection such as a hash map
  • Here are a few examples (see the sketch after this list)
    • Single value - a common discount percent for all the products
    • Hash map - a lookup table for a map-side join
  • When a very large data set (fact) is joined with a much smaller data set (dimension), broadcasting the dimension data can give a considerable performance improvement
  • Broadcast variables are read-only (immutable)
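
For example, a minimal sketch of the single-value case (the 10 percent discount is only illustrative; inputPath and the products layout are the same as in the rest of this post):

// Broadcast a single read-only value to all the executors
val discountPercent = sc.broadcast(10)

val products = sc.textFile(inputPath + "/products")
// Every task reads the same broadcast copy via .value instead of
// shipping the value inside each task's closure
val discountedPrices = products.
  map(rec => (rec.split(",")(0).toInt,
    rec.split(",")(4).toFloat * (100 - discountPercent.value) / 100))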

Define problem statement and design the application

Get total revenue per department for each day

  • Again we will be using our retail_db database
  • Please refer to the retail_db data model for the table definitions
  • Department name is available only in the departments table
  • To get the department name for each order item we need to join these tables
    • order_items
    • products
    • categories
    • departments
  • We will first join products, categories and departments
    • to get product_id and department_name
    • broadcast the resulting product_id to department_name pairs as a hash map
  • Then join orders and order_items
    • Perform a regular join between orders and order_items
    • While extracting fields from order_items, look up the hash map with product_id to get department_name
  • Use reduceByKey to compute the total revenue for each date and department

Broadcast Variables - Implementation

Let us see the implementation and execution of the application using broadcast variables

  • Read products, categories and departments
    val departments = sc.textFile(inputPath + "/departments")
    val categories = sc.textFile(inputPath + "/categories")
    val products = sc.textFile(inputPath + "/products")
  • Join products, categories and departments
    val departmentsMap = departments.
      map(rec => (rec.split(",")(0).toInt, rec.split(",")(1)))
    val categoriesMap = categories.
      map(rec => (rec.split(",")(0).toInt, rec.split(",")(1).toInt))
    val productsMap = products.
      map(rec => (rec.split(",")(1).toInt, rec.split(",")(0).toInt))

    val productCategories = productsMap.join(categoriesMap)
    val productCategoriesMap = productCategories.
      map(rec => (rec._2._2, rec._2._1))
    val productDepartments = productCategoriesMap.join(departmentsMap)
  • Build hash map and create broadcast variable
    val productDepartmentsMap = productDepartments.
      map(rec => (rec._2._1, rec._2._2)).
      distinct

    val bv = sc.broadcast(productDepartmentsMap.collectAsMap())
  • Read orders and order_items
    val orders = sc.textFile(inputPath + "/orders")
    val orderItems = sc.textFile(inputPath + "/order_items")
  • Filter for completed orders and extract required fields from orders
    val ordersCompleted = orders.
      filter(rec => (rec.split(",")(3) == "COMPLETE")).
      map(rec => (rec.split(",")(0).toInt, rec.split(",")(1)))
  • Extract required fields from order_items while looking up into hash map to get department name
    val orderItemsMap = orderItems.
      map(rec =>
        (rec.split(",")(1).toInt, (bv.value.get(rec.split(",")(2).toInt).get,
          rec.split(",")(4).toFloat)))
  • Join orders and order_items and compute daily revenue for each product
    val ordersJoin = ordersCompleted.join(orderItemsMap)
    val revenuePerDayPerDepartment = ordersJoin.
      map(rec => ((rec._2._1, rec._2._2._1), rec._2._2._2)).
      reduceByKey((acc, value) => acc + value)
  • Save output to file system of your choice
    revenuePerDayPerDepartment.sortByKey().saveAsTextFile(outputPath)
  • Here is the complete program (which includes integration with application properties as well as file system validations)
package retail

import org.apache.spark.SparkConf, org.apache.spark.SparkContext
import com.typesafe.config._
import org.apache.hadoop.fs._

object TotalRevenueDailyPerDepartment {
  def main(args: Array[String]) {
    val appConf = ConfigFactory.load()
    val conf = new SparkConf().
      setAppName("Total Revenue per department - Daily").
      setMaster(appConf.getConfig(args(2)).getString("deploymentMaster"))
    val sc = new SparkContext(conf)

    val inputPath = args(0)
    val outputPath = args(1)

    val fs = FileSystem.get(sc.hadoopConfiguration)
    val inputPathExists = fs.exists(new Path(inputPath))
    val outputPathExists = fs.exists(new Path(outputPath))

    if (!inputPathExists) {
      println("Input Path does not exists")
      return
    }

    if (outputPathExists) {
      fs.delete(new Path(outputPath), true)
    }

    val departments = sc.textFile(inputPath + "/departments")
    val categories = sc.textFile(inputPath + "/categories")
    val products = sc.textFile(inputPath + "/products")

    val departmentsMap = departments.
      map(rec => (rec.split(",")(0).toInt, rec.split(",")(1)))
    val categoriesMap = categories.
      map(rec => (rec.split(",")(0).toInt, rec.split(",")(1).toInt))
    val productsMap = products.
      map(rec => (rec.split(",")(1).toInt, rec.split(",")(0).toInt))

    val productCategories = productsMap.join(categoriesMap)
    val productCategoriesMap = productCategories.
      map(rec => (rec._2._2, rec._2._1))
    val productDepartments = productCategoriesMap.join(departmentsMap)
    val productDepartmentsMap = productDepartments.
      map(rec => (rec._2._1, rec._2._2)).
      distinct

    val bv = sc.broadcast(productDepartmentsMap.collectAsMap())

    val orders = sc.textFile(inputPath + "/orders")
    val orderItems = sc.textFile(inputPath + "/order_items")

    val ordersCompleted = orders.
      filter(rec => (rec.split(",")(3) == "COMPLETE")).
      map(rec => (rec.split(",")(0).toInt, rec.split(",")(1)))

    val orderItemsMap = orderItems.
      map(rec =>
        (rec.split(",")(1).toInt, (bv.value.get(rec.split(",")(2).toInt).get,
          rec.split(",")(4).toFloat)))

    val ordersJoin = ordersCompleted.join(orderItemsMap)
    val revenuePerDayPerDepartment = ordersJoin.
      map(rec => ((rec._2._1, rec._2._2._1), rec._2._2._2)).
      reduceByKey((acc, value) => acc + value)
    revenuePerDayPerDepartment.sortByKey().saveAsTextFile(outputPath)
  }
}
  • Now compile the program, build the jar, and execute the application on the remote cluster or virtual machine (a sample submit command is shown below)
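
For reference, a typical submit command could look like the one below. The jar name, the input and output paths, and the prod configuration key (which maps to args(2) in the program) are placeholders, not exact values from this post:

spark-submit \
  --class retail.TotalRevenueDailyPerDepartment \
  totalrevenuedaily_2.10-1.0.jar \
  /public/retail_db /user/YOUR_USER/daily_revenue_per_department prod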

