Let us explore how we can perform row level transformations. First let us check typical scenarios for row level transformations.
- Data cleansing – removing special characters
- Standardization – eg: phone number, we might want to get phone numbers from different sources and it might be represented different manner in different systems. When we get onto down stream systems we have to represent phone number in one standard format.
- Discarding or filtering out unnecessary data
- Unpivoting the data, one row with many columns might have to return a collection of rows
Let us map these scenarios with APIs categorized under transformations
- Data cleansing and standardization – map. It takes one record as input and returns exactly one record as output
- Discarding or filtering – filter. It take one record as input, and if expression returns false record will be discarded
- Also for vertical filtering, we use map function
- Unpivoting – flatMap. For each input record there will be 1 to n output records
- Number of invocations = Number of elements in RDD
First we need to create RDD from files or collection and then apply transformations. To create RDD from files if you are using local installation on your PC use local path and for lab or virtual machines of Cloudera or Hortonworks or MapR use HDFS path.
For this you can launch Spark in local or stand alone mode on your PC, and on lab or virtual machines you can launch Spark in YARN or local mode.
- Read data from orders –
val orders = sc.textFile("/public/retail_db/orders")
- After each step preview data using
- Code to iterate after running take to print data in readable format –
- Filter for completed orders –
val completedOrders = orders.filter(rec => rec.split(",")(3) == "COMPLETE")
- Extract order_id and order_date from each of the completed orders –
val orderDates = completedOrders.map(rec => (rec.split(",")(0).toInt, rec.split(",")(1)))
- Each element after above code snippet is of type tuple or pair.
- Create RDD out of array of strings
- lines in below code will generate Array of Strings
- Code to convert to RDD –
val linesRDD = sc.parallelize(lines)
- Parse each string and get individual words unpivoted as separate records –
val words = linesRDD.flatMap(rec => rec.split(" "))