Spark Context and Configuration

Introduction

As part of this topic we will understand what SparkConf and SparkContext are.

  • SparkConf - Configuration information of the application
  • SparkContext - Tells Spark how to access the cluster (YARN or Mesos or local)
  • Application parameters and configurations

SparkConf

As part of this topic we will see the details of SparkConf and how it is used while submitting Spark applications.

  • As part of the cluster setup there will be a configuration directory containing files such as spark-env.sh and spark-defaults.conf
  • The properties defined in spark-env.sh and spark-defaults.conf are accessible through SparkConf
  • As part of application development, we first need to create a SparkConf object
  • There are several set and get methods to override parameter values or environment variables
  • setAppName can be used to give a readable name to the application, which makes it easier to identify in the logs
  • setMaster can be used to run the application in YARN mode, Mesos mode or local mode
  • setExecutorEnv can be used to override environment variables of the executors (such as memory settings)
  • set is a generic method to override any configuration parameter

Here is the code snippet to create SparkConf

    val conf = new SparkConf().
      setAppName("Word Count").
      setMaster("local")

SparkContext

As part of this topic we will see the details of SparkContext and how it is used while submitting Spark applications.

  • SparkContext tells Spark how the application should be run (based on the default master or the one set using SparkConf setMaster)
  • Following are the different modes in which Spark applications can run
    • local
    • yarn-client
    • mesos URL
    • spark URL
  • Once the SparkContext object is created, we can invoke functions such as the following (see the sketch below)
    • textFile - to read text files from the local file system, S3, HDFS etc.
    • sequenceFile - to read Hadoop sequence files
    • parallelize - to parallelize a collection
    • and many more
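
Here is a minimal sketch that creates a SparkContext and invokes the functions listed above. The application name and paths are placeholders assumed for illustration.

    import org.apache.spark.{SparkConf, SparkContext}

    // Minimal sketch - the paths below are placeholders
    val conf = new SparkConf().setAppName("Context Demo").setMaster("local")
    val sc = new SparkContext(conf)

    val lines = sc.textFile("/public/data/sample.txt")              // text files from local, S3, HDFS etc.
    val pairs = sc.sequenceFile[String, String]("/public/data/seq") // Hadoop sequence file
    val nums  = sc.parallelize(1 to 100)                            // parallelize a Scala collection

    println(lines.count() + " lines, " + nums.count() + " numbers")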

Develop Word Count Program

As part of this topic we will see how a word count program can be developed using Scala IDE. Here are the steps in developing the program:

  • The program takes 2 parameters - input path and output path
  • Initialize the application name and master (using SparkConf)
  • Create the SparkContext object
  • Read data from the input path using textFile
  • Apply flatMap and map functions to tokenize the data and assign 1 to each token
  • Perform the word count using reduceByKey
  • Write the output to the output path using saveAsTextFile

Below is the complete program.

import org.apache.spark.SparkContext, org.apache.spark.SparkConf

object WordCount {
  def main(args: Array[String]) {
    // Initialize application name and master
    val conf = new SparkConf().
      setAppName("Word Count").
      setMaster("local")
    val sc = new SparkContext(conf)
    val inputPath = args(0)
    val outputPath = args(1)

    // Tokenize the data, assign 1 to each token and aggregate the counts by key
    val wc = sc.textFile(inputPath).
      flatMap(rec => rec.split(" ")).
      map(rec => (rec, 1)).
      reduceByKey((acc, value) => acc + value)

    // Write the output to the output path
    wc.saveAsTextFile(outputPath)
  }
}

Disadvantages with the approach

  • setMaster is hardcoded
  • Master should not be passed as an argument either
  • Parameters like master (in this case) need to be externalized
  • There is no validation of the input path
  • If the output path already exists, the program fails

Application Properties and Parameters

In this topic we will see application properties and parameters in detail. Following are the ways we can pass parameters to an application.

  • Arguments
  • Properties files (application.properties, application.conf etc.)
  • We typically develop the program with a main function
  • It takes an array of strings as parameters
  • We can pass as many parameters as we want
  • It is not good practice to pass all parameters as arguments (e.g. username, password etc.)
  • At the same time, username, password etc. should not be hardcoded in the application
  • We need to externalize such parameters using properties files
  • There is a Scala library (Typesafe Config) to load these properties files
    • libraryDependencies += "com.typesafe" % "config" % "1.3.0"

Code snippet to load configuration properties

    val appConf = ConfigFactory.load()
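
For illustration, assume application.properties on the classpath contains the single property deploymentMode (the value below is an assumption for this example). It can then be read as shown in this sketch.

    // Assumes application.properties on the classpath contains:
    //   deploymentMode = local
    import com.typesafe.config.ConfigFactory

    val appConf = ConfigFactory.load()
    val deploymentMode = appConf.getString("deploymentMode")
    println(deploymentMode)   // prints local for the assumed contents above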

Word Count Improved

In this topic we will see the improved word count program, which does the following:

  • Passes the input path and output path as arguments
  • Uses application.properties to externalize the deployment mode (local, yarn-client etc.)
    • Create application.properties and add the parameter deploymentMode
    • Add the Typesafe Config dependency to build.sbt
    • Modify the code to load application.properties
    • Get deploymentMode and pass it to setMaster
  • Accesses all run time parameters
  • Validates the input path
    • Check whether the input path exists
  • Overrides the output path
    • Delete the output path if it already exists
  • Exercise - Compile it and run it on the cluster

build.sbt

name := "demo-spark-scala"
version := "1.0"
scalaVersion := "2.11.8"

libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "1.6.2"
libraryDependencies += "com.typesafe" % "config" % "1.3.0"

WordCount.scala

package wordcount

import com.typesafe.config._
import org.apache.spark.SparkContext, org.apache.spark.SparkConf
import org.apache.hadoop.fs._


object WordCount {
  def main(args: Array[String]) {
    // Load application.properties from the classpath
    val appConf = ConfigFactory.load()
    val conf = new SparkConf().
      setAppName("Word Count").
      setMaster(appConf.getString("deploymentMode"))

    // Print all the configuration properties of the application
    for(c <- conf.getAll)
      println(c._1 + " = " + c._2)
    val sc = new SparkContext(conf)
    val inputPath = args(0)
    val outputPath = args(1)
    
    // Validate the input path and delete the output path if it already exists
    val fs = FileSystem.get(sc.hadoopConfiguration)
    val inputPathExists = fs.exists(new Path(inputPath))
    val outputPathExists = fs.exists(new Path(outputPath))

    if(!inputPathExists) {
      println("Invalid input path")
      return
    }

    if(outputPathExists)
      fs.delete(new Path(outputPath), true)
      
    val wc = sc.textFile(inputPath).
      flatMap(rec => rec.split(" ")).
      map(rec => (rec, 1)).
      reduceByKey((acc, value) => acc + value)
      
    wc.saveAsTextFile(outputPath)

  }
}

 
