Machine Learning with PySpark

Examples of preparing the data, then calling and applying ML methods

This page covers what you can find in the ml module of Spark. For tips on navigating the Spark 3.1 official docs to see the details of the ML module and others, see "Essential Imports & General Notes" in the "PySpark" page group of this guidebook.

As in Spark with Scala, the features you want to pass to an ML model need to be in a Vector column, which is internally different from an Array column even though it looks the same to the user. For example, you can't call .getItem() on a Vector column the way you would on an ArrayType column.
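
For example, here is a minimal sketch of the difference. The tiny frame and its column names are made up just for illustration, it assumes an active spark session (the same one used to build dummy_df), and vector_to_array needs Spark 3.0 or later:

import pyspark.sql.functions as f
from pyspark.ml.linalg import Vectors
from pyspark.ml.functions import vector_to_array  # Spark 3.0+

# a throwaway frame with one ArrayType column and one Vector column
tiny_df = spark.createDataFrame(
    [([1.0, 2.0], Vectors.dense([1.0, 2.0]))],
    ["arr_col", "vec_col"],
)

# works: ArrayType supports positional access
tiny_df.select(f.col("arr_col").getItem(0)).show()

# fails with an AnalysisException: a Vector column has no .getItem()
# tiny_df.select(f.col("vec_col").getItem(0)).show()

# workaround: convert the Vector to an array first, then index it
tiny_df.select(vector_to_array(f.col("vec_col")).getItem(0).alias("first_feature")).show()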

Let's start with a simple standardization, which is what you do when you apply the formula (x - mean) / standard deviation, where x is each value of the variable to standardize. See StandardScaler - PySpark 3.1 official docs.
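
As a quick sanity check of the formula itself, here is a tiny plain-Python illustration (the numbers are arbitrary). Note that Spark's StandardScaler uses the corrected sample standard deviation and, by default, only scales (withStd=True) without subtracting the mean (withMean=False); pass withMean=True if you want the full formula:

# plain-Python illustration of (x - mean) / standard deviation
xs = [128.95, 456.78, 987.02]

mean = sum(xs) / len(xs)
# corrected (unbiased) sample standard deviation, which is what StandardScaler uses
std = (sum((x - mean) ** 2 for x in xs) / (len(xs) - 1)) ** 0.5

standardized = [(x - mean) / std for x in xs]
print(standardized)  # each value is now expressed in standard deviations from the mean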

Let's once again create our toy data frame. You can find its code in the "Creating a Data Frame" page of the PySpark page group in this book. Here is what it looked like, as a reminder,

And here is its schema,

>>> dummy_df.printSchema()
root
 |-- name: string (nullable = true)
 |-- arr1: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- lang: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- state0: string (nullable = true)
 |-- state1: string (nullable = true)
 |-- num1: long (nullable = true)
 |-- num2: double (nullable = true)
 |-- num3: double (nullable = true)

Since we need to convert the features into a Vector column before we can standardize, I put both steps in one function to make them reusable, cleaner, and easier to apply.

Preparation - Make a Feature Column and Standardize It

Notice that the format below may not work for versions earlier than Spark 3.1, as the vecAssembler.setInputCols(colnames) call causes an error there.

import pyspark.sql.functions as f
from pyspark.sql.types import *

def standarize(main_df, colnames: list):
    from pyspark.ml.feature import StandardScaler, VectorAssembler

    # assemble the requested columns into a single Vector column
    vecAssembler = VectorAssembler(outputCol="features")
    vecAssembler.setInputCols(colnames)
    vecd_df = vecAssembler.transform(main_df)

    # fit a scaler on the assembled features, then apply it
    sc = StandardScaler()
    sc.setInputCol("features")
    sc.setOutputCol("standarized_features")
    scmodel = sc.fit(vecd_df)
    sc_df = scmodel.transform(vecd_df)

    return sc_df

This version works for Spark versions earlier than 3.1. The only difference is that we now pass the "inputCols" argument while instantiating the VectorAssembler, rather than setting it after instantiation.

import pyspark.sql.functions as f
from pyspark.sql.types import *

def standarize(main_df, colnames: list):
    from pyspark.ml.feature import StandardScaler, VectorAssembler

    # same as above, but inputCols is passed at construction time
    vecAssembler = VectorAssembler(inputCols=colnames, outputCol="features")
    vecd_df = vecAssembler.transform(main_df)

    sc = StandardScaler()
    sc.setInputCol("features")
    sc.setOutputCol("standarized_features")
    scmodel = sc.fit(vecd_df)
    sc_df = scmodel.transform(vecd_df)

    return sc_df

This works on a single column, standarize(dummy_df, colnames=['num1']), which gives a Vector column with a single element in it; or on multiple columns, standarize(dummy_df, colnames=['num1', 'num2']), which yields a Vector column with several elements in it. Compare the two outputs,
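
If you want to reproduce the comparison yourself, something like this does it (assuming the dummy_df and the standarize function defined above):

# single column: each row gets a 1-element Vector
standarize(dummy_df, colnames=["num1"]) \
    .select("num1", "features", "standarized_features") \
    .show(truncate=False)

# multiple columns: each row gets a 2-element Vector
standarize(dummy_df, colnames=["num1", "num2"]) \
    .select("num1", "num2", "features", "standarized_features") \
    .show(truncate=False)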

Since you need a Vector column for any ML model you want to employ, you can feed your standardized column into the next thing you want to do.
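
For instance, here is a minimal sketch that feeds the standardized column into a KMeans model. The estimator, k=2, and seed=42 are arbitrary choices made up for illustration; the column names are the ones the standarize function above creates:

from pyspark.ml.clustering import KMeans

# standardized frame from the helper above
sc_df = standarize(dummy_df, colnames=["num1", "num2", "num3"])

# point the estimator at the standardized Vector column instead of the default "features"
kmeans = KMeans(k=2, seed=42, featuresCol="standarized_features")
kmeans_model = kmeans.fit(sc_df)

kmeans_model.transform(sc_df) \
    .select("name", "standarized_features", "prediction") \
    .show(truncate=False)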

Bucketizer - Categorizing a Numerical Variable

from pyspark.sql import functions as f
from pyspark.sql.types import *
from pyspark.ml.feature import Bucketizer

#create the split values that we will feed into Bucketizer later on
the_splits = [-float("inf")] + list(range(0, 1050, 50)) + [float("inf")]

bucketizer = Bucketizer()
bucketizer.setSplits(the_splits)
bucketizer.setInputCol("num2")
bucketizer.setOutputCol("buckets")
bucketed_df = bucketizer.transform(dummy_df)

#show results
bucketed_df\
.select("num2","buckets")\
.show(10,truncate=False)

yields,

+------+-------+
|num2  |buckets|
+------+-------+
|456.78|10.0   |
|234.01|5.0    |
|987.02|20.0   |
|332.3 |7.0    |
|980.23|20.0   |
|937.94|19.0   |
|128.95|3.0    |
|563.63|12.0   |
|614.84|13.0   |
+------+-------+

Let's explain what happened here.

First, the splits we have in the_splits are: [-inf, 0, 50, 100, 150, 200, 250, 300, 350, 400, 450, 500, 550, 600, 650, 700, 750, 800, 850, 900, 950, 1000, inf]

And the values of the variable we fed to the bucketizer are,

>>> [row['num2'] for row in dummy_df.select("num2").collect()]
[456.78, 234.01, 987.02, 332.3, 980.23, 937.94, 128.95, 563.63, 614.84]

Now, bucket 0 holds all the values between -infinity and 0 (i.e. less than zero), bucket 1 all the values between 0 and 50, bucket 2 all the values between 50 and 100, bucket 3 all the values between 100 and 150, bucket 4 all the values between 150 and 200, and so on.

And so, 456.78 falls into bucket 10 because it's between 450 and 500, 234.01 falls into bucket 5 because it's between 200 and 250, 987.02 falls into bucket 20 because it's between 950 and 1000... well, you get it.
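
To convince yourself of the interval logic outside Spark, here is a small plain-Python sketch that reproduces the same bucket indices. The bucket_index helper is a made-up illustration, assuming the half-open [x, y) convention discussed in the notes below:

import bisect

the_splits = [-float("inf")] + list(range(0, 1050, 50)) + [float("inf")]

def bucket_index(value, splits):
    # a bucket defined by consecutive splits x, y holds values in [x, y)
    return bisect.bisect_right(splits, value) - 1

for v in [456.78, 234.01, 987.02, 332.3]:
    print(v, "->", bucket_index(v, the_splits))
# 456.78 -> 10, 234.01 -> 5, 987.02 -> 20, 332.3 -> 7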

Important Notes

  • Read Bucketizer - PySpark 3.1 official docs to understand what the other parameters do. Since Spark 3.0, the Bucketizer can take several input columns instead of just one (see the sketch after this list).

  • The categories/buckets it creates are half open [a, b): a bucket defined by splits x, y holds values in the range [x, y), except the last bucket, which also includes its upper boundary y.

  • Bucketizer only maps the values it actually sees into the split intervals, so you can end up with empty categories. For example, if your splits go from 0 to 1000 with a step size of 50 and the variable has no values between, say, 0 and 50, you will not see 1.0 showing up in the "buckets" output column. So if it's important for you to have all the buckets (say you're comparing the distributions/histograms of two variables), you need to create your categories by hand with a UDF. I show how in the "Related to ML" page of the PySpark page group in this guidebook.

  • You need to include the infinities in your splits whenever you don't know the bounds of your variable; values falling outside the splits make Bucketizer error out.
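
Here is a minimal sketch of the multi-column form mentioned in the first note above; the splits for num3 and the output column names are made up just to show the shape of the API, and it requires Spark 3.0 or later:

from pyspark.ml.feature import Bucketizer

# one list of splits per input column; the num3 splits are invented for illustration
splits_num2 = [-float("inf")] + list(range(0, 1050, 50)) + [float("inf")]
splits_num3 = [-float("inf"), 0.0, 10.0, 20.0, 30.0, float("inf")]

multi_bucketizer = Bucketizer(
    splitsArray=[splits_num2, splits_num3],
    inputCols=["num2", "num3"],
    outputCols=["num2_buckets", "num3_buckets"],
)

multi_bucketizer.transform(dummy_df) \
    .select("num2", "num2_buckets", "num3", "num3_buckets") \
    .show(truncate=False)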
