Machine Learning in Spark

Preparing, calling libraries, applying, results.

By Example

I assume you already know ML and what you want to do, so instead of explaining what each model does, I focus on the Spark-specific details. Here is an example of how I used it.

Refer to the PySpark page group of this gitbook, specifically the "Machine Learning with PySpark" page, for more examples and explanations of other ML models not shown here. That is still not an exhaustive list, of course; it only shows the imports, instantiation, how to pass data to ML models, and what output to expect.

☞ Resources, and a Note

This time the official docs are actually good: they describe the available algorithms in detail, each with an example of how to use it (some examples are more helpful than others). The full list is in the sidebar on the left of https://spark.apache.org/docs/2.4.3/ml-guide.html

However, the examples there use the "almost deprecated" RDD-based mllib package, which is being replaced by the DataFrame-based ml package. There are still some algorithms that aren't available in ml yet, though I'm not sure how the ML libraries have changed in Spark 3.x.

Below, I use the ml library and show what I used to build a regression model. You can do something similar for a classification model, or read more on your own. I like to put everything in a function, so I'll show you that as well.

Packages to Import

//basic imports
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql._
import org.apache.spark.sql.expressions.Window
import org.apache.spark.util.SizeEstimator
import spark.implicits._

//ML Regression related imports
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.ml.feature.{Interaction, PolynomialExpansion, VectorAssembler, VectorIndexer, StringIndexer, StandardScaler, StandardScalerModel}
import org.apache.spark.ml.regression.{RandomForestRegressionModel, RandomForestRegressor, LinearRegression}
import org.apache.spark.ml.evaluation.{RegressionEvaluator}
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}

Preparation

In addition to whatever cleaning and preparation your data needs, Spark requires special preparation for ML models. It starts with combining all the X's into a single vector column, which models look for under the name "features" by default (you can point a model at another column with setFeaturesCol, as runML below does with "scaledFeatures"). Then, for every prep step afterwards, you define the input and output column names, as shown below for polynomial features (interaction terms) and standardization.

For details on the arguments, look up the transformer's package in the official Spark Scala API docs. For example, for the StandardScaler, type "org.apache.spark.ml.feature.StandardScaler" in the search bar to land on its page. Look at the imports above to see which class comes from which package.

def setupForML(df:DataFrame):DataFrame = {
    
    var d1 = df
    //do here whatever prep & clean you need to your original dataframe
    //I kept updating `d1` during this step. 
    
    //----------------
    //prepare for ML
    var d2 = d1

    //1. create the "features" vector column
    val assembler = new VectorAssembler().setInputCols(Array("col1","col2","col3", "col4")).setOutputCol("features")   
    d2 = assembler.transform(d2) // now we have features column
    
    //2. add second degree polynomial features 
    val polyExpansion = new PolynomialExpansion().
        setInputCol("features").
        setOutputCol("polyFeatures").
        setDegree(2)
    d2 = polyExpansion.transform(d2)

    //3. standardization
    val scaler = new StandardScaler().
        setInputCol("polyFeatures").
        setOutputCol("scaledFeatures").
        setWithStd(true).
        setWithMean(false)
    val scalerModel = scaler.fit(d2)
    d2 = scalerModel.transform(d2) //adds vector column.
    
    //4. adding perturbation to target var, to take log
    d2 = d2.withColumn("perturbedY", $"targetCol" +0.001) 
    d2 = d2.withColumn("logY", log("perturbedY")) 
    
    return d2
}
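To see what step 2 does to the feature vector, here is a plain-Scala sketch (no Spark needed, illustrative only). As far as I know, the term ordering (x, x^2, y, x*y, y^2) for two features matches the PolynomialExpansion examples in the Spark docs; the size formula counts all monomials of degree 1 to d, since no constant term is added. With the four input columns in setupForML and setDegree(2), that gives 14 columns in polyFeatures. PolyExpansionSketch and its methods are hypothetical names.

```scala
// Plain-Scala sketch (no Spark needed), for illustration only.
object PolyExpansionSketch {
  // Degree-2 expansion of a 2-feature vector (x, y), in the order
  // (x, x^2, y, x*y, y^2); no constant term is included.
  def expandDegree2(x: Double, y: Double): Seq[Double] =
    Seq(x, x * x, y, x * y, y * y)

  // Binomial coefficient C(n, k), computed exactly with Longs
  // (each intermediate value is itself a binomial coefficient).
  def binomial(n: Int, k: Int): Long =
    (1 to k).foldLeft(1L)((acc, i) => acc * (n - k + i) / i)

  // Number of expanded features: all monomials of degree 1..degree
  // in numFeatures variables, i.e. C(numFeatures + degree, degree) - 1.
  def expandedSize(numFeatures: Int, degree: Int): Long =
    binomial(numFeatures + degree, degree) - 1
}
```

For example, expandDegree2(2.0, 1.0) gives Seq(2.0, 4.0, 1.0, 2.0, 1.0), and expandedSize(4, 2) gives 14. The count grows quickly with the degree, which is worth checking before raising setDegree.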

Taking Log of Target Variable

Other transformations can be done inside this function as well. I had reason to believe my target variable was skewed, so I added a logarithm transformation to it.

First, make sure your target variable has no negative values, because the logarithm is undefined for negative numbers. Don't scale the y variable before taking the log, because standardizing (mean-centering) creates negative values by definition.

The logarithm of zero is undefined too. However, you can add a small perturbation to the values that barely alters them. The size of the perturbation depends on the context of your problem: the units of the target variable and their interpretation. Usually anything small will do, even 0.0001 or so, as long as it's not zero.

You can standardize after taking the logarithm of the target variable, though there's no need; it would be overkill, since taking the log already compresses the range.
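A minimal plain-Scala sketch of the rules above (no Spark needed): log(0) is -Infinity, a small perturbation makes it finite, negative inputs are rejected, and mean-centering produces negatives. The eps default of 0.001 mirrors setupForML; LogTargetSketch is a hypothetical name.

```scala
// Plain-Scala sketch of the target-variable rules (no Spark needed).
object LogTargetSketch {
  // log(y + eps); eps = 0.001 mirrors setupForML, choose yours from the problem context
  def logWithPerturbation(y: Double, eps: Double = 0.001): Double = {
    require(y >= 0.0, s"log is undefined for negative target values: $y")
    math.log(y + eps)
  }

  // Mean-centering (the first half of standardizing) turns every value
  // below the mean into a negative number, which log cannot handle.
  def meanCentered(ys: Seq[Double]): Seq[Double] = {
    val mean = ys.sum / ys.size
    ys.map(_ - mean)
  }
}
```

Spark's functions also include log1p, which computes log(1 + y), i.e. a perturbation of exactly 1; whether that counts as "small" depends on the units of your target.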

NOTES:

  • Both StandardScaler and stddev from Spark's functions use N-1, i.e. the corrected (sample) standard deviation.

  • As you know, standard scaling does not change the skewness or shape of the distribution. Taking the logarithm, on the other hand, does change the shape: it compresses a long right tail, which is exactly why it is used on skewed targets.

  • You can scale manually if you like:

    var meanY = d2.select(avg($"targetCol")).first.getDouble(0)
    var stdY = d2.select(stddev($"targetCol")).first.getDouble(0)
    var df2 = d2.withColumn("scaledY", ($"targetCol" - meanY)/stdY)

  • assembler.transform(d2) returns a DataFrame (transform does for every ML transformer), not an RDD, so .show and .printSchema both work. Vector columns get truncated by .show, though; use .show(4, false) to see the first 4 rows in full, or .take(4) to get them as an Array of Rows.
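A plain-Scala sketch of the notes above (illustrative, not Spark code): the sample standard deviation divides by N-1 (like Spark's stddev, an alias of stddev_samp), the population one by N (like stddev_pop), standardizing leaves skewness unchanged, and the log does not. Skewness here is the simple moment-based version; SpreadAndShapeSketch is a hypothetical name.

```scala
// Plain-Scala sketch; compare with Spark's stddev / stddev_pop on a small column.
object SpreadAndShapeSketch {
  def mean(xs: Seq[Double]): Double = xs.sum / xs.size

  // sum of squared deviations from the mean
  private def sumSqDev(xs: Seq[Double]): Double = {
    val m = mean(xs)
    xs.map(x => (x - m) * (x - m)).sum
  }

  // divides by N - 1, like stddev / stddev_samp
  def sampleStd(xs: Seq[Double]): Double = math.sqrt(sumSqDev(xs) / (xs.size - 1))

  // divides by N, like stddev_pop
  def populationStd(xs: Seq[Double]): Double = math.sqrt(sumSqDev(xs) / xs.size)

  // moment-based skewness: the average cubed z-score (using the population std)
  def skewness(xs: Seq[Double]): Double = {
    val m = mean(xs)
    val s = populationStd(xs)
    xs.map(x => math.pow((x - m) / s, 3)).sum / xs.size
  }

  // subtract the mean, divide by the sample std
  def standardize(xs: Seq[Double]): Seq[Double] = {
    val m = mean(xs)
    val s = sampleStd(xs)
    xs.map(x => (x - m) / s)
  }
}
```

On Seq(2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0) the population std is 2.0 while the sample std is sqrt(32/7), about 2.14. On a right-skewed sample such as Seq(1.0, 2.0, 3.0, 10.0, 50.0), standardize leaves skewness as it is, while mapping math.log over the values lowers it.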

Evaluation / Scoring

As you know, we can use R-squared and/or Mean Squared Error (MSE) to evaluate regression results: a better model has R-squared closer to 1 and the lowest MSE possible. Compare models to each other after scaling your data as we did above.
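For reference, a plain-Scala sketch of the standard definitions of MSE, RMSE, and R-squared, which is what I understand RegressionEvaluator's "mse", "rmse", and "r2" metrics to compute from the label and prediction columns. RegressionMetricsSketch is a hypothetical name. One caveat: MSE is in squared units of the label, so an MSE computed on logY cannot be compared directly with one computed on perturbedY.

```scala
// Plain-Scala sketch of common regression metrics (not Spark's implementation).
object RegressionMetricsSketch {
  private def ssRes(y: Seq[Double], pred: Seq[Double]): Double =
    y.zip(pred).map { case (a, p) => (a - p) * (a - p) }.sum

  // mean of squared errors
  def mse(y: Seq[Double], pred: Seq[Double]): Double = ssRes(y, pred) / y.size

  // square root of MSE, back in the label's units
  def rmse(y: Seq[Double], pred: Seq[Double]): Double = math.sqrt(mse(y, pred))

  // 1 - (residual sum of squares / total sum of squares around the mean)
  def r2(y: Seq[Double], pred: Seq[Double]): Double = {
    val mean = y.sum / y.size
    val ssTot = y.map(a => (a - mean) * (a - mean)).sum
    1.0 - ssRes(y, pred) / ssTot
  }
}
```

For labels Seq(3.0, -0.5, 2.0, 7.0) and predictions Seq(2.5, 0.0, 2.0, 8.0), mse is 0.375 and r2 is about 0.9486.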

Below, I make a function that compares 5 regression models (4 linear and 1 nonlinear) on a given target column. I then call it for three treatments of the target (log(y), y almost as-is, and scaled y), so we compare 15 models per data frame.

Steps:

  • Instantiate each model. Read more about them in their respective packages in the official docs, as mentioned above.

  • Instantiate the evaluators.

  • Do a train/test split with a random seed for reproducibility.

  • Fit the models.

  • Score on both MSE and R^2.

  • Choose the best score among the 5 models.

  • Save the scores in a list, and map the best one back to the name of the model that produced it.

  • Build a tuple with the best scores, the model names, and how the target variable was treated (log, as-is, or scaled). The DeviceId column is specific to my use case: I filter the general data frame on a specific device, giving one data frame per device id. I return a tuple because, in my case, I run this in a for-loop over several data frames and collect the tuples into a data frame of results. See below.

def runML(df:DataFrame, labCol:String) : (String, String, Double, String, Double, String)={
    
    var d3 = df
    // INSTANTIATING THE ML MODELS TO USE
    val lr = new LinearRegression().
        setMaxIter(1000).
        setRegParam(0.0).
        setElasticNetParam(0.0).
        setFeaturesCol("scaledFeatures").
        setLabelCol(labCol).
        setStandardization(false)

    val elastic = new LinearRegression().
        setMaxIter(1000).
        setRegParam(1).
        setElasticNetParam(0.33).
        setFeaturesCol("scaledFeatures").
        setLabelCol(labCol).
        setStandardization(false)   

    val ridge = new LinearRegression().
        setMaxIter(1000).
        setRegParam(0.1).
        setElasticNetParam(0.0).
        setFeaturesCol("scaledFeatures").
        setLabelCol(labCol).
        setStandardization(false)

    val lasso = new LinearRegression().
        setMaxIter(1000).
        setRegParam(0.1).
        setElasticNetParam(1.0).
        setFeaturesCol("scaledFeatures").
        setLabelCol(labCol).
        setStandardization(false)

    val rf = new RandomForestRegressor().
        setNumTrees(100).
        setMaxDepth(3).
        setLabelCol(labCol).
        setFeaturesCol("scaledFeatures") //no need to scale in RandomForest, nor to take log(y)

    //evaluators (scores) on the hold-out set
    val r2_evaluator = new RegressionEvaluator().
        setLabelCol(labCol).
        setPredictionCol("prediction").
        setMetricName("r2")

    val mse_evaluator = new RegressionEvaluator().
        setLabelCol(labCol).
        setPredictionCol("prediction").
        setMetricName("mse")

    // APPLYING ON THE CHOSEN TARGET COLUMN (labCol)
    //train-test split
    var Array(train, test) = d3.randomSplit(Array(0.8, 0.2), seed= 1234L) //1234L is the random seed; plain 1234 also works. The L suffix makes it a Long literal

    //fitting
    var rfModel = rf.fit(train)
    var rfPreds = rfModel.transform(test)

    var lrModel = lr.fit(train)
    var lrPreds = lrModel.transform(test)

    var ridgeModel = ridge.fit(train)
    var ridgePreds = ridgeModel.transform(test)

    var lassoModel = lasso.fit(train)
    var lassoPreds = lassoModel.transform(test)

    var elasticModel = elastic.fit(train)
    var elasticPreds = elasticModel.transform(test)
    
    //scoring
    var rfMetric = r2_evaluator.evaluate(rfPreds)
    var lrMetric = r2_evaluator.evaluate(lrPreds)
    var ridgeMetric = r2_evaluator.evaluate(ridgePreds)
    var lassoMetric = r2_evaluator.evaluate(lassoPreds)
    var elasticMetric = r2_evaluator.evaluate(elasticPreds)

    var rfMSE = mse_evaluator.evaluate(rfPreds)
    var lrMSE = mse_evaluator.evaluate(lrPreds)
    var ridgeMSE = mse_evaluator.evaluate(ridgePreds)
    var lassoMSE = mse_evaluator.evaluate(lassoPreds)
    var elasticMSE = mse_evaluator.evaluate(elasticPreds)

    //choose the best score, Rsquared and MSE
    var scoreList = List[Double](rfMetric, lrMetric, ridgeMetric, lassoMetric, elasticMetric)
    var maxScore = scoreList.max
    var bestModel = scoreList.indexOf(maxScore)
    var names = Map(0->"RF", 1->"LR", 2->"Ridge", 3->"LASSO", 4->"ElasticNet")

    var MSEscoreList = List[Double](rfMSE, lrMSE, ridgeMSE, lassoMSE, elasticMSE)
    var minScore = MSEscoreList.min
    var lowestMSE = MSEscoreList.indexOf(minScore) //then use `names` again for the same mapping

    //build a tuple (one row of the results data frame), collecting results
    //(deviceId, bestR2model, r2score, bestMSEmodel, MSEscore, targetTreatment)
    var logNOlog : String = "Unknown"
    var devName:String = d3.select("DeviceId").first()(0).toString //returns `Any` type without `.toString`
    if (labCol == "scaledY"){logNOlog = "scaled_as_is"} else if (labCol == "perturbedY") {logNOlog = "as_is"} else {logNOlog = "withLog"}
    // val resTup = (devName, names(bestModel), f"$maxScore%1.2f", names(lowestMSE), f"$minScore%1.2f", logNOlog)
    val resTup = (devName, names(bestModel), f"$maxScore%1.2f".toDouble, names(lowestMSE), f"$minScore%1.2f".toDouble, logNOlog)

    return resTup
}
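If you prefer, the selection step in runML can be written by pairing each model name with its score, so the winner's name travels with its score instead of going through indexOf and the names map. This is a plain-Scala sketch with hypothetical names, and the scores in the example are made up. It also rounds with BigDecimal: as far as I know, the f interpolator used in runML formats with the JVM's default locale, so on a machine that writes decimals with a comma, f"$maxScore%1.2f".toDouble may throw.

```scala
// Plain-Scala sketch; the scores in the usage example are made-up numbers.
object BestModelSketch {
  // highest score wins (e.g. R^2)
  def bestByMax(scores: Seq[(String, Double)]): (String, Double) = scores.maxBy(_._2)

  // lowest score wins (e.g. MSE)
  def bestByMin(scores: Seq[(String, Double)]): (String, Double) = scores.minBy(_._2)

  // round to 2 decimals without going through a formatted String
  def round2(x: Double): Double =
    BigDecimal(x).setScale(2, BigDecimal.RoundingMode.HALF_UP).toDouble
}
```

Usage: BestModelSketch.bestByMax(Seq("RF" -> rfMetric, "LR" -> lrMetric, "Ridge" -> ridgeMetric, "LASSO" -> lassoMetric, "ElasticNet" -> elasticMetric)) returns the best (name, score) pair in one step.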

In my case, I apply this to several devices (i.e. data frames) in a for-loop. Each call to runML yields one tuple, i.e. one row of the results data frame, so start with an empty Seq to collect them:

//prepare the results data frame

var dfSeq : Seq[(String, String, Double, String, Double, String)] = Seq()

The loop below generates a boatload of warnings; it's very verbose. Also notice that each call to runML fits five models, and the loop calls it three times per device, so it takes a while.

Creating a data frame from a Seq like this is explained in more detail in the "Creating a Data Frame" page of this book.

Next, loop over the data frames you want to test the models on.

//I have already created a list of strings having the devices I
//want, called `devsList` 

for (item <- devsList) {
    //add the needed columns for `runML` function
    //(assumes `df` is the output of setupForML, so it already has perturbedY and logY)
    var itemDF = df.filter($"DeviceId"===item)
    var meanItemDF = itemDF.select(avg($"perturbedY")).first.getDouble(0)
    var stdItemDF = itemDF.select(stddev($"perturbedY")).first.getDouble(0)
    itemDF = itemDF.withColumn("scaledY", ($"perturbedY" - meanItemDF)/stdItemDF)
    
    //apply `runML` for the three different cases of the target variable
    var itemRes_asis_scaled = runML(itemDF, "scaledY") //just scaled, no log
    var itemRes_log = runML(itemDF, "logY")
    var itemRes_asis = runML(itemDF, "perturbedY") //almost as is, no log
    dfSeq = dfSeq :+ itemRes_asis
    dfSeq = dfSeq :+ itemRes_asis_scaled
    dfSeq = dfSeq :+ itemRes_log
}
var resDF = dfSeq.toDF("Device", "bestR2Model","r2Score", "bestMSEmodel", "MSEscore", "targetColTreatment")
resDF.show

☞ Find explanation and details of how I built this data frame in "Creating a Data Frame" page of this book.

I used 0.33 for the ElasticNet mixing parameter (elasticNetParam) because I wanted the Ridge and LASSO effects to be almost equal. The equation for the L1 and L2 regularization terms can be found on the LinearRegression page in the official Spark Scala API docs.
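The regularization term of Spark's LinearRegression, as given in the Spark docs, is regParam * (elasticNetParam * ||w||_1 + (1 - elasticNetParam) / 2 * ||w||_2^2). This plain-Scala sketch (hypothetical names) computes the two weights: with elasticNetParam = 0.33 the L1 weight is 0.33 * regParam and the L2 weight is 0.335 * regParam, which is why 0.33 makes the two effects almost equal. They are exactly equal at 1/3.

```scala
// Plain-Scala sketch of the elastic-net penalty (not Spark's code).
// regParam plays the role of lambda, elasticNetParam the role of alpha.
object ElasticNetPenaltySketch {
  // weight on the L1 norm (LASSO part)
  def l1Weight(regParam: Double, elasticNetParam: Double): Double =
    regParam * elasticNetParam

  // weight on the squared L2 norm (Ridge part)
  def l2Weight(regParam: Double, elasticNetParam: Double): Double =
    regParam * (1 - elasticNetParam) / 2

  // total penalty for a coefficient vector w
  def penalty(w: Seq[Double], regParam: Double, elasticNetParam: Double): Double = {
    val l1 = w.map(math.abs).sum
    val l2Squared = w.map(x => x * x).sum
    l1Weight(regParam, elasticNetParam) * l1 + l2Weight(regParam, elasticNetParam) * l2Squared
  }
}
```

With setRegParam(1) and setElasticNetParam(0.33), as in the elastic model in runML, the two weights are 0.33 and 0.335.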

Syntax

General Tips

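  • Use "new" when instantiating ML classes (estimators, transformers, evaluators). They are plain classes without an apply factory method, so new is required.

  • A class whose name ends with "Model" is the fitted product of the estimator with the same name, e.g. "PipelineModel" is the fitted pipeline from a Pipeline object, so you can call other methods on it, like params, etc.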
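A toy illustration of the estimator/model naming pattern, in plain Scala. MeanCenterer and MeanCentererModel are hypothetical classes, not Spark's; they only mirror how StandardScaler.fit returns a StandardScalerModel: the estimator learns parameters from data, and the Model applies them. Being plain classes, they are also created with new.

```scala
// Hypothetical toy classes mimicking the Spark naming pattern; NOT Spark classes.
// MeanCenterer plays the estimator (like StandardScaler),
// MeanCentererModel the fitted result (like StandardScalerModel).
class MeanCenterer {
  // fit learns a parameter (the mean) and returns a separate "Model" object
  def fit(xs: Seq[Double]): MeanCentererModel = new MeanCentererModel(xs.sum / xs.size)
}

class MeanCentererModel(val mean: Double) {
  // transform applies the learned parameter to (possibly new) data
  def transform(xs: Seq[Double]): Seq[Double] = xs.map(_ - mean)
}
```

Usage: new MeanCenterer().fit(Seq(1.0, 2.0, 3.0)) returns a model whose mean is 2.0, and its transform(Seq(5.0)) gives Seq(3.0).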

Automatically identify categorical features and index them:

// Set maxCategories so features with > 4 distinct values are treated as continuous.
val featureIndexer = new VectorIndexer().
    setInputCol("features").
    setOutputCol("indexedFeatures").
    setMaxCategories(4).
    fit(data)

(Adapted from the official Spark ML docs examples.)
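A simplified plain-Scala sketch of the maxCategories rule: a feature with at most maxCategories distinct values is treated as categorical, and its values are mapped to indices 0, 1, 2, and so on; anything with more distinct values stays continuous. The sorted value-to-index mapping here is an assumption for illustration; Spark's actual mapping may differ in details, so check the VectorIndexer API docs. VectorIndexerSketch is a hypothetical name.

```scala
// Simplified plain-Scala sketch of VectorIndexer's idea (not Spark's implementation).
object VectorIndexerSketch {
  // categorical if the feature takes at most maxCategories distinct values
  def isCategorical(values: Seq[Double], maxCategories: Int): Boolean =
    values.distinct.size <= maxCategories

  // simplified mapping: sorted distinct values get indices 0, 1, 2, ...
  def indexMap(values: Seq[Double]): Map[Double, Int] =
    values.distinct.sorted.zipWithIndex.toMap
}
```

With setMaxCategories(4), a feature taking the values 1.0, 2.0, and 3.0 would be indexed, while one taking ten distinct values would be left continuous.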

Resources

The ml docs and the Scala API docs for 2.4.3. Reminder: to search the latter, type the package name in the search box, e.g. org.apache.spark.ml., and choose from the list.

General Plan

  1. imports

  2. create "features" and "label" columns

  3. transformations setup: scalers, NLP processing, etc.

  4. train/test/split

  5. pipeline and/or model

  6. fit to train set

  7. predict

  8. evaluation of model

  9. save results to disk

  10. load saved model

Step by Step Syntax

Imports

import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.ml.feature.{VectorAssembler, VectorIndexer, StringIndexer, StandardScaler, StandardScalerModel}
import org.apache.spark.ml.regression.{RandomForestRegressionModel, RandomForestRegressor, LinearRegression}
import org.apache.spark.ml.evaluation.{RegressionEvaluator}
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}

☞ IMPORTANT: read df as a var. "df", "test", "train", and "scaledDF" are all declared as var below because we modify them on the fly, and a val cannot be reassigned.

var df= spark.read.format("csv").
  option("header", "true").
  option("inferSchema", "true").
  load("csv_path")

Create the "label" column

df = df.withColumnRenamed("myTargetCol","label")

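Create the "features" column with VectorAssembler. VectorIndexer does not replace it: it takes an existing vector column and indexes the categorical features inside it, so use it after VectorAssembler if you need it.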

val asmblr = new VectorAssembler().
  setInputCols(Array("col1", "col2" /* , ... */)).
  setOutputCol("features")

df = asmblr.transform(df)

Transform (standardize the features)

val scaler = new StandardScaler().
    setInputCol("features").
    setOutputCol("scaledFeatures").
    setWithStd(true).
    setWithMean(false)

val scalerModel = scaler.fit(df)

var scaledDF = scalerModel.transform(df)

scaledDF.show()

Train/Test/Split

var Array(train,test)= scaledDF.randomSplit(Array(0.8,0.2), 42) //42 is the random seed

test.show //this is the test set dataframe
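Note that randomSplit does not cut the data at exactly 80/20. A plain-Scala sketch of the idea (not Spark's implementation, which samples per partition): each row is assigned independently at random, so the sizes are only approximately 80/20, and the same seed reproduces the same assignment, assuming the input data and its partitioning stay the same. RandomSplitSketch is a hypothetical name.

```scala
import scala.util.Random

// Plain-Scala sketch of the idea behind randomSplit(Array(0.8, 0.2), seed).
object RandomSplitSketch {
  def split[A](rows: Seq[A], trainFraction: Double, seed: Long): (Seq[A], Seq[A]) = {
    val rng = new Random(seed)
    // draw one uniform number per row, in order
    val inTrain = rows.map(_ => rng.nextDouble() < trainFraction)
    val (trainPairs, testPairs) = rows.zip(inTrain).partition(_._2)
    (trainPairs.map(_._1), testPairs.map(_._1))
  }
}
```

Splitting 1 to 1000 with seed 42L puts roughly, not exactly, 800 rows in the training part, and calling it again with the same seed gives the identical split.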

Model

var lr = new LinearRegression().
    setMaxIter(10).
    setRegParam(0.3).
    setElasticNetParam(0.8).
    setLabelCol("label").
    setFeaturesCol("scaledFeatures")

var rf = new RandomForestRegressor().
    setNumTrees(50).
    setMaxDepth(3).
    setLabelCol("label").
    setFeaturesCol("scaledFeatures")

// fitting
var lrModel = lr.fit(train)
var rfModel = rf.fit(train)

Pipeline

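// Chain the stages in a Pipeline: transformers, indexers, the assembler, then the model.
// transformer1, transformer2, assembler and model stand for your own stage objects,
// and `train` must not already contain the columns these stages create.
val pipe = new Pipeline()
  .setStages(Array(transformer1, transformer2,..,assembler, model))

// fitting i.e. learning parameters and applying transformers, indexers, assemblers, etc.
var pipeModel = pipe.fit(train)

// Make predictions with the fitted pipeline.
val predictions = pipeModel.transform(test)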
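Conceptually, pipe.fit runs the stages in order: plain transformers just transform the data, and each estimator is fitted on the output of the stages before it, with its fitted model applied before moving on. The result is the list of fitted stages, which is what a PipelineModel applies in transform. This is a plain-Scala toy with hypothetical names (Transform, Estimate), not Spark's classes; Spark also skips the transform after the last estimator, since nothing downstream needs it, whereas the toy transforms anyway for simplicity.

```scala
// Hypothetical toy model of what Pipeline.fit does; none of these are Spark classes.
object PipelineSketch {
  type Data = Seq[Double]

  sealed trait Stage
  // a stage that only transforms data (like VectorAssembler)
  final case class Transform(f: Data => Data) extends Stage
  // a stage that must first learn from data (like StandardScaler or a model)
  final case class Estimate(learn: Data => Transform) extends Stage

  // Run stages in order: each estimator is fitted on the output of the stages
  // before it, and its fitted transform is applied before moving on.
  def fit(stages: Seq[Stage], data: Data): Seq[Transform] = {
    var current = data
    stages.toList.map {
      case t: Transform =>
        current = t.f(current); t
      case e: Estimate =>
        val fitted = e.learn(current)
        current = fitted.f(current); fitted
    }
  }

  // The fitted "pipeline model" applies the fitted stages in order.
  def transform(model: Seq[Transform], data: Data): Data =
    model.foldLeft(data)((d, t) => t.f(d))
}
```

For example, a doubling stage followed by a mean-centering estimator, fitted on Seq(1.0, 2.0, 3.0), learns the mean 4.0 of the doubled data, so transforming Seq(5.0) gives Seq(6.0).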

Evaluating - Regression

// Select (prediction, true label) and compute test error.
val evaluator = new RegressionEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("rmse") //or "r2"

var metric = evaluator.evaluate(predictions)
println(s"Root Mean Squared Error (RMSE) on test data = $metric")

Hypertuning Parameters

//construct a pipeline with instantiated models as before.

// build the parameters grid
var paramGrid = new ParamGridBuilder().
  addGrid(transformer1.param1, Array(option1,option2)).
  addGrid(model.param1, Array(option1, option2, option3, option4)).
  addGrid(model.param2, Array(option1, option2)).
  build()

// We now treat the Pipeline as an Estimator, wrapping it in a CrossValidator instance.
// This will allow us to jointly choose parameters for all Pipeline stages.
// A CrossValidator requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
// This snippet was originally a classification example (BinaryClassificationEvaluator);
// for regression, we reuse the RegressionEvaluator defined above.
val cv = new CrossValidator()
  .setEstimator(pipe)
  .setEvaluator(evaluator)
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(2)  // Use 3+ in practice
  .setParallelism(2)  // Evaluate up to 2 parameter settings in parallel

// Run cross-validation, and choose the best set of parameters.
val cvModel = cv.fit(train)

// Make predictions on the test set. cvModel uses the best model found.
cvModel.transform(test)
  .select("label", "prediction")
  .show(5)
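Grid searches grow fast. A plain-Scala sketch (hypothetical names) of how many ParamMaps a grid builds and how many fits cross-validation then runs: build() yields one ParamMap per combination, i.e. the Cartesian product of the addGrid value lists, and, per the Spark tuning guide, CrossValidator fits each ParamMap once per fold and finally refits the best one on the whole input. With the 2 x 4 x 2 grid sketched above and setNumFolds(2), that is 16 ParamMaps and 33 fits.

```scala
// Plain-Scala sketch of grid-search bookkeeping; not Spark's API.
object ParamGridSketch {
  // one combination per element of the Cartesian product of the value lists
  def combinations(grid: Seq[Seq[Any]]): Seq[Seq[Any]] =
    grid.foldLeft(Seq(Seq.empty[Any])) { (acc, values) =>
      for (combo <- acc; v <- values) yield combo :+ v
    }

  // every combination is fitted once per fold, plus one final refit of the best
  def numFits(numCombinations: Int, numFolds: Int): Int =
    numCombinations * numFolds + 1
}
```

Adding one more addGrid line with 3 options would triple both numbers, so it pays to keep each value list short.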

Cross-Validation

var cv = new CrossValidator().
  setEstimator(pipe).
  setEstimatorParamMaps(paramGrid).
  setEvaluator(evaluator)

var cvModel = cv.fit(train)
var preds = cvModel.transform(test)

evaluator.evaluate(preds)
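For intuition about what cv.fit(train) does with the data, a plain-Scala sketch of k-fold splitting (hypothetical names, not Spark's implementation): every row lands in exactly one validation fold, and each fold's training set is everything else. For each ParamMap, the evaluator's metric is averaged over the k validation folds and the best average wins.

```scala
import scala.util.Random

// Plain-Scala sketch of k-fold cross-validation splits (not Spark's code).
object KFoldSketch {
  // returns k (trainingIndices, validationIndices) pairs over row indices 0 until n
  def kFold(n: Int, k: Int, seed: Long): Seq[(Seq[Int], Seq[Int])] = {
    val rng = new Random(seed)
    // randomly assign each row index to one of the k folds
    val foldOf = (0 until n).map(_ => rng.nextInt(k))
    (0 until k).map { f =>
      val validation = (0 until n).filter(i => foldOf(i) == f)
      val training = (0 until n).filter(i => foldOf(i) != f)
      (training, validation)
    }
  }
}
```

With kFold(100, 3, 7L), the three validation sets together cover all 100 indices exactly once, and no index is in both the training and validation part of the same fold.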

Extras

// Print the coefficients and intercept for linear regression
println(s"Coefficients: ${lrModel.coefficients} Intercept: ${lrModel.intercept}")

// Summarize the model over the training set and print out some metrics
val trainingSummary = lrModel.summary
println(s"numIterations: ${trainingSummary.totalIterations}")
println(s"objectiveHistory: [${trainingSummary.objectiveHistory.mkString(",")}]")
trainingSummary.residuals.show()
println(s"RMSE: ${trainingSummary.rootMeanSquaredError}")
println(s"r2: ${trainingSummary.r2}")

// Select example rows to display.
predictions.select("prediction", "label", "features").show(5)

// show learned RandomForest model (assuming the forest is the last stage of `pipe`)
val forestModel = pipeModel.stages.last.asInstanceOf[RandomForestRegressionModel]
println(s"Learned regression forest model:\n ${forestModel.toDebugString}")
