Array Columns

All about array columns: creating them, summing up their elements, filtering on them, etc.

Find the Size of an Array Column

df.select(size($"colname")) gives you the number of elements in each row's array. You can filter on it too: df.filter(size($"colname") >= 2) returns the rows whose array column "colname" contains two or more elements.
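A minimal sketch putting both together (the dataframe df and the column name "colname" are placeholders, and spark.implicits._ is assumed to be imported):

import org.apache.spark.sql.functions.size

// add the element count as its own column
val withSize = df.select($"colname", size($"colname").alias("numElements"))

// keep only the rows whose array has at least two elements
val atLeastTwo = df.filter(size($"colname") >= 2)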

Creating an Array Column

You will need to create an array column for the X's (features) you want to feed into a machine learning algorithm in Spark; there are methods in the ML package to do that, which I cover in the Machine Learning section. Here I show you how to create an array column for general purposes, whatever that purpose might be; it comes in handy sometimes.

withColumn("arrCol", array($"col1", $"col2",...)) Source

Parsing an Array-of-Array Column in a Data Frame

Sometimes, when you have very compressed dataframes, you might end up with a column that is an array of arrays, like [[1,0,2,1,5]] or [[val1, 12, 3],[val2, 567]]. We use the function explode to strip the outer array and expose the interior one. Credit: http://bigdatums.net/2016/02/12/how-to-extract-nested-json-data-in-spark/

A couple of ways to know you have an array-of-array column are:

  • you see [[]] where there should be [] , and

  • when filtering to see the null rows, you get [] instead of null .

printSchema will also reveal it by showing two nested levels of array elements, as in the quick check below.
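A quick check, assuming the suspect column is named cl:

df.printSchema()
// root
//  |-- cl: array (nullable = true)
//  |    |-- element: array (containsNull = true)       <-- the inner "element: array" line is the giveaway
//  |    |    |-- element: integer (containsNull = true)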

☞ NOTE: This function creates a new row for every element in the outer array. In my case, I had only one element, which is the interior array [[1,2,3,4,5,6]], so using it once got rid of the outer array. It will NOT extract each element of the array into a new stand-alone column.

If you're looking to extract each element of the array into a column of its own, added to the data frame, do it manually by picking that element with direct parentheses, e.g. (2) for the third element, or by using .getItem(2), as sketched below. I couldn't find another way other than manually accessing each element of the array and adding it using a withColumn clause.
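A sketch of that manual extraction, assuming an array column arrCol with at least three elements per row:

val withElems = df.
  withColumn("firstElem", $"arrCol"(0)).           // direct parentheses
  withColumn("thirdElem", $"arrCol".getItem(2))    // the same idea via getItem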

How I used it

val df1 = df0.
  select($"msg.DeviceId",                                          // a second-level nested column
         $"bod.dGrps.tStamp"(0).alias("tStamp"),                   // first element of a nested array, aliased
         $"bod.dGrps.data.dispOutTemp.temp"(0).alias("dispOutTemp")).
  withColumn("cl", explode($"cl"))                                 // unravel the array-of-array column "cl"

Here I showed a few things:

  • how to select second level nested column,

  • how to access the first element of multi-level nested arrays, and alias it so that the resulting column name is what I want, and

  • how to explode array-of-array columns, and add the result to the dataframe using a withColumn clause.

You can also use explode as a function in a select clause, DF.select(col("x"), explode(col("y"))), which will show two columns: x, and the unraveled y. A toy end-to-end example follows.
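Putting the pieces together on a toy dataframe (the data and the column names x and y are made up for illustration; spark.implicits._ is assumed to be imported):

import org.apache.spark.sql.functions.explode

val nested = Seq(
  ("A", Seq(Seq(1, 0, 2, 1, 5))),
  ("B", Seq(Seq(3, 4), Seq(6, 7)))
).toDF("x", "y")                                   // y is array<array<int>>

val flattened = nested.select($"x", explode($"y").alias("y"))
// row A -> one row with y = [1, 0, 2, 1, 5]
// row B -> two rows, with y = [3, 4] and y = [6, 7]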

Sum Up All Elements of an Array Column, row-wise

df.
  selectExpr("*",
    "aggregate(col1, BIGINT(0), (x, y) -> x + y) as summedCol1",
    "aggregate(col2, BIGINT(0), (x, y) -> x + y) as summedCol2",
    "aggregate(col3, BIGINT(0), (x, y) -> x + y) as summedCol3")

I got the idea from here. "*" gets all the other columns in the dataframe for you, just as you're used to in SQL when selecting all columns. The other expressions in the selectExpr clause are the new columns that hold the sum of all elements of the array columns col1, col2, and col3. BIGINT(0) is the initial value; my columns here were all LongType, that is, long integers, so I needed the initial value, zero, to be of the same type. Remember that Spark Scala is stringent about types. The rest of the syntax, (x,y) -> x+y, stays the same whether you have 2 or 10 elements in your array columns, even if the number of elements varies between one row and the next. Neat, isn't it?

Explanation: the syntax is aggregate(colname, initial_value, lambda_function). The initial value should be of the same type as the values you sum, so you may need "0.0" or "DOUBLE(0)" etc. if your inputs are not integers, as in the sketch below. The higher order functions tutorial has a lot of other important information that I couldn't find anywhere else, and I Googled for 3 days straight. It also has some good information about the kinds of lambda functions one can use, and how to chain them together. Worth noting that aggregate in SQL as used here is an alias for reduce (as I learned on that website), so look in the reduce section there. Find here a longer answer.
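For instance, if the arrays held doubles rather than longs, the same pattern would need a DOUBLE(0) initial value (a sketch with a made-up column named scores):

df.selectExpr("*",
  "aggregate(scores, DOUBLE(0), (acc, x) -> acc + x) as summedScores")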

Element-Wise Operations on Array Columns

You can use the SQL context with selectExpr like we did above, which is great. I personally am not familiar enough with this technique to write good code with it myself. Instead, I found that I can use UDFs to do element-wise operations on any ArrayType column. I didn't test that in Spark Scala myself, only in PySpark; for details, explanation, and examples, refer to the PySpark page group, then its UDF page. A rough Scala sketch is below.
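A rough sketch of the UDF route in Spark Scala, assuming a LongType array column named col1 (the column name and the operation are made up for illustration):

import org.apache.spark.sql.functions.udf

// multiply every element of the array by 2, element-wise
// (null arrays would need extra handling)
val timesTwo = udf((xs: Seq[Long]) => xs.map(_ * 2))

val withDoubled = df.withColumn("col1Times2", timesTwo($"col1"))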

Other Functions Allowed on Array Columns' Elements

You can do all sorts of stuff, like getting the index of an element meeting a certain condition, or removing an element, etc. All other possible functions on array columns, and what they do, are found here and in the official Spark Scala docs under "org.apache.spark.sql.functions"; e.g., find ( ⌘ + F ) "index", "position", or "remove" in the page to locate the related functions. Two of them are sketched below.
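For instance, two of the functions such a search turns up, array_position and array_remove (available since Spark 2.4), applied to a hypothetical array column arrCol:

import org.apache.spark.sql.functions.{array_position, array_remove}

df.select(
  array_position($"arrCol", 5).alias("posOfFive"),    // 1-based index of the first 5; 0 if absent
  array_remove($"arrCol", 5).alias("withoutFives"))   // drops every element equal to 5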

EXTRA: explode Function

Source. If you have an array column, this function repeats rows so that there is one array element per row.

Example

inventory.withColumn("newCol", explode('emp_details)).show(false) gives a new dataframe with one element of emp_details per row.

Thus, each row is repeated into as many rows as there are elements in that row's array in the exploded column. This is useful if we want to group by these elements, as in the sketch below.
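For example, a count per element after exploding (a sketch reusing the inventory / emp_details names from above):

val counts = inventory.
  withColumn("emp", explode('emp_details)).
  groupBy("emp").
  count()                                    // one row per distinct element, with its frequency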
