Pipeline in PySpark 3.0.1, By Example

If you go to the documentation, you won't understand much; there isn't even a code example.

Here, I show how to do one-hot encoding and create the "features" column, which is needed for any Machine Learning algorithm in Spark.

In this simple example, all the data is numeric except for two columns, ["col1", "col2"], which I need to either encode directly into numbers, or go one step further and one-hot encode them, that is, "dummify" them in Pandas' language.

from pyspark.ml.pipeline import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder, Imputer, StandardScaler, PCA

# whatever you need to do first in order to get your features list.
# Remember to keep the ID column and the target variable out of the features
feats=[item for item in df.columns if item not in [ID_COLUMN, TARGET_VARIABLE]]

# the two string columns to encode; everything else is numeric
list_to_encode=["col1", "col2"]
numerical_cols=[item for item in feats if item not in list_to_encode]

# Building the pipeline
#----------------------
pipe_stages=[]

# index the string columns into category indices (multi-column StringIndexer needs Spark >= 3.0)
sindexer= StringIndexer(inputCols=list_to_encode, outputCols=[f"indexed_{item}" for item in list_to_encode], handleInvalid='keep', stringOrderType='frequencyDesc')
pipe_stages +=[sindexer]

# one-hot encode the indexed strings into sparse vectors
ohe= OneHotEncoder(inputCols=[f"indexed_{item}" for item in list_to_encode], outputCols=[f"ohe_{item}" for item in list_to_encode], handleInvalid='keep', dropLast=True)
pipe_stages +=[ohe]

# impute missing numerical values with the median
imp= Imputer(inputCols=numerical_cols, outputCols=[f"imputed_{item}" for item in numerical_cols], strategy='median')
pipe_stages +=[imp]

# assemble all the features we want into a single vector column
vecAssembler= VectorAssembler(inputCols=[f"ohe_{item}" for item in list_to_encode]+[f"imputed_{item}" for item in numerical_cols], outputCol="features", handleInvalid='keep')
pipe_stages+=[vecAssembler]

# scale all features. Maybe do before encoding categorical columns?
ss= StandardScaler(inputCol='features', outputCol="ss_features", withMean=False, withStd=True)
pipe_stages+=[ss]

pipe= Pipeline(stages=pipe_stages)

# Applying the pipeline
# ---------------------
df2= pipe.fit(df).transform(df)

Notes

  • You can save the fitted pipeline model the same way we save models without MLFlow: pipe.fit(df).save(path). The page "Saving and Loading a Model with, and without MLFlow" in this book explains more about saving and loading PySpark models. You can also save the resulting dataframe and load it afterwards for the next steps in modeling. A short save/load sketch follows these notes.

  • Spark turns categories into numbers according to frequency when stringOrderType='frequencyDesc'; that is, if I have a color column and "red" appears the most, it will be given 0; if "blue" is the second most repeated value, it gets 1; and so on and so forth. A tiny demo follows these notes.

  • StringIndexer transforms categorical/string variables to numbers.

  • To one-hot encode multiple columns in PySpark, check out this stackoverflow answer

  • OneHotEncoder is called OneHotEncoderEstimator in Spark >= 2.3, < 3.0. A version-guarded import sketch follows these notes.

  • VectorAssembler collects all the (now numerical) input features you want into a single vector column. Spark needs the features in that form to do modeling on them.

  • Sometimes combining OHE with PCA can give you trouble; running PCA on high-dimensional sparse one-hot vectors can get slow or memory-hungry.

  • You can save the unfitted pipeline with pipe.save(path) and load it back with Pipeline.load(path); the fitted pipeline (the result of pipe.fit(df)) is loaded with PipelineModel.load(path). PipelineModel comes from pyspark.ml: from pyspark.ml import PipelineModel.

  • For StandardScaler, withMean=True centers each feature around its own mean (so it becomes 0); withMean=False leaves the features uncentered, which keeps sparse vectors sparse. Similarly, withStd=True scales each feature to unit standard deviation, while withStd=False leaves its scale unchanged. Thus, to get features with mean=0 and std=1 you need withMean=True and withStd=True; the withMean=False, withStd=True setting above only rescales to unit variance without centering. See the contrasting configurations sketched after these notes.
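
To make some of these notes concrete, here are a few hedged sketches. First, saving the fitted pipeline and loading it back; the path is only illustrative:

from pyspark.ml import PipelineModel

pipe_model = pipe.fit(df)                       # fitted pipeline
pipe_model.save("/tmp/my_pipeline_model")       # illustrative path

# later, or in another job
reloaded = PipelineModel.load("/tmp/my_pipeline_model")
df2 = reloaded.transform(df)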
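
Next, the frequencyDesc ordering of StringIndexer on a made-up color column (an existing spark session is assumed):

from pyspark.ml.feature import StringIndexer

colors = spark.createDataFrame(
    [("red",), ("red",), ("red",), ("blue",), ("blue",), ("white",)], ["color"])
indexer_model = StringIndexer(inputCol="color", outputCol="indexed_color",
                              stringOrderType='frequencyDesc').fit(colors)
indexer_model.transform(colors).show()
# "red" (most frequent) -> 0.0, "blue" -> 1.0, "white" -> 2.0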
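
If your code has to run on both Spark 2.x and 3.x, a version-guarded import along these lines should work; this is a sketch, not something tested against every minor release:

import pyspark

if int(pyspark.__version__.split(".")[0]) >= 3:
    from pyspark.ml.feature import OneHotEncoder                               # Spark >= 3.0
else:
    from pyspark.ml.feature import OneHotEncoderEstimator as OneHotEncoder     # Spark 2.3 - 2.4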
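
Finally, the two contrasting StandardScaler configurations, using the column names from the pipeline above:

from pyspark.ml.feature import StandardScaler

# mean 0 and std 1 for every feature: both flags must be True
ss_standardize = StandardScaler(inputCol='features', outputCol='std_features', withMean=True, withStd=True)

# the setting used in the pipeline above: unit variance only, no centering (keeps sparse vectors sparse)
ss_scale_only = StandardScaler(inputCol='features', outputCol='ss_features', withMean=False, withStd=True)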

You might not want to only turn string/categorical variables into numbers and stop there, since doing so implies ordinality. For example, if I have a column describing color as "red", "white", "blue" and I turn it into 0, 1, 2, that might be interpreted by the model as blue > white, and thus, when color increases by one unit, the target is affected, positively or negatively, by this much. If you recall from your statistics class days, this is how linear regression translates the coefficients of its explanatory variables. If you want to avoid that, dummifying the categorical variable eliminates this issue, though it might blow up your dataframe and make it "wide"; in that case, you need to mitigate by dimensionality reduction, creating meaningful statistical features and dropping those that aren't meaningful using correlation, feature importance, or your domain knowledge, or combining less frequent categories, or all of those. There's also learning from the output variable (target encoding), explained in this medium post and this other medium post.

Yet another, less known, way to handle categorical variables is to convert the category indices into the binary system instead of keeping them as single integers. The benefit is that you still convert to numbers, but they carry less of an ordinality implication. See this datascience stackExchange post; a rough sketch of the idea follows below. This process is totally different from, and not to be confused with, Spark ML's Binarizer, which only applies a threshold to a continuous variable to make it binary 0/1. That "thresholding" process is apparently frowned upon; see this thread.
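
Here is a rough sketch of that binary-encoding idea on top of the StringIndexer output, using plain DataFrame operations. The column name indexed_col1 comes from the pipeline above, while n_bits is a made-up number of bits that must be large enough to cover the number of distinct categories:

from pyspark.sql import functions as F

n_bits = 3   # assumed: enough bits for up to 2**3 = 8 distinct categories
binary_encoded = df2
for bit in range(n_bits):
    # extract bit number `bit` of the category index as its own 0/1 column
    binary_encoded = binary_encoded.withColumn(
        f"col1_bit{bit}",
        F.shiftRight(F.col("indexed_col1").cast("int"), bit).bitwiseAND(F.lit(1)))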
