Estimating, Partitioning, Writing/Saving a DataFrame

Remember that what works on Datasets mostly works on DataFrames too. In fact, there's no separate DataFrame page in the docs, because a DataFrame is just a Dataset[Row]; when I need more details about a method that works on DataFrames, I refer to "org.apache.spark.sql.Dataset".

Estimating the Used Java Heap Space

import org.apache.spark.util.SizeEstimator.estimate

estimate(df)

Note that this won't give you the size of the data in the DataFrame you're working with, or of the result after all the filtering and trimming. The official Spark Scala API docs say about SizeEstimator.estimate (search for "org.apache.spark.util.SizeEstimator"): "Estimates the number of bytes that the given object takes up on the JVM heap. The estimate includes space taken up by objects referenced by the given object, their references, and so on and so forth. This is useful for determining the amount of heap space a broadcast variable will occupy on each executor or the amount of space each object will take when caching objects in deserialized form. This is not the same as the serialized size of the object, which will typically be much smaller."
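If what you want is a rough idea of how big the data itself is, one possible approach (a minimal sketch, assuming Spark 2.3+ and that collecting a small sample is acceptable; the sample size below is an arbitrary placeholder) is to measure a collected sample of rows and scale up, or to read Catalyst's own size statistic:

import org.apache.spark.util.SizeEstimator.estimate

// Heap size of the DataFrame object itself (its plan), not of the data
val planBytes = estimate(df)

// Rough data estimate: measure a collected sample of rows and scale by the row count
val sampleSize = 1000
val sampleBytes = estimate(df.take(sampleSize))
val approxDataBytes = sampleBytes.toDouble / sampleSize * df.count()

// Catalyst's own size-in-bytes statistic for the optimized plan (Spark 2.3+)
val statBytes = df.queryExecution.optimizedPlan.stats.sizeInBytes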

Partitioning and Repartitioning a Data Frame Before Saving It

How many partitions you end up with depends on which column you use to partition your data frame; if you partition by time, for example, Spark will produce very many small partitions unless you specify the number of partitions you need.

  • coalesce allows you to lower the number of partitions of the DataFrame. I showed where it was needed in the forward-filling example, for instance.

  • repartition allows you to define the number of partitions you want.

  • Other related functions: sortWithinPartitions, foreachPartition, mapPartitions, and repartitionByRange (see the sketch after this list).

  • Go to "org.apache.spark.sql.Dataset" in the official Spark Scala API docs, search the page for "partition", and read all the partitioning-related functions and their details.

  • See also "org.apache.spark.sql.DataFrameWriter" for details about mode and partitionBy, among other options.
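To make the differences concrete, here is a minimal sketch of these calls, assuming a hypothetical DataFrame df with a timestamp column named ts (the column name and partition counts are placeholders; repartitionByRange needs Spark 2.3 or later):

import org.apache.spark.sql.functions.col

// coalesce: merge existing partitions down to 8 without a full shuffle
val fewer = df.coalesce(8)

// repartition: full shuffle into exactly 200 partitions
val evenly = df.repartition(200)

// repartition by a column: rows with the same ts land in the same partition
val byColumn = df.repartition(200, col("ts"))

// repartitionByRange + sortWithinPartitions: range-partition by ts, then sort inside each partition
val ranged = df.repartitionByRange(200, col("ts")).sortWithinPartitions(col("ts"))

// check how many partitions you ended up with
ranged.rdd.getNumPartitions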

Saving/Writing a DataFrame Example

Saving As Parquet

df.repartition(500)
  .write
  .mode("overwrite")
  .parquet(s"$lake_path/workspace/Haya/testing_0")

Saving As CSV

df.write
  .format("csv")
  .option("header", true)
  .save(s"$lake_path/workspace/Haya/testing_0")

This will still save a folder of CSV part files, similar to Parquet. If your data frame is small enough to fit in one partition, you can bring it down to a single partition with coalesce(1), so that the folder you just saved to contains one CSV (or Parquet) file. Thus, the full save to CSV becomes:

df.coalesce(1)
  .write
  .format("csv")
  .option("header", true)
  .save(s"$lake_path/workspace/Haya/testing_0")

There are other options to control partitioning, overwriting, and so on. You can read about them in the official Spark Scala API docs under "org.apache.spark.sql.Dataset" and "org.apache.spark.sql.DataFrameWriter", typed into the search box.
