Schema: Extracting, Reading, Writing to a Text File

Saving and Loading the Schema of a DataFrame via a .txt File

We won't get into how to create a schema for a data frame on this page; I leave that to the "Creating a Data Frame" page.

Extracting Schema of a Data Frame

A schema is the structure of a data frame, i.e. its column names, types, and nullability. It can get pretty complicated if you have nested columns, that is nested JSONs, which are common in parquet files. In my case, in order to reach a variable I wanted, I sometimes had to go through 7 layers of nesting.
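
Just to make "nested" concrete, here is a minimal sketch of what such a schema looks like as a StructType; the field names (eventId, event, device, temperature) are made up for illustration, not taken from any real data,

import org.apache.spark.sql.types._

// a made-up nested schema: a "device" struct inside an "event" struct
val exampleSchema = StructType(Seq(
  StructField("eventId", StringType),
  StructField("event", StructType(Seq(
    StructField("timestamp", LongType),
    StructField("device", StructType(Seq(
      StructField("id", StringType),
      StructField("temperature", DoubleType)
    )))
  )))
))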

By the way, if you have Big Data, you don't need the whole thing to get the schema; a small sample will do. Parquet seems really good at preserving and remembering its schema, even for a sample where not all the wanted nested variables are present. In my case I was dealing with event data, meaning a value only comes in for a variable when it changes; so if nothing changes in the variable we're measuring for a while, it won't appear in the data during that window. Parquet still remembers the schema nonetheless, which is great.
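
As a rough sketch of that idea (the column name and the sample size are placeholders), you could infer the schema from a limited sample of the JSON column instead of collecting the whole thing,

import spark.implicits._

// infer the schema from a small sample instead of the full column
val sample = df.select("wantedCol").as[String].limit(1000)
val sampledSchema = spark.read.json(sample).schema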

So how would you extract it from an existing data frame?

The main idea is that we need a StructType (it can also be kept as JSON) to apply to the column we want to unpack, using the from_json() function inside withColumn(). Omitting the steps of writing and reading text files, the process can be summarized like this,

import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{DataType, StructType}
import spark.implicits._

//1. read the target column as a List of JSON strings
val p1 = df.select("wantedCol").as[String].collect.toList

//2. turn it into an RDD, let Spark infer its schema, then convert the schema to JSON
val p2 = sc.parallelize(p1)
val p3 = spark.read.json(spark.createDataset(p2))
val p4 = p3.schema
val p5 = p4.json
println(s"Now p5 is ${p5.getClass.getName}")

//3. convert the JSON string to a StructType, so you can apply it
val p6 = DataType.fromJson(p5).asInstanceOf[StructType]
println(s"Now p6 is ${p6.getClass.getName}")

//4. apply it (also a checking step)
val testingDF = df.withColumn("newCol", from_json($"wantedCol", p6))
testingDF.printSchema

You should now see all the old columns of your data frame, plus the newly created column, which holds all the nested variables that you couldn't access directly before.
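
As a quick check (the nested field names here are placeholders, not from any actual data), you could try pulling one of those variables out of the new column,

// placeholder field names; replace with whatever your schema actually contains
testingDF.select($"newCol.device.temperature").show(5)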

The two main commands above come from this answer.

Writing Schema of a Data Frame

What you actually want to save at this step is the JSON string, named p5 in the code block above.

We will repeat what we did on the "Writing and Reading a Text File" page, but with no new-line character here,

import java.io._

// write the JSON schema string (p5) to a text file, with no trailing new line
val theSchema = new File("myPath/wantedColScalaSchema.txt")
val bw = new BufferedWriter(new FileWriter(theSchema))
bw.write(p5)
bw.close()

This source gives a more concise way to write it,

import java.io.PrintWriter
new PrintWriter("filename") { write("file contents"); close }

I haven't actually tried it myself, but it's there for you.
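
For our case it would presumably look like this (again, untested on my side; the path matches the one used above),

import java.io.PrintWriter

// one-liner version: write the schema JSON (p5) to the same path as above
new PrintWriter("myPath/wantedColScalaSchema.txt") { write(p5); close() }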

☞ NOTE: It's worth mentioning that sometimes you need to unpack the same column one more time, if it's too deeply nested and you still can't access the variable you want directly with colname.var. If you need to do that, repeat the process above, replacing wantedCol with the newCol from the step before, as sketched below. See the "Accessing Variables in Data Frames" page of this book.
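
Here is a rough sketch of that second pass, assuming the still-packed field inside newCol is itself a JSON string; I'm calling it payload here, which is a placeholder name,

import org.apache.spark.sql.functions.from_json
import spark.implicits._

// second pass: infer the inner schema from the still-packed field and unpack it too
val innerStrings = testingDF.select("newCol.payload").as[String]
val innerSchema = spark.read.json(innerStrings).schema
val deeperDF = testingDF.withColumn("newCol2", from_json($"newCol.payload", innerSchema))
deeperDF.printSchema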

Reading and Applying Schema to a Data Frame

Now you have a JSON schema saved as a .txt file. To read and apply it, you need to convert it to a StructType first,

import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.sql.functions.from_json
import spark.implicits._

//1. read the text file into Spark; since the file has no new line, the first
//   (and only) line is the whole (JSON) string
val file1 = sc.textFile("lake_path_to_string_schema.txt").collect()(0)

//2. convert the JSON string to a StructType
val theSchemaStruct = DataType.fromJson(file1).asInstanceOf[StructType]

//3. apply it
val newDF = df.withColumn("newCol", from_json($"wantedCol", theSchemaStruct))
newDF.printSchema

Now you have the blown-up column "newCol", and you can access whatever you like with "newCol.varName". See the "Accessing Variables in Data Frames" page of this book for more details.
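
For example (varName is a placeholder for whichever nested field you're after),

// "varName" is whatever nested field you want from newCol
newDF.select($"newCol.varName").show(10, truncate = false)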
