Related Python Snippets

Useful Python tips and tricks related to PySpark

This page isn't a dedicated Python programming page; it's a collection of things I needed while working with PySpark.

Text and Float Format in Python

num= 12.3456789                                                         

"%0.2f" % num                                                          
# outputs '12.35'

print("{:0.2f}".format(num)) 
# outputs 12.35

Notice that it rounds it as well. source

For string variables, f"{var:23s}" (e.g. f"{name:23s}") pads each value to 23 characters with trailing spaces. Useful to line up the output, say for printing a report of some sort.
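
A minimal sketch of the padding in action (the names and numbers here are made up):

names = ["alice", "bob", "christopher"]
scores = [12.3456789, 7.5, 103.0]

for name, score in zip(names, scores):
    print(f"{name:23s} {score:0.2f}")
# each name is padded to 23 characters, so the scores line up in one column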

List-of-Lists, to List in Python

Sometimes when dealing with dataframe columns, we need to iterate over them with different conditions, which ends up creating a list of lists when what we really need is a flat list. Numpy commands have trouble with lists-of-lists of strings: np.array().flatten() didn't do what I expected it to do, and np.concatenate(np.array(list_of_lists)) gave a deprecation warning, especially with string lists. Without Numpy,

import itertools
concat_list= list(itertools.chain(*list_of_lists))
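
For example, with a made-up list of lists of strings (reusing the itertools import above):

list_of_lists = [["a", "b"], ["c"], ["d", "e", "f"]]
concat_list = list(itertools.chain(*list_of_lists))
print(concat_list)
# ['a', 'b', 'c', 'd', 'e', 'f']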

Using Counter with df columns to reveal how many we have of each kind

Counter from the collections package returns a dict-like mapping of each object to its frequency count in the data.

df.dtypes returns a list of (column name, column type) tuples. In the next commands, I iterate over these tuples to capture columns of a certain type with a specific part in their name.

Worth noting that Counter doesn't return a list, but rather a "Counter" object with its own methods. You can get its keys with .keys() as if it were a dictionary, but it's not a plain Python dictionary.

To get the Counter results ordered in a descending fashion, add .most_common() at the end. source

These commands can be adapted to Pandas dataframes as well, since df.dtypes (which is what we use Counter on) is also available on Pandas dfs; there it's a Series rather than a list of tuples, so iterate over df.dtypes.items() to get (column name, type) pairs.

TIP: If you're creating a small Spark dataframe and want to convert it to Pandas for whatever reason, convert all "decimal" types to DoubleType first. Otherwise, you'll get a warning that it's not efficient to leave them as decimals when converting to Pandas, and the Spark-to-Pandas df conversion will take significantly longer.
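
A minimal sketch of that cast (assuming an existing Spark df with some decimal(p,s) columns):

from pyspark.sql import functions as F

# find the decimal columns by inspecting the dtype strings
decimal_cols = [name for name, dtype in df.dtypes if dtype.startswith("decimal")]

# cast them to double, leave everything else as-is
df_cast = df.select(
    *[F.col(c).cast("double").alias(c) if c in decimal_cols else F.col(c) for c in df.columns]
)
pdf = df_cast.toPandas()  # the decimal-related warning should be gone now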

Types and their counts of all columns in a Spark DF

from collections import Counter
Counter([col[1] for col in df.dtypes])

Get counts of specific type

Counter([col[1] for col in df.dtypes if (col[1]=='string') and (col[0].startswith("merchant_"))])

Count Prefixes and Suffixes of all (string) columns

In case some data engineering was done and columns are prefixed or suffixed with standard acronyms, here's how to find the count of each prefix and suffix.

Prefix

from collections import Counter
print(Counter([col[0].split("_")[0] for col in df.dtypes]).most_common())

The last part, .most_common(), orders Counter results by count of values; pretty neat.

Suffix

print(Counter([col[0][::-1].split("_")[0][::-1] for col in df.dtypes]).most_common())

The first reverse puts the suffix at the front of the reversed string, so split("_")[0] grabs it; the second reverse is needed to put the characters back in the correct order, because they come out reversed.
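
A quick illustration with a made-up column name:

name = "merchant_amount_usd"             # hypothetical column name
print(name[::-1])                        # 'dsu_tnuoma_tnahcrem'
print(name[::-1].split("_")[0])          # 'dsu'
print(name[::-1].split("_")[0][::-1])    # 'usd'  <- the suffix, characters back in order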

Apply a function to a single column in Pandas df

Use case: apply a complex function or condition to a column, using other columns in the dataframe, row-wise. Example: I want to test if "target" falls between "col1" and "col2" in the dataframe, with special logic to follow when comparing, then create a new column in the df to store the results. Here's how to do it:

# my function that I need to apply
def compute_error(x: float, y1: float, z99: float) -> float:
    if (x >= y1) and (x <= z99):
        return 0          # target falls inside the interval
    elif x >= z99:
        return x - z99    # target exceeds the upper bound
    elif x <= y1:
        return x - y1     # target is lower than the lower bound

# now apply it to the dataframe, replacing the generic arguments x, y1, z99 with column names from the dataframe
df['manually_computed_error'] = df.apply(lambda x: compute_error(x['target'], x['0.01'], x['0.99']), axis=1)

To give you a little bit more context, I was doing Quantile Regression here, and columns "0.01" and "0.99" are the predictions for the 1% and 99% quantiles, respectively. So yeah, my column names are the string form of the quantile itself.

To understand in detail why .apply works this way with this syntax (I don't know about you, but I was surprised to see x['colname']), check out this fantastically thorough Stack Overflow answer: https://stackoverflow.com/a/62968313/11381214/

Least Squares Polynomial Fit, Quantile Regression, and Tips on Linear Regression

Even if you haven't heard of this before, the idea is very appealing. Wonderful Numpy makes it as easy as one command, np.polyfit(); check it out!
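
A minimal sketch with made-up data, fitting a degree-2 polynomial by least squares:

import numpy as np

x = np.array([0.0, 1.0, 2.0, 3.0, 4.0])
y = np.array([1.1, 2.9, 7.2, 12.8, 21.0])   # toy data, roughly quadratic

coeffs = np.polyfit(x, y, deg=2)   # coefficients, highest degree first
fitted = np.polyval(coeffs, x)     # evaluate the fitted polynomial at x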

Since we're on the topic of regression fits, here's a quick tip. Besides using Ridge, Lasso, and ElasticNet to trim your linear regression model, you can also check the p-value reported for each coefficient; the null hypothesis is that the coefficient = 0. So if the p-value of a coefficient is very small, typically less than 0.05, you should keep that coefficient, i.e. reject the null hypothesis.

Another cool tip: besides gradient boosted regression from scikit-learn, which you can read about on your own, there's Quantile Regression, which is most suitable for highly skewed variables, like medical insurance claim payouts for example. Quantile Regression's benefits in a nutshell: it's robust to outliers, predicts a range (interval) rather than a point estimate, and is beneficial for skewed and/or heteroskedastic variables. There are awesome resources about it, like the dataman-git/codes_for_articles "From Quantile Regression to Quantile Random Forests.ipynb" GitHub repository and its companion Medium post to walk you through code. There are plenty of other great resources out there too, including scikit-learn's documentation if you look up "quantile regression sklearn".
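
Not the notebook from that repo, just a hedged sketch of how the scikit-learn route can look, using GradientBoostingRegressor with the quantile loss (X_train, y_train, X_test are placeholders for your own data):

from sklearn.ensemble import GradientBoostingRegressor

# one model per quantile; alpha is the quantile being predicted
q01 = GradientBoostingRegressor(loss="quantile", alpha=0.01)
q99 = GradientBoostingRegressor(loss="quantile", alpha=0.99)

q01.fit(X_train, y_train)   # X_train, y_train, X_test are placeholders
q99.fit(X_train, y_train)

# a prediction interval per row, rather than a single point estimate
intervals = list(zip(q01.predict(X_test), q99.predict(X_test)))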

Find duplicates in a list or a Spark df columns

Since df.columns is a Python list, we can use it to find duplicated column names. The fastest way is collections.Counter, which returns a dict-like object of counts. Then you can run a simple for-loop to find the duplicates. source1 source2

from collections import Counter
t1= Counter(df.columns)

def get_duplicates(values):
    counts = Counter(values)
    return [key for key, count in counts.items() if count > 1]

Or

def has_duplicates(elements):
    counter = Counter(elements)

    for key, value in counter.items():
        if value > 1:
            raise ValueError("Duplicate score found {}".format(key))
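
A quick usage example of both helpers above, with a made-up list of column names:

cols = ["id", "amount", "amount", "merchant_name"]
print(get_duplicates(cols))   # ['amount']
has_duplicates(cols)          # raises ValueError: Duplicate score found amount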

Saving a list or a dictionary - Python

Saving a list to a text file

You can do it with pickle, json, or native Python file writing.

import json
# to save it
with open("/path/filename.txt", "w") as file:
  json.dump(list_name, file)
	
# later to open it
with open("/path/filename.txt", "r") as file: 
  list_name= json.load(file)

NOTE: The difference between json.load and json.loads is that json.load reads from an open file object, while json.loads parses a string as-is. python - What is the difference between json.load() and json.loads() functions - StackOverflow
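
A tiny example of the two, reusing the placeholder path from above:

import json

s = '["a", "b", "c"]'                 # a JSON document already held in a string
items_from_string = json.loads(s)     # parses the string directly

with open("/path/filename.txt", "r") as file:
    items_from_file = json.load(file) # reads from an open file object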

Resources:
python - How to save a list to a file and read it as a list type? - StackOverflow
python - Save a list to a .txt file - StackOverflow

Saving a dictionary to a text/json file

It's better to save a dictionary to a JSON file because you can readily deserialize it, whereas you need an extra parsing step if it's a text file. Save a dictionary to a file in python - Delft Stack

import json
with open("/path/filename.json", "w") as file:
  json.dump(dictionary_name, file)
	
# to read it later
with open("/path/filename.json", "r") as file:
  dictionary_name= json.load(file)

Dictionary to Pandas' dataframe

You can load a saved JSON file directly into a Pandas dataframe, and there are a couple of arguments to control the layout so Pandas reads the dictionary saved in the JSON file properly; particularly the orient argument (see the pd.DataFrame.from_dict() docs). You can pass either the dictionary/JSON object itself (pd.DataFrame.from_dict) or the path (pd.read_json).
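
A minimal sketch with a made-up nested dictionary, showing how orient changes the layout:

import pandas as pd

mydict = {"col1": {"row1": 1, "row2": 2}, "col2": {"row1": 3, "row2": 4}}

df_cols = pd.DataFrame.from_dict(mydict)                  # outer keys become columns
df_rows = pd.DataFrame.from_dict(mydict, orient="index")  # outer keys become rows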

CSV to Pandas' df to dictionary/json

import pandas as pd
df= pd.read_csv("/path/csvfile.csv")
df.to_json()

The last command will create a very long string, as it converts the whole dataframe to JSON. Copy that, and paste it in a cell in the notebook. Now save it as a JSON file to any directory. If there's an older version of the file, remove it first with the os library. Finally, you can read it back into a dataframe with the json library as shown above,

import json
with open("/path/jsonfile.txt", "r") as text_file:
  mydict= json.load(text_file)
	
import pandas as pd
df= pd.read_json(mydict)

Here's another way, in case your JSON is already a dictionary:

import pandas as pd
df= pd.DataFrame().from_dict(mydict) #change `orient` argument in this command, if needed

A few other things you can try if changing the orient argument failed:

df= pd.read_json("/path_to_json_file.json/", orient="index")

df['colname0'] = df.index

df.reset_index(inplace=True, drop=True)

df.rename(columns={0:'colname1'}, inplace=True)

In this example, we have two columns: "colname0", and "colname1"

Creating and updating a dictionary dynamically, plus itertools

Updating a dictionary in a for loop

Use case: create a dictionary whose keys are the column names of String type in a dataframe, and whose values are a tuple of (count of distinct values of that column, first 10 distinct values of that column).

mydict = dict()

for pos, item in enumerate([str(col[0]) for col in df.dtypes if col[1] == 'string']):
    temp = df.select(item).distinct()
    val_levels = [row[item] for row in temp.collect()]
    val_count = len(val_levels)

    mydict.update({item: (val_count, val_levels[:10])})  # keep only the first 10 distinct values
    print("Now at {}".format(pos))  # for our benefit, to monitor the progress

NOTE: here I make use of the fact that df.dtypes in Spark lists all the column names and their types as (colname, type) tuples; similar to df.printSchema() but without the extra information about nullability. A trick here: you can get a count of each column type with the following:

from collections import Counter
print(Counter([col[1] for col in df.dtypes]))

In the same command, col[0] is the name of the column, col[1] is its type.


Creating an empty dictionary with known keys, and dynamic lists as values

values= dict([(k,[]) for k in ['col1', 'col2', 'col3', ..., 'coln']])

#then to add values to it, do
values['col1'].append(value_goes_here)
values['col2'].append(value_goes_here)
#... ... ....
values['coln'].append(value_goes_here)

Create combinations of lists, with itertools

from itertools import product, chain
result_list= [*map(list, map(chain.from_iterable, product(list1, list2)))]

Source
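
A small made-up example of what that produces (note that each element of the inputs is itself a list):

from itertools import product, chain

list1 = [["a", "b"], ["c"]]
list2 = [[1], [2, 3]]

result_list = [*map(list, map(chain.from_iterable, product(list1, list2)))]
print(result_list)
# [['a', 'b', 1], ['a', 'b', 2, 3], ['c', 1], ['c', 2, 3]]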

Creating UML chart to visualize libraries dependencies

Suppose your company contracted a consultancy to do a project for them. They wrote it in the most complex way, as they usually do, then they hand it off to you. There are tens of folders, with tens of files in each, and each file imports and inherits from dozens of other files and their classes. You have a reliable IDE (I prefer and highly recommend Microsoft's Visual Studio Code for this job), you open the repository's main folder in it, and you can quickly open and view an inherited class or file in a new tab. Yet, it's still too much to process in your head and understand enough to modify or fix or do anything with it other than mindlessly clicking the Run button.

Before you panic; ok, you will panic, but work through it and use some tools to help you sort through that gigantic mess. One helpful tool I've been told about, but haven't toyed with myself, is a library dependency mapping package.

UML charts explain the dependencies and linkage between Python modules, namely .py files. Instead of having to trace them almost manually, even with a smart IDE, you can use the pyreverse tool, which comes with Pylint and requires Graphviz to render the diagrams.

Useful os, and shutil library Commands

Deleting all contents of a folder in a path

import os
# note: this removes the files directly inside the folder; it will fail on sub-folders
for item in os.listdir("/path/foldername/"):
  os.remove("/path/foldername/{}".format(item))

Delete an entire folder, with all its contents

import shutil
shutil.rmtree("path_to_folder")

source

Moving a file from one folder to another

Use os.rename

import os
os.rename("file_old_path", "file_new_path")

source This command can also be used to rename a folder if you drop the file name from the end of the path. That is, give the old folder path and name, and the new folder path and name. source

Creating a new folder

import os
os.mkdir("/path/new_folder_name")

Removing a file

import os
os.remove("/path/file_name_and_extension")

Moving a file from one folder to another OR Renaming a folder

Use os.rename("origin path", "destination path"). source As noted above, this can also be used to rename a folder:

import os
os.rename("/path/old_folder_name", "/path/new_folder_name")

Source

Pandas df to Spark DF

Use case: I have a dataframe on my local machine. I want to convert it to parquet and save it to the blob storage.

  1. convert the Excel file to CSV from the Excel program

  2. upload the CSV to your DataBricks DBFS (local storage):
     a. open a local Python interpreter
     b. read the CSV using Pandas (on Windows, you need to replace each backslash in the path with a double backslash, otherwise Windows won't read the path, since single backslashes are interpreted as escape characters for the character right after them)
     c. convert the now-Pandas dataframe to JSON, using jdf= df.to_json()
     d. either manually copy the result to a DataBricks notebook cell and save it there as a txt file, OR save the resulting JSON to your local machine as a text file:

import json
with open("local_path_file.txt", "w") as file:
	json.dump(jdf, file)

     e. upload the file to DBFS using the "upload" button, where it will be saved to a local DataBricks storage (DBFS) path

  3. read it in DataBricks as a Pandas df:

import json
import pandas as pd

with open("/dbfs/my_folder/file_name.txt", "r") as file:
    jdf = json.load(file)   # returns the JSON string we dumped earlier
pdf = pd.read_json(jdf)

  4. convert the Pandas dataframe to a list of tuples, so Spark can take it, since the data format needed for Spark dfs is a list of tuples.

data= [tuple(item) for item in pdf.to_records(index=False)]

It's important here to specify "tuple", otherwise the rows are read as a numpy records type, which can't be parsed by Spark.

  5. specify the schema to feed to the Spark dataframe creation function. In this case, I had two string columns in the df I wanted to convert. This process can be daunting if there are many columns. You might want to try saving the Pandas df as CSV to DataBricks' local storage (DBFS) instead and see if it works, then read that CSV from Spark and save it as CSV to blob storage, if that's allowed and works properly. Alternatively, you can look up a way to extract and save the schema of a CSV file in Spark, the same way I showed in Spark Scala.

from pyspark.sql.types import *

# the data, as a list of tuples (same as step 4)
data = [tuple(item) for item in pdf.to_records(index=False)]

# define the schema explicitly
schema = StructType([
    StructField("col1", StringType(), True),
    StructField("col2", StringType(), True)
])

  6. make sure the dataframe is what you want it to be: spark.createDataFrame(data=data, schema=schema).show()

  7. save it to blob storage:

dfname.write.mode("overwrite").option("header", "true").format("parquet").save("blob_storage_path")

JSON to CSV, commands you might need

Read the docs on pd.read_json; for dictionary-shaped input there's also pd.DataFrame.from_dict (and pd.json_normalize for nested JSON).

import pandas as pd
import json

df1= pd.read_json("path", orient='index')
df1['col1']=df1.index
df1.reset_index(inplace=True, drop=True)
df1.rename(columns={0:'col1'}, inplace=True)
# in this example, we have two columns
