TechnicalWriting

Scott Ralph's Personal Blog about Scala, Spark, Big Data, ML, and AI



Sept. 5, 2019, Using Fold-Left with Case Class to Construct DataFrames

Context of the problem

I often find myself with a DataFrame consisting of a large number of features, each of which needs to be sanitized before analysis. For example, we may want to take a string column and map both null values and empty strings to the same value (this is just a simple illustrative example).

Let's assume that there are a number of feature columns:

val featureCoumnNames = Set("first-name", "last-name", "city-name")

and that these are columns in the DataFrame:

scala> df.printSchema
root
 |-- first-name: string (nullable = true)
 |-- last-name: string (nullable = true)
 |-- city-name: string (nullable = true)
 .
 .
 . (others)

We want to sanitize each of these string columns by replacing the value of all null and empty entries with the token “nil”.

The Spark SQL user-defined function (UDF) is straightforward:

import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions._

// Token that replaces null and empty values
val nullName = "nil"

val mapNullAndEmptyFunc: String => String = (x: String) =>
  if (x == null || x.isEmpty) nullName
  else x

// The typed udf overload infers the return type from the function signature
val mapNullAndEmptyUdf : UserDefinedFunction =
  udf(mapNullAndEmptyFunc)
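The null-or-empty check itself can be tried outside of Spark. The following is a minimal sketch in plain Scala, assuming the same "nil" token; the object name NullOrEmptySketch and the Option-based formulation are illustrative choices, not part of the original code.

```scala
object NullOrEmptySketch {
  // Same replacement token as in the post
  val nullName = "nil"

  // Option(x) is None when x is null; filter(_.nonEmpty) also drops "".
  def mapNullAndEmpty(x: String): String =
    Option(x).filter(_.nonEmpty).getOrElse(nullName)
}

// Usage:
// NullOrEmptySketch.mapNullAndEmpty(null)   // "nil"
// NullOrEmptySketch.mapNullAndEmpty("")     // "nil"
// NullOrEmptySketch.mapNullAndEmpty("Ann")  // "Ann"
```

This behaves like mapNullAndEmptyFunc for every input; which form reads better is a matter of taste.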

To keep things clear, we want to keep track of the column names before and after the transformation. Two simple case classes serve this purpose:

import org.apache.spark.sql.DataFrame

case class ColumnMapping(from : String, to: String)
case class SanitizedColumnResult(dataFrame : DataFrame,
                                 mappings : Seq[ColumnMapping])

The dataFrame field of SanitizedColumnResult holds the transformed DataFrame, and the mappings field records each original column name together with the name of its sanitized counterpart.

One way to do the mapping is to keep the DataFrame in a mutable variable and reassign it for each column:

var dfNew = df
featureCoumnNames.foreach(f => {
  dfNew = dfNew.withColumn("some-new-name", ___)
})

But this seems ugly. A cleaner way is to add a method to the SanitizedColumnResult case class that takes care of adding the columns and keeping track of the mapping.

import org.apache.spark.sql.{Column, DataFrame}

val sanitizedSuffix = "_san"

case class SanitizedColumnResult(dataFrame : DataFrame,
                                 mappings : Seq[ColumnMapping]) {
  // Adds a sanitized copy of `col` and records the name mapping
  def apply(col : String) : SanitizedColumnResult = {
    val sanName = col + sanitizedSuffix
    val srcCol : Column = dataFrame(col)
    val newDataFrame = dataFrame.withColumn(sanName,
      mapNullAndEmptyUdf(srcCol))
    SanitizedColumnResult(newDataFrame, mappings :+
      ColumnMapping(col, sanName))
  }
}

The apply method takes a column name and produces a new SanitizedColumnResult containing a new DataFrame and an additional column mapping. This allows us to make a very clean call using the sequence foldLeft() method:

def sanitize(dataFrame : DataFrame,
             columns : Seq[String]) : SanitizedColumnResult = {
  // Start from the untouched DataFrame with no mappings recorded
  val initial = SanitizedColumnResult(dataFrame, Seq.empty[ColumnMapping])
  columns.foldLeft(initial)((acc, c) => acc.apply(c))
}
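To see what the fold produces without a Spark session, here is a small self-contained sketch. It assumes a hypothetical stand-in for a DataFrame (a Map from column name to column values) and a simplified SanitizedResult class; neither is Spark API, and the sample data is made up for illustration.

```scala
object FoldSketch {
  // Hypothetical stand-in for a DataFrame: column name -> column values
  type Table = Map[String, Seq[String]]

  val nullName = "nil"
  val sanitizedSuffix = "_san"

  case class ColumnMapping(from: String, to: String)

  case class SanitizedResult(table: Table, mappings: Seq[ColumnMapping]) {
    // Adds a sanitized copy of `col` and records the name mapping
    def apply(col: String): SanitizedResult = {
      val sanName = col + sanitizedSuffix
      val cleaned = table(col).map(v => if (v == null || v.isEmpty) nullName else v)
      SanitizedResult(table + (sanName -> cleaned),
        mappings :+ ColumnMapping(col, sanName))
    }
  }

  def sanitize(table: Table, columns: Seq[String]): SanitizedResult =
    columns.foldLeft(SanitizedResult(table, Seq.empty[ColumnMapping]))((acc, c) => acc(c))

  // Made-up sample data
  val input: Table = Map(
    "first-name" -> Seq("Ann", null),
    "city-name"  -> Seq("", "Oslo")
  )

  val result = sanitize(input, Seq("first-name", "city-name"))
  // result.table("first-name_san") == Seq("Ann", "nil")
  // result.table("city-name_san")  == Seq("nil", "Oslo")
  // result.mappings == Seq(ColumnMapping("first-name", "first-name_san"),
  //                        ColumnMapping("city-name", "city-name_san"))
}
```

Each step of the fold returns a new immutable result, so no var is needed and the original table is left untouched.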

Or, to describe the solution in a more generic way:

  case class Accumulator(/* helpful fields here */) {
    // Folds one element into the state and returns the updated accumulator
    def apply(x : Element) : Accumulator = ???
  }
  val initial : Accumulator = someExpression
  val xs : Seq[Element] = ??? /* The elements to process */
  val result = xs.foldLeft(initial)((acc, x) => acc(x))
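As a concrete instance of this generic shape, the sketch below folds a sequence of numbers into running statistics. The Stats class and the sample values are hypothetical and chosen only to illustrate the pattern.

```scala
object AccumulatorSketch {
  // Hypothetical accumulator: count, sum, and maximum of the values seen so far
  case class Stats(count: Int, sum: Double, max: Option[Double]) {
    // Folds one value into the state and returns a new Stats
    def apply(x: Double): Stats =
      Stats(count + 1, sum + x, Some(max.fold(x)(m => math.max(m, x))))

    // Mean is undefined until at least one value has been seen
    def mean: Option[Double] = if (count == 0) None else Some(sum / count)
  }

  val initial = Stats(0, 0.0, None)
  val xs = Seq(2.0, 4.0, 9.0)
  val result = xs.foldLeft(initial)((acc, x) => acc(x))
  // result      == Stats(3, 15.0, Some(9.0))
  // result.mean == Some(5.0)
}
```

Note that Stats(0, 0.0, None) calls the companion object's apply (the constructor), while acc(x) calls the instance method defined in the class body.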

A nice feature of this approach is that if we want to perform different kinds of mapping, for example based on the column type, we can add that complexity to the apply method of the case class while the foldLeft call stays the same.
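A rough sketch of that idea in plain Scala follows. The Kind hierarchy, the Plan accumulator, the "age" column, and the number rule are all hypothetical; in Spark the kind would presumably be derived from the DataFrame schema, and each branch would add a column instead of a description.

```scala
object TypedMappingSketch {
  // Hypothetical column kinds standing in for Spark data types
  sealed trait Kind
  case object TextCol extends Kind
  case object NumberCol extends Kind

  case class Plan(steps: Seq[String]) {
    // Only this method changes when a new kind of mapping is introduced
    def apply(col: (String, Kind)): Plan = col match {
      case (name, TextCol)   => Plan(steps :+ s"$name: replace null/empty with nil")
      case (name, NumberCol) => Plan(steps :+ s"$name: replace null with 0")
    }
  }

  val columns = Seq("first-name" -> TextCol, "age" -> NumberCol)

  // The fold is identical in shape to the one used for sanitizing strings
  val plan = columns.foldLeft(Plan(Seq.empty))((acc, c) => acc(c))
  // plan.steps == Seq("first-name: replace null/empty with nil",
  //                   "age: replace null with 0")
}
```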

Other important advantages of using the fold-left and case class are:
