Reading Files, Essential Imports & Docs + Optimization

Loading CSV and Parquet files as SQL data frames, essential packages to import, tips on reading the official docs, and tips on optimization and making Spark execution faster.

Quick word on val and var in Scala,

val is immutable, i.e. it can't be changed after assignment, while var is mutable and can be changed after assignment. What does that mean practically? I use val to define windows, the main dataframes I import, models, etc., and I use var to define variables in for-loops, functions, and dataframes I want to manipulate and transform as I go.
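Here's a minimal sketch of the difference; the file path and the column names (col1, col2) are just placeholders,

import org.apache.spark.sql.functions.col

val mainDF = spark.read.parquet("path_to_parquet_file.parquet") // fixed reference, can't be reassigned
// mainDF = anotherDF // won't compile: reassignment to val

var workDF = mainDF // gets reassigned as I transform it
workDF = workDF.filter(col("col1").isNotNull)
workDF = workDF.withColumn("col2_doubled", col("col2") * 2)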

Reading Parquet Files

val mainDF = spark.read.parquet("path_to_parquet_file.parquet")

☞ NOTE: Must use double quotes for strings everywhere in Spark Scala.

Reading CSV Files

val mainDF = spark.read.format("csv").
  option("header",true).
  option("inferSchema", true).
  load("path_to_csv.csv")

☞ NOTE: Unlike Python, whitespace is not significant in Scala. If you want to break a command into multiple lines, do it after the dot.

I'll leave out reading text files for now, because I personally haven't needed to read text files to load dataframes. However, I did need to save to, load, and read text files to apply a schema to a highly compressed Parquet file. Extracting, saving, importing, and applying schemas will have its own section, because it was a big thing on its own for me to figure out, and because there are multiple steps to it.

☞ Type "org.apache.spark.sql.DataFrameReader" in the search bar of the official Spark Scala API docs, to find all functions available to reading a data frame in the package.

Read Multiple CSVs at Once by passing them all to the spark.read.csv() method, separating the paths with commas; or pass a directory (folder) containing only, and all of, the CSVs you want. See the DataFrameReader docs mentioned above for options and further details.
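A minimal sketch of both approaches; the paths are placeholders,

val twoFilesDF = spark.read.
  option("header", true).
  option("inferSchema", true).
  csv("first_file.csv", "second_file.csv") // explicit, comma-separated list of paths

val folderDF = spark.read.
  option("header", true).
  option("inferSchema", true).
  csv("folder_with_only_the_csvs_you_want/") // a whole directory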

Seeing a Dataframe

df.show by default shows the top 20 rows in a nicely printed tabular format and truncates wide columns. It can take two arguments, df.show(numRows, truncate); for example, df.show(5, truncate=false) prints the top five rows without truncation. You don't have to type the parameter name, e.g. show(false) suffices to show the first 20 rows with full column width.
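A quick recap of those variants, assuming a dataframe named df,

df.show // top 20 rows, wide columns truncated
df.show(5, truncate = false) // top 5 rows, full column width
df.show(false) // top 20 rows, full column width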

If you have an RDD instead, use .take(), which works in much the same way, except that it doesn't display a table like .show; it returns an Array of the Rows instead.

☞ VERY IMPORTANT: Optimization - How to Make it Faster

  • Size of the input data frame matters. One thing to keep in mind: your commands are not executed until you call certain methods on them, like .show or .count. This is called lazy evaluation, meaning no command is executed until a result is demanded. Know that the size of the input matters a whole lot! For example, if your table has a couple hundred columns, select only the ones you need as you're reading it.

💡 Tip If you're reading the table for the first time, read it as you saw above and call .printSchema on it, without assigning it to a variable. Then, in a new cell, decide which columns you need and select only those, assigning the result to a variable. We'll see more details on select on the next page; here's an example to get you started,

spark.read.parquet("lake_path_to_parquet_file.parquet").printSchema

val df0 = spark.read.parquet("lake_path_to_parquet_file.parquet").
    select("col1","col2","col3")

Other things to double check with your data engineers for smooth and quick operations,

  • Cluster size and capabilities, i.e. node count, processing power, memory size. Make sure your cluster is of adequate size. If you have solid data engineers, you can ask them to recommend the best cluster size and specifications, considering costs and all; or you can do it by trial and error, that is, keep shrinking the dataframe you process little by little until the cluster can digest it. I might be wrong, but I think what you use for cluster management also plays a role: Jenkins, on Azure, made a lot more out of the cluster than the cluster management interface we used, a cheaper and smaller competitor of Databricks. How your data is stored, in cold or hot storage, also plays a huge role in how much computing power you need to "thaw" your data to retrieve it and operate on it.

  • Sequentially persisting and unpersisting data frames to memory and disk. Another trick is to sequentially persist and unpersist data frames as you transform them, to make the computation faster and to debug which transformation is taking the longest. For example,

val df0 = spark.read.parquet("file_path.parquet").select("col1","col2")

df0.persist
val df1 = df0.dropDuplicates // stand-in for some transformations on df0

df0.unpersist
df1.persist

// so on and so forth

In Spark 2.x, .persist with no arguments and .cache both use the default storage level, which for data frames is memory and disk (MEMORY_AND_DISK). You can modify .persist's behavior by passing it a different StorageLevel; .cache always uses the default. Memory will be faster, but smaller. Read more on caching and persisting on the official docs pages: for Scala, the Spark 2.4.4 SQL module, then search the page for cache and persist; for PySpark, the pyspark 3.1 persist and pyspark 3.1 cache entries, both of which I got from the main pyspark 3.1 sql module page.
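For instance, a minimal sketch of picking a non-default level, continuing with df1 from the example above,

import org.apache.spark.storage.StorageLevel

df1.persist(StorageLevel.MEMORY_ONLY) // keep it in memory only, nothing spilled to disk
df1.count // an action, to actually materialize the cache
df1.unpersist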

By transformations, I mean all the operations you can do on the data frame, or on one or more of its columns. See next pages for transformations on one column, and on multiple columns.

An alternative to .show is .take, which takes the number of rows as its argument. .take() won't print the df in a nice tabular format, but returns an Array of Rows without column names. It's much, much faster though.

Dataframe Info

Showing column names and types: df.printSchema

Showing just column names: df.columns

Advanced: df.explain gives the logical and physical plans for the data frame.

  • Make everything into functions, so you don't overload the cluster memory and disk space.

  • Avoid converting to Python, Pandas, or plain Scala objects, as they are collected into the master node's memory only, hog its resources, and take a long time to execute.

  • If things get messy, detach and reattach your notebook (if you're on Databricks) to clear all the variables in the notebook.

  • Restarting your cluster will purge the memory and start you off again with a clean slate.

Sample from a DataFrame / RDD

df.sample(withReplacement=false, fraction=0.5, seed=1234L).show

As you know, seed sets a random seed, so you get the same sample from the dataframe every time you run the command with that seed number. The L at the end of the number means it's a long integer; you don't have to add it.

If you have an RDD instead, you can use very similar syntax with the takeSample function: rdd.takeSample(withReplacement: Boolean, num: Int, seed: Long)
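For example, a minimal sketch (here rdd is just the underlying RDD of a dataframe, and the numbers are placeholders),

val rdd = df.rdd // an RDD[Row] taken from a dataframe
val sampledRows = rdd.takeSample(withReplacement = false, num = 10, seed = 1234L)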

Read more on all possible functions on an RDD from the official Spark Scala API docs, typing "org.apache.spark.rdd.RDD" in the search bar in the top left corner.

Essential Libraries to Import

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql._ //for Row, Column, DataFrame
import org.apache.spark.sql.expressions.Window 
import org.apache.spark.util.SizeEstimator
import spark.implicits._

Recommendations & Observations

💡 Practical Tip

As I said above, because Spark is based on "lazy evaluation", i.e. it doesn't really do anything until there's a command that demands results, you'll see that it executes things fast until you add a result-demanding function like .show or .count at the end of the command; then it takes a long time and a lot of resources, depending on the size of the data passed through from the beginning.

I recommend you don't use a function like show or count until you make sure your code works. Also, use small samples of your data frame whenever possible, to test on and develop your script locally on your machine rather than on the cluster.
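For instance, one way to carve out a small sample to develop against; the fraction and the output path are placeholders,

val devDF = mainDF.sample(withReplacement = false, fraction = 0.01, seed = 42L)
devDF.write.mode("overwrite").parquet("path_to_dev_sample.parquet") // save it once, develop against it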

Spark Scala vs. Python

Most prominent Scala differences from Python, other than the syntax,

  • Scala is stringent and particular about data types, which is a point of frustration, since it will sometimes give vague, unrelated errors, say if the type mismatch is tucked away inside a function.

  • Spark dataframes don't have an index column. This is a complete shift in thinking for those of us coming from Python and R, because it throws out the window everything you know about locating rows and writing functions on columns. Instead, you'll be using .filter() and Window for everything you need to do on columns, as we'll see in later sections.

  • You can use SQL syntax sometimes.

  • Always use double quotes (") for text, unless you're inside a SQL expression string, where values take single quotes, e.g. df.filter("colname = 'value'"); see the sketch right after this list.
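A minimal sketch contrasting the two quoting styles, assuming a dataframe df with a string column colname (both placeholders),

import org.apache.spark.sql.functions.col

df.filter("colname = 'value'") // SQL expression string: single quotes around the value
df.filter(col("colname") === "value") // Column API: double quotes, === for equality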

💡 Pro Tip - Reading Official Docs

The official Spark docs have great programming guides to start with. Though they aren't that helpful beyond that for the most part, they do have the content, and once in a blue moon they'll have very useful usage examples; I still check them first for the specification or availability of a function.

How to find something:

  1. Google "spark 2.4.3 docs", or whatever version you have,

  2. Go to API Docs tab on top --> Scala, then

  3. Type in the search bar in the top left corner the name of the package you're looking for. e.g. "org.apache.spark.sql.functions" to see all the methods in that package; sometimes grouped in sections throughout the page.

  4. Use cmd+F to quickly find what you're looking for in the page afterwards.

  • Searching "functions" alone or "types" alone will also return results from other packages, so it's confusing.

  • Most functions that work on Dataset, work on DataFrame also.

  • You can find all functions on a column from the official docs, typing "org.apache.spark.sql.Column" in the search bar.

  • I will tell you the full package name when I reference it.

Bonus

If you're running spark-shell locally on your computer, you can always hit the Tab key after the dot on any object, e.g. type df. then Tab, and you'll see a list of all the methods available on a dataframe. This works for any object.
