Merging and Cleaning Up Duplicate Columns
Before joining data frames, make sure no columns are repeated between tables.
If you have the same column name in two tables, after joining you'll have two columns with the same name in the dataframe. This will cause ambiguity errors when you try to select or operate on one or both of them, and dropping the column will drop every column with that name. So make sure you don't have any columns with the same name between any of your tables.
In my case, I had over 14 tables to join, each with more than 200 columns; there's no way to do that manually. In the code below, I show a programmatic way. Though it's fragile and may not apply cleanly to every case, it works; you do need to verify and tweak it for your needs.
Imports and setup
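A minimal sketch of what this cell might contain, assuming a Databricks notebook where spark and dbutils are already provided by the runtime:

```python
# Common imports used throughout this notebook; `spark` and `dbutils`
# come from the Databricks runtime and are not imported here.
import pandas as pd
import pyspark.sql.functions as F
```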
Some needed global variables
The timestamp column is used for ordering, the id column is used for partitioning, and the target variable is used for correlations and predictions. I use the same random seed throughout the project notebooks. The columns above (target, id, timestamp) should also be omitted from the predictor set when you're predicting. Singling them out helps keep in mind to omit them before predicting.
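Roughly, the globals look like this; the column names are placeholders for my actual schema:

```python
# Global variables used throughout the notebook; names are illustrative.
TIMESTAMP_COL = "record_timestamp"  # used for ordering records
ID_COL = "idcol1"                   # used for partitioning / joining
TARGET_COL = "target"               # used for correlations and predictions
RANDOM_SEED = 42                    # same seed across all project notebooks

# Keep these out of the predictor set before modeling
NON_PREDICTORS = [TIMESTAMP_COL, ID_COL, TARGET_COL]
```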
Exploring files in a blob storage path. This cell is Databricks-specific.
This function returns a list of the folder names only, whereas dbutils.fs.ls returns a list of the folder names along with other information about each one.
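A sketch of such a helper; the function name and example path are placeholders:

```python
def list_folder_names(path):
    """Return only the folder names under a blob storage path.
    dbutils.fs.ls returns FileInfo entries (name, path, size, ...);
    directory names end with '/'."""
    return [f.name.rstrip("/") for f in dbutils.fs.ls(path) if f.name.endswith("/")]

# Example:
# list_folder_names("dbfs:/mnt/my_container/raw/")
```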
Reading in tables, and creating lists
In the next step, read in the parquet files for the tables you want; in my case, there were 14 of them: table1 = spark.read.parquet("path_to_table1"), and so on. Then I create a list of these tables, for ease of looping through them for repeated procedures. I also create a string list of the corresponding table names, for ease of tracking and knowing what is where.
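For example (paths and table names are placeholders):

```python
# Read each table, then keep parallel lists of the DataFrames and their
# names so later loops can track what is where.
table1 = spark.read.parquet("path_to_table1")
table2 = spark.read.parquet("path_to_table2")
# ... and so on for the rest of the tables

tables_list = [table1, table2]            # the DataFrames
tables_list_names = ["table1", "table2"]  # their names, in the same order
```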
In my case, and it's probably commonplace, some id columns exist in some tables but not others, and other tables have different id columns to merge on. So you need to locate the shared id columns to merge tables on. First, list all the id columns you have; there can be 2 or more: known_id_columns = ["idcol1", "idcol2"]
Adding counts of a collection of columns, if needed
Sometimes you will have a repeated quantity with different values, for example diagnosis or procedure codes per patient, or purchases per customer. Counting how many of those columns are populated per row makes a powerful statistical feature. The next snippet counts them.
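A sketch of count_codes consistent with how it's called below; it counts the populated (non-null) columns in a family per row, assuming the family shares a common name prefix:

```python
import pyspark.sql.functions as F

def count_codes(df1, colfamily):
    """Add a '<colfamily>_count' column: how many columns whose names start
    with `colfamily` are non-null on each row."""
    family_cols = [c for c in df1.columns if c.startswith(colfamily)]
    # 1 for each populated column in the family, 0 otherwise, summed per row
    count_expr = sum(
        (F.when(F.col(c).isNotNull(), 1).otherwise(0) for c in family_cols),
        F.lit(0),
    )
    return df1.withColumn(f"{colfamily}_count", count_expr)
```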
Now simply apply it to however many families of columns you need to count; I had more than 2 in my case: table1 = count_codes(df1=table1, colfamily="diag_cd")
IMPORTANT: in my case, the columns in one family were all diagnosis codes that started with "diag_cd" but were numbered from 2 to 25, so they looked like "diag_cd2", "diag_cd3", etc. The other family I needed to count started with "icd_proc_", because those columns were of the format "icd_proc_02", etc. I wrote the code to capture the starting prefix of the family of columns I wanted to count. Modify your code as you see fit.
Table explorations needed before joining
Things to consider: if you don't need all the columns from all the tables, in this step reassign the table variables with your new column selections: table1 = table1.select(...)
Remember to always select the id columns and the timestamp from the tables as well!
This exploration step is needed before joining the tables, because tables with a higher row count may have more values of the ID column, and thus more information we don't want to lose. Other things to consider: the refresh rate of each table, that is, when was the table last updated and how often does it get updated? While you can ask your data stewards or engineers about that, don't take anyone's word for it; do the exploration yourself, it's a simple query. See details below, under the "Explorations before joining" section.
Sorting tables w.r.t. row count
So we can use the appropriate type of join to favor the table that's most important in your opinion, or to favor the bigger, more recently updated table. This is a partially manual step: you put the tables in a list from biggest and most recently updated to smallest and least recently updated.
This snippet gets the counts and sorts them in descending order, so that, all being equal in refresh rate, we can always use a "left" join to favor the bigger table.
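A sketch of the counting-and-sorting step; run the counts one table at a time if your cluster struggles with a single big cell:

```python
# Row counts, in the same order as tables_list (expensive on big tables)
row_counts = [tbl.count() for tbl in tables_list]

# Sort table names by row count, descending, so a "left" join can always
# favor the bigger table when refresh rates are equal.
ordered_tables_list_names = [
    name for _, name in sorted(zip(row_counts, tables_list_names), reverse=True)
]
print(row_counts)
print(ordered_tables_list_names)
```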
NOTES
[2758781, 3104096524, ...] is the list of row counts of each table in tables_list, preserving order. We get each one with the table.count() command. This can be computationally expensive, so it's up to you whether you want to break it down and do one table at a time, or run them all in a for-loop in one cell, which might break depending on your cluster capabilities. I usually prefer to break down heavy computations as much as possible, to keep things flowing and avoid bottlenecks. I've noticed that after a particularly big query that pushes the cluster's limits, the cluster becomes bogged down and less responsive in executing the next commands, even on good cluster management software like Databricks.

ordered_tables_list is the same as ordered_tables_list_names, but holding the table variables rather than their string names. This is a manual step for me: copy, paste, and remove the quotation marks, since I kept the variable names the same as their string names.
Rename shared ID columns to avoid confusion after merging
So I don't end up with duplicated id columns in the same dataframe; Spark won't distinguish between them, and there's no way to resolve the ambiguity after the join.
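A sketch of the renaming, consistent with the "_LOC" suffixes used later on; ordered_tables_list and known_id_columns come from the earlier steps:

```python
# Rename every known id column with a "_LOC{i}" suffix, where i is the
# table's position in the ordered list, so id columns stay distinguishable
# after joins. The renaming is undone at the very end.
renamed_tables = []
for i, tbl in enumerate(ordered_tables_list):
    for id_col in known_id_columns:
        if id_col in tbl.columns:
            tbl = tbl.withColumnRenamed(id_col, f"{id_col}_LOC{i}")
    renamed_tables.append(tbl)
```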
Cleaning up repeated columns between tables, to avoid trouble after merging
The idea is:
1. Start with the first table's column list.
2. In the loop:
   a. Get the next table's columns.
   b. If a column is NOT an id column AND NOT in the running list of columns from previous tables, keep it; otherwise, drop it.
   c. Add the kept columns to the running bag of columns, and repeat.
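A sketch of that loop, assuming the renamed tables from the previous step:

```python
# De-duplicate non-id columns across tables. The "_LOC" check keeps the
# renamed id columns in every table; every other column is kept only the
# first time it is seen.
clean_tables = [renamed_tables[0]]
seen_columns = set(renamed_tables[0].columns)

for tbl in renamed_tables[1:]:
    to_drop = [c for c in tbl.columns if "_LOC" not in c and c in seen_columns]
    tbl = tbl.drop(*to_drop)
    clean_tables.append(tbl)
    seen_columns.update(tbl.columns)
```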
IMPORTANT: Note how this snippet is run only after we renamed the id columns according to each table's place in the ordered list of tables. If we don't rename the id columns first, we will lose them in all tables but the first, meaning we can't merge the tables.
Another way to de-duplicate columns, but less efficient:
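For instance, a pairwise version that compares each table against all earlier ones (quadratic in the number of tables, hence slower):

```python
# Less efficient alternative: compare each table against every earlier
# table and drop any shared non-id column.
clean_tables_alt = []
for i, tbl in enumerate(renamed_tables):
    for prev in renamed_tables[:i]:
        shared = [c for c in tbl.columns if c in prev.columns and "_LOC" not in c]
        tbl = tbl.drop(*shared)
    clean_tables_alt.append(tbl)
```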
Joining tables on shared id columns
In my use case, the first four tables in the ordered list had the same refresh rate (monthly) and the same size; each contained a different set of columns, some shared, and they had one of the id columns in common. Only one of them had an id column that could join some of the other tables.
Joining the first four tables in the ordered list on the same id column, with an inner join because they share the same refresh rate and size.
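A sketch, assuming the shared id column is idcol1 and the first four tables are clean_tables[0] through clean_tables[3]:

```python
# Inner joins: all four tables have the same refresh rate and size, so
# requiring the id to exist in all of them loses nothing important.
merged = clean_tables[0]
for i in range(1, 4):
    merged = merged.join(
        clean_tables[i],
        merged["idcol1_LOC0"] == clean_tables[i][f"idcol1_LOC{i}"],
        how="inner",
    )
```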
Joining another table to this merged one, on a different id column
clean_tables[12] is far down the ordered list; it's a small table that isn't refreshed frequently, so I favored the merged table with a "left" join.
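A sketch of that join; the id column name and the "_LOC" indices are placeholders, use whichever copy of the id lives in your merged table:

```python
# Left join keeps every row of the big, frequently refreshed merged table;
# clean_tables[12] is small and rarely refreshed.
merged = merged.join(
    clean_tables[12],
    merged["idcol3_LOC0"] == clean_tables[12]["idcol3_LOC12"],
    how="left",
)
```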
Joining the rest of the tables on yet another id column "idcol2"
Now the rest of the tables all share the same id column, and it exists in the big merged table as well.
In order to do that without repeating code for 9 tables, I create a shorter list from the previously ordered and cleaned list of tables, one that doesn't contain the tables I've joined so far. Here the tables I need are in positions 4 through 11, plus the 13th one.
Also, to not repeat myself while joining, and because I already have a copy of the id column with the suffix "_LOC0" in the merged table, I'm going to rename all the previously renamed idcol2 columns (the ones with a "_LOC" suffix) back to the name without the suffix, so I can put the joins in a loop. And because the tables are already ordered in descending fashion, I'm using "left" joins from one to the next.
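A sketch of that loop; the indices follow the positions described above, adjust them to your own list:

```python
# Tables still to join: positions 4 through 11, plus the 13th one
remaining_tables = clean_tables[4:12] + [clean_tables[13]]

for tbl in remaining_tables:
    # rename this table's suffixed idcol2 back to the plain name
    loc_col = [c for c in tbl.columns if c.startswith("idcol2_LOC")][0]
    tbl = tbl.withColumnRenamed(loc_col, "idcol2")
    # tables are ordered biggest/freshest first, so keep using left joins
    merged = merged.join(
        tbl, merged["idcol2_LOC0"] == tbl["idcol2"], how="left"
    ).drop("idcol2")
```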
Joining cleanup
To see the leftover id columns: print(sorted([item for item in merged.columns if "_LOC" in item]))
We only need one copy of each, so delete the rest manually: merged = merged.drop("idcol3_LOC9", "idcol2_LOC7")
Undo the id column renaming, now that there are no duplicated columns anywhere.
To get rid of the "_LOC" suffix:
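A sketch of removing the suffix:

```python
# Strip the "_LOC" suffix from the surviving id columns, restoring the
# original names now that each one appears only once in the merged table.
for c in merged.columns:
    if "_LOC" in c:
        merged = merged.withColumnRenamed(c, c.split("_LOC")[0])
```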
Explorations before joining
Seeing ID columns in each table
Like I said, I had more than 2 id columns across all the tables (more than 10, actually), and it became useful to see which tables have which id columns. The snippet below reveals that in a Pandas dataframe.
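A sketch of that snippet, using the tables_list / tables_list_names pairs and known_id_columns from earlier:

```python
import pandas as pd

# 0/1 matrix: one row per table, one column per known id column
id_df = pd.DataFrame(
    [[int(c in tbl.columns) for c in known_id_columns] for tbl in tables_list],
    index=tables_list_names,
    columns=known_id_columns,
)
```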
You'll have something like this, one row per table and one 0/1 column per id column:

|        | idcol1 | idcol2 | idcol3 |
| ------ | ------ | ------ | ------ |
| table1 | 0      | 1      | 1      |
| table2 | 1      | 1      | 0      |
| ...    |        |        |        |
Then you can locate which tables have a particular id column easily, id_df[id_df["idcol1"]==1].astype('int32')
Columns overlap between tables
When you merge tables from a SQL database, some of these tables might have more than just the id columns in common. This duplication of columns will cause you a major headache after joining, because Spark will not distinguish between the same column name across tables: trying to select the column will throw an "ambiguous" error, and dropping the duplicated column will drop all of them and you won't have it anymore. In the code above, I showed you how to tackle this by de-duplicating the common columns between tables and renaming the id columns that you need to join on. Here, I show a more or less exploratory step to see how many copies of each column you have across the tables. This might not be very useful if you have too many columns, because you'll end up with too big a dataframe to visually scan, but you can still do more statistics on it and get an idea.
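A sketch of that exploration, building a table-by-column presence matrix from the Spark schemas (cheap, since it only reads column names):

```python
import pandas as pd

# Every column name appearing in any table, and a 0/1 flag per table
all_columns = sorted({c for tbl in tables_list for c in tbl.columns})
overlap_df = pd.DataFrame(
    [[int(c in tbl.columns) for c in all_columns] for tbl in tables_list],
    index=tables_list_names,
    columns=all_columns,
)
```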
And if it's too big, here's the aggregated version, to see which columns occur more than once:
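Something along these lines, reusing the overlap_df sketch above:

```python
# Total occurrences of each column across all tables; keep the ones that
# show up more than once.
column_counts = overlap_df.sum(axis=0)
duplicated_cols = column_counts[column_counts > 1].sort_values(ascending=False)
```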
Don't forget you can wrap a dataframe with display() in Databricks to get a downloadable CSV on your local machine, if you want to share it with stakeholders or do some Excel work with it.
This will show you a table like this, one row per table and one 0/1 flag per column name (headers omitted here):

|        |  … |  … |  … |  … |
| ------ | -- | -- | -- | -- |
| table1 | 0  | 0  | 1  | 1  |
| table2 | 1  | 0  | 0  | 1  |
| ...    |    |    |    |    |
Testing refresh rate
This is done by simply checking the last available timestamp and cross-referencing it with the date you collected it on. Repeat that daily, weekly, and monthly to verify the refresh rate, and/or to verify your data stewards'/engineers' word.
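A sketch, using the timestamp column defined in the globals near the top (TIMESTAMP_COL in my sketch):

```python
# Latest timestamp per table; compare it against the date you pulled the data.
for name, tbl in zip(tables_list_names, tables_list):
    latest = tbl.agg(F.max(TIMESTAMP_COL)).collect()[0][0]
    print(name, latest)
```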
This will have a printout like this:

table1  2022-02-27 05:28:05
...
Show data range, via oldest and newest record
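A sketch of this step; splitting the tables into two batches (dates_results1 and dates_results2) is an assumption on my part, to keep each heavy aggregation small:

```python
def date_range(tables, names):
    """Oldest and newest timestamp per table, as {name: (min, max)}."""
    return {
        name: tuple(tbl.agg(F.min(TIMESTAMP_COL), F.max(TIMESTAMP_COL)).collect()[0])
        for name, tbl in zip(names, tables)
    }

# Two batches to avoid one very heavy cell
dates_results1 = date_range(tables_list[:7], tables_list_names[:7])
dates_results2 = date_range(tables_list[7:], tables_list_names[7:])
```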
dates_results1 will look something like this:

|        | oldest record       | newest record       |
| ------ | ------------------- | ------------------- |
| table1 | 2018-09-19 15:36:26 | 2022-02-27 05:28:05 |
| table2 | 2022-02-25 23:35:24 | 2022-02-26 02:04:45 |
| ...    | ...                 | ...                 |
Similarly for dates_results2
Samples of dataframe to test merging
Joining tables is one of the more expensive operations in Spark, so you might want to take a small sample of the tables you're trying to join, to test on and check the data, until you're confident in the direction of your joins and everything else. To do so, I loop through all the tables and take a random 100 rows from each.
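A sketch of the sampling loop; orderBy(rand()) forces a shuffle, so a fraction-based .sample() is cheaper if approximate sizes are fine:

```python
# Roughly 100 random rows per table, for cheap test joins
sample_tables = [
    tbl.orderBy(F.rand(seed=RANDOM_SEED)).limit(100) for tbl in tables_list
]
```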