A while back I described a Hadoop job that I implemented with Scalding (Distilling the Newest Record with Scalding). To recap, the input was a huge list of “facts”, each a single timestamped statement about a piece of system data, and the goal was to reassemble them into a domain object as it existed at a given time.
The first version of this job used a built-in JSON parser. That turned out to be an iffy approach, so I switched to the Argonaut library to parse my JSON into well-structured Scala case classes.
This code is almost literally off the Argonaut examples. I was really impressed at how easy this was.
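A codec built along the lines of the Argonaut examples looks roughly like this. The `Fact` fields here are illustrative guesses, not the post's real schema:

```scala
import argonaut._, Argonaut._

// Stand-in shape for a fact; the real fields aren't shown in the post.
case class Fact(id: String, attribute: String, value: String, timestamp: Long)

object Fact {
  // casecodec4 derives a JSON codec from the case class constructor,
  // mapping each field to the given JSON key names.
  implicit def FactCodecJson: CodecJson[Fact] =
    casecodec4(Fact.apply, Fact.unapply)("id", "attribute", "value", "timestamp")
}
```

With the implicit codec in scope, Argonaut's `decodeOption` extension on `String` does the rest.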
This allows me to take a string and call decodeOption on it to get an Option[Fact].
One of the things I really wanted to explore was splitting the large job into an aggregate of many small jobs. The best way to do that, of course, is with functions.
Here’s an easy one to do the JSON parsing:
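A sketch of that parsing function, assuming an Argonaut codec for `Fact` is in scope (the names here are mine, not necessarily the post's):

```scala
import com.twitter.scalding.TypedPipe
import argonaut._, Argonaut._

case class Fact(id: String, attribute: String, value: String, timestamp: Long)
object Fact {
  implicit def FactCodecJson: CodecJson[Fact] =
    casecodec4(Fact.apply, Fact.unapply)("id", "attribute", "value", "timestamp")
}

// flatMap over the Option: decodable lines become Facts,
// undecodable lines simply vanish from the pipe.
def parseJson(pipe: TypedPipe[String]): TypedPipe[Fact] =
  pipe.flatMap(line => line.decodeOption[Fact])
```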
This takes a TypedPipe[String], transforms each string into a Fact, and throws away anything that didn’t parse, yielding a TypedPipe[Fact].
Getting the input and parsing
Actually fetching input and working with it to make output is easy. We assemble our small functions with Scala’s andThen combinator, which produces one large function named job; we then run that function on the input.
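The shape of that composition looks something like the sketch below. The stage names and types are hypothetical stand-ins (bodies stubbed with `???`); the point is just the `andThen` mechanics:

```scala
import com.twitter.scalding.TypedPipe

case class Fact(id: String, attribute: String, value: String)  // stand-in shape
case class Observation(id: String, json: String)               // stand-in output

// Each small job is an ordinary function from pipe to pipe.
val parseJson:      TypedPipe[String] => TypedPipe[Fact]        = ???
val newestOnly:     TypedPipe[Fact]   => TypedPipe[Fact]        = ???
val toObservations: TypedPipe[Fact]   => TypedPipe[Observation] = ???

// andThen glues the stages into one large function, applied left to right.
val job: TypedPipe[String] => TypedPipe[Observation] =
  parseJson andThen newestOnly andThen toObservations

// Running the whole pipeline is then just function application:
//   val output = job(inputPipe)
```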
Finishing up the job
Here is an example of the whole pipeline I have written. It did take me a bit to figure out how filterByType could be parameterized by the thing I was filtering.
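One way to parameterize such a filter, assuming facts carry some kind of type tag (the field and stage names here are my invention):

```scala
import com.twitter.scalding.TypedPipe

case class Fact(kind: String, id: String, value: String)  // stand-in shape

// Curried: first the thing to filter on, then the pipe.
def filterByType(kind: String)(pipe: TypedPipe[Fact]): TypedPipe[Fact] =
  pipe.filter(_.kind == kind)

// Partially applying the kind yields a pipe-to-pipe function,
// which slots into an andThen chain like any other stage.
val onlyMeasurements: TypedPipe[Fact] => TypedPipe[Fact] =
  filterByType("measurement")
```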
The filtering is easy. But there’s a little trick: grouping turns the TypedPipe into a Grouped, which has two type arguments – the “group key” and the type of the values that match that key.
Second note: the custom sorting of facts was a hurdle I had to get over, but it turned out to be easy – just define the ordering, then call it in a rather unintuitive way.
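A sketch of what that might look like with Scalding's typed API, using `sortedTake` with an explicitly passed `Ordering` (the field names are assumptions; passing the ordering as an explicit second argument list is the "unintuitive" part):

```scala
import com.twitter.scalding.TypedPipe

case class Fact(id: String, timestamp: Long, value: String)  // stand-in shape

// Reversed ordering by timestamp: the "smallest" fact is the newest one.
val newestFirst: Ordering[Fact] = Ordering.by[Fact, Long](_.timestamp).reverse

def newestPerId(pipe: TypedPipe[Fact]): TypedPipe[Fact] =
  pipe
    .groupBy(_.id)                 // Grouped[String, Fact]
    .sortedTake(1)(newestFirst)    // per key, the single smallest under newestFirst
    .toTypedPipe                   // TypedPipe[(String, Seq[Fact])]
    .flatMap { case (_, facts) => facts }
```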
I won’t go into all the pieces of my pipeline – most of it isn’t all that interesting – but I do want to note that a pipe can return more or fewer records than came in. It doesn’t have to be a 1:1 transformation.
For example, I needed both the date and the full datetime in my observation domain object. This function does that for me by splitting the pipe in two with filters, sidelining the uninteresting half, and returning multiple values from the measurement_at_pipe. Finally the sidelined pipe is merged back into the stream.
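The split-duplicate-merge shape described above could be sketched like this. Everything here is illustrative – the attribute names, the assumption that values are ISO-8601 datetime strings, and the function name are mine:

```scala
import com.twitter.scalding.TypedPipe

case class Fact(id: String, attribute: String, value: String)  // stand-in shape

def expandDates(pipe: TypedPipe[Fact]): TypedPipe[Fact] = {
  // Sideline the facts we don't need to touch.
  val others = pipe.filter(_.attribute != "measurement_at")

  // From each measurement_at fact, emit two facts: the full datetime
  // and a date-only variant (first 10 chars of an ISO-8601 string).
  val expanded = pipe
    .filter(_.attribute == "measurement_at")
    .flatMap { f =>
      List(
        f,
        f.copy(attribute = "measurement_date", value = f.value.take(10))
      )
    }

  // ++ merges the sidelined pipe back into the stream.
  others ++ expanded
}
```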
I really like the Typed API for writing jobs. The Scala compiler informs you of errors, which is a much faster testing cycle than waiting for Hadoop to run a compiled jar and fail at some point: roughly a ten-second response time versus a five-minute one.
In addition, it’s so much easier to keep track of real classes and work on them than to try to track sets of untyped, named fields.
So use the Typed API, and let Scala do more of the work for you.