Typed Scalding Pipes

Quick Recap

A while back I described a Hadoop job that I implemented with Scalding (Distilling the Newest Record with Scalding). To recap, the goal was to take a huge list of “facts”, each containing a single timestamped fact about a large piece of system data. The goal is to get a recombined version of a domain object at a given time.

A Fact

{ "asserted_at": "2014-05-01T04:02:56Z",
  "subject": "device:123",
  "property": "serial_number",
  "value": "V29B044" }


The first version of this job I wrote used a built-in JSON parser. Turns out that’s an iffy approach, so I turned to the Argonaut library to parse my JSON into well structured Scala structs.

This code is almost literally off the Argonaut examples. I was really impressed at how easy this was.

import argonaut._
import Argonaut._

case class Fact(asserted_at: String, subject : String, property: String, value: Json)

object Fact {
  implicit def FactCodecJson : CodecJson[Fact] =
    casecodec4(Fact.apply, Fact.unapply)("asserted_at", "subject", "property", "value")

This allows me to take a string and call decodeOption on it to get a Option[Fact].


One of the things I really wanted to explore was splitting up the large job into an aggregate of lots of small jobs. The best way to do that of course is using functions.

Here’s an easy one to do the JSON parsing:

def parseJsonAsFact(pipe : TypedPipe[String]) : TypedPipe[Fact] = {
    .map    { _.decodeOption[Fact] }
    .filter { _.nonEmpty }
    .map    { _.orNull }

This takes a TypedPipe[String] and for each string, transforms it into a TypedPipe[Fact], or just throws away anything that didn’t parse.

Getting the input and parsing

Actually fetching input, and working with it to make output is easy. We assemble our small functions with Scala’s andThen combinator. This makes one large function that is named job, which we then run with the input.

val input_file  = args.getOrElse("input",  "/master-dataset")
val output_file = args.getOrElse("output", "/output")

// Everything is stored as a SequenceFile, where the key is the timestamp it was recorded.
val source   = WritableSequenceFile[DoubleWritable, Text](input_file, ('sequenceFileKey, 'factJSON))
val rawInput = TypedPipe.from(source)
val input    = rawInput.map { _._2.toString } // TypedPipe[String]

// This is a single column, so Tsv is misleading, no tabs will be output
val output = TypedTsv[String](output_file)

// Build up a large function that is our entire pipeline.
val job = parseJsonAsFact _ andThen
          ///// More steps here.


Finishing up the job

Here is an example of the whole pipeline I have written. It did take me a bit to figure out how filterByType could be parameterized by the thing I was filtering.

// TypedPipe[String] => TypedPipe[String], which is handily our input and output types.
val job = facts.parseJsonAsFact _                        andThen   //    TypedPipe[Fact]
          (facts.filterByType _).curried("observations") andThen   // => TypedPipe[Fact] (only observation related ones)
          facts.filterNewest _                           andThen   // => TypedPipe[Fact] (only the newest of any given subject/property)
          createMeasurementDate _                        andThen   // => TypedPipe[Fact] (new records with measurement_date in the stream)
          mergeObservations _                            andThen   // => TypedPipe[Observation] combine facts into observations
          renderAsJson _                                           // => TypedPipe[String] observations spun out as json

def filterByType(filter : String, pipe : TypedPipe[Fact]) : TypedPipe[Fact] = {
  pipe.filter { _.subject.startsWith(filter) }

The filtering is easy. But there’s a little trick in that the .groupBy call changes a TypedPipe into a Grouped, which has two type arguments - the “group key” and the type of the values that match that key.

Second note: the custom sorting of facts was a hurdle I had to get over, turned out to be easy - just define the sorting, and call it in a rather unintuitive way.

def filterNewest(pipe : TypedPipe[Fact]) : TypedPipe[Fact] = {
    .groupBy { fact : Fact => (fact.subject, fact.property) } // => Grouped[Fact, (String, String)]

object AssertedAtOrdering extends Ordering[Fact] {
  def compare(a:Fact, b:Fact) = a.asserted_at compare b.asserted_at

I won’t go into all the pieces of my whole pipeline, most of it isn’t all that interesting, but I do want to note that you can return more or less records from a pipe than came in. It doesn’t have to be a 1:1 tranformation.

For example, I needed both the date and the full datetime in my observation domain object. This function does that for me by splitting the pipe in two with filters, sidelining the uninteresting half (not_measurement_at_pipe), returning multiple values from the measurement_at_pipe. Finally the sidelined pipe can be merged back into the stream.

def createMeasurementDate(pipe : TypedPipe[Fact]) : TypedPipe[Fact] = {
  val measurement_at_pipe     = pipe.filter    { fact : Fact => fact.property == "measurement_at" }
  val not_measurement_at_pipe = pipe.filterNot { fact : Fact => fact.property == "measurement_at" }
  val converted_pipe = measurement_at_pipe
    .flatMap { fact : Fact =>
            fact.copy(property = "measurement_date",
                      value    = jString(fact.value.stringOr("0000-00-00").substring(0, "yyyy-mm-dd".length)))

  converted_pipe ++ not_measurement_at_pipe

Final Thoughts

I really like the Typed api for writing jobs. The Scala compiler informs you of errors (which is a much faster testing cycle than waiting for Hadoop to run a compiled jar and fail at some point. That makes it a 10 second response time, versus a 5 minute response time).

In addition, it’s so much easier to keep track of real classes and work on them, than trying to track sets of untyped, named fields.

So use the TypedApi, and let Scala do more of the work for you.