Difference between revisions of "Guinea Pig"

From Cohen Courses
Jump to navigationJump to search
Line 1,027: Line 1,027:
 
reused from planning episode to planning episode, unless you ask them
 
reused from planning episode to planning episode, unless you ask them
 
to be with the --reuse option, below.
 
to be with the --reuse option, below.
 +
 +
'''Hadoop options:''' Again in Hadoop mode, setting the options <code>hopts=foo</code> or <code>hdefs=foo</code> will pass options in to the Hadoop commands used to build this view.
  
 
=== Reusing previously computed views ===
 
=== Reusing previously computed views ===

Revision as of 11:34, 28 October 2015

Quick Start

What you should know before you start

Guinea Pig is similar to Pig, but intended to be easier to learn and debug. It's probably a good idea to be somewhat familiar with Pig before you start this tutorial.

I also assume a reading knowledge of Python and core Unix commands. If you're going to use Hadoop, you should also be somewhat familiar with (at least) the Hadoop file system and the hadoop fs subcommands.

Aside: versions

The most default, and most, recent version of GuineaPig is imported when you use import guineapig. Most of the changes between 1.1 and 1.3 are not API changes, so they should not affect the tutorial. If you want to use a particular version of Guinea Pig, then import it explicitly.

Installation and Running wordcount.py

Download the tutorial tarball and unpack it. As an alternative: you can also get the code by cloning the git repository https://github.com/TeamCohen/GuineaPig.git and run it by simply putting the directory that contains the guineapig.py file on your PYTHONPATH.

The tutorial contains a contains a few small corpora and the file wordcount.py which has this code:

from guineapig import *
import sys

# supporting routines can go here                                                                                                         
def tokens(line):
    for tok in line.split():
        yield tok.lower()

#always subclass Planner                                                                                                                  
class WordCount(Planner):

    wc = ReadLines('corpus.txt') | Flatten(by=tokens) | Group(by=lambda x:x, reducingTo=ReduceToCount())

# always end like this                                                                                                                    
if __name__ == "__main__":
    WordCount().main(sys.argv)

Then type the command:

% python wordcount.py --store wc

After a couple of seconds it will return, and you can see the wordcounts with

% head gpig_views/wc.gp

Understanding the wordcount example

A longer example

In the example above, wc is a GuineaPig view. I use the term "view" for both (1) a procedural definition of how to process data, using a particular set of operators and (2) data produced by a view. The rest of this tutorial will help you understand views and how to write them. To start, the tutorial directory also has a less concise but easier-to-explain wordcount file, longer-wordcount.py, with this view definition:

class WordCount(Planner):
    lines = ReadLines('corpus.txt')
    words = Flatten(lines,by=tokens)
    wordCount = Group(words, by=lambda x:x, reducingTo=ReduceToCount())

You see this is actually defined in three phases:

  1. read the lines from a file,
  2. convert the stream of lines into a stream of tokens, and
  3. one to count the tokens.

I'll use this example to explain some of the key ideas in GuineaPig.

Objects instead of records, and functions instead of fields

GuineaPig has no notion of records or fields. Instead, views contain sets of Python objects. Conceptually the sets are unordered and can only be streamed through sequentially. These objects can be anything: e.g., the output of ReadLines is a stream of Python strings, one per line. (More usually the objects are tuples.) I will use the term row for an object in a view.

To access part of an row, as when you 'Flatten' or 'Group' by a field, you use a function that takes the object and returns the appropriate subpart. Many of the view constructors in Guinea Pig include a keyword by=.... which returns part of a row. This part can be a pre-computed property of a row (like a field in a database setting), or any arbitrary function that it applied to the row. In the 'Flatten' view in this example, the 'by' keyword specifies the function tokens, which is a Python function that iterates over the tokens. The 'by' function for a 'Flatten' view should convert a single row into an iterator of objects, which are then spliced together, producing - in this case - a stream of tokens.

Python also lets you construct an unnamed function on-the-fly with the lambda key word. The 'by' argument in the 'Group' view here is just the identity function, so here we're grouping by an entire row (which at this point, is just a token).

The Group view is quite flexible. In the absence of a "reducingTo" keyword, the result of grouping is a tuple (key,[row1,...,rowN]) where the rowi's are the rows that have the indicated "key" (as extracted by the "by" clause). So, if we had used the line

wordCount = Group(words, by=lambda x:x)

then the resulting rows would be tuples like ("aardvark", ["aardvark","aardvark",...]) where the list is the list of all occurrences in the corpus of the token. Instead here we used a "reduceTo" argument, which is an optimization. The value for a "reduceTo" keyword should be an instance of the ReduceTo class. As an example, the ReduceToCount() class is such a subclass, defined as

ReduceTo(int,by=lambda accum,val:accum+1)

More generally when you defined a "ReduceTo" object, the first argument is the type of the output (and defines the initial value of the accumulator, which here will be the output of int(), or zero) and the second is the function that is used to reduce values pairwise.

Once again: note the use of functions as parameters in Group and Flatten. Guinea Pig has no notion of records: rows can be any python object (although they are often tuples). So if you want to do the equivalent of extracting a field from a record, you should use a function. Python's lambda functions and argument unpacking features make this fairly convenient, as the examples below show.

Using the pipeline

To use this pipeline: if you type

% python longer-wordcount.py

you'll get a brief usage message:

Guinea Pig 1.3.2 (c) William Cohen 2014,2015
usage: python longer-wordcount.py --(store|pprint|tasks|plan|cat) VIEW [OPTIONS] [PARAMS] --reuse VIEW1 VIEW2 ...
       python longer-wordcount.py --list

Subcommands that take a VIEW as argument:
 --tasks VIEW: print the abstract map-reduce tasks needed to materialize the VIEW
 --pprint VIEW: print the data structure associated with the VIEW
 --plan VIEW: print the commands that invoke each abstract map-reduce task
 --store VIEW: materialize the named VIEW and store it in the view directory
 --cat VIEW: store the VIEW and then print each line to stdout
The --list subcommand lists possible VIEWs defined by this program.

OPTIONS are specified as "--opts key:value,...", where legal keys for "opts", with default values, are:
  echo:0
  viewdir:gpig_views
  parallel:5
  streamJar:/usr/lib/hadoop/contrib/streaming/hadoop-streaming-1.2.0.1.3.0.0-107.jar
  target:shell
Values in the "opts" key/value pairs are assumed to be URL-escaped.

PARAMS are specified as "--params key:value,..." and the associated dictionary is accessible to
user programs via the function GPig.getArgvParams().

There's more help at http://curtis.ml.cmu.edu/w/courses/index.php/Guinea_Pig

Typing

% python longer-wordcount.py --list

will list the views that are defined in the file: lines, words, and wordCount. If you pprint one of these, say wordCount, you can see what it essentially is: a Python data structure, with several named subparts (like words), which pprint will present as a tree. (The vertical bars in this printout are just to indicate the indent level, by the way, so don't confuse them with the "pipe" notation used for chaining together views.)

wordCount = Group(words,by=<function <lambda> at 0x10497aa28>,reducingTo=<guineapig.ReduceToCount object at 0x104979190>)
| words = Flatten(lines, by=<function tokens at 0x1048965f0>).opts(stored=True)
| | lines = ReadLines("corpus.txt")

Returning to the previous wordcount example, the "pipe" notation is just a shortcut for nested views: words = ReadLines('corpus.txt') | Flatten(by=tokens) is equivalent to words = Flatten(ReadLines('corpus.txt') , by=tokens) or to the two separate definitions for lines and words above. The named variables (like words) can be used, via devious Pythonic tricks, to access the data structures from the command line. Hence the --store command above.

To store a view, GuineaPig will first convert a view structure into a plan for storing the view. To see this plan, you can type something like

% python longer-wordcount.py --tasks wordCount

and the output will tell you the sequence of map-reduce tasks that are needed to produce the view wordCount. In this case, there's one task, which is described by a short "explanation" for humans, and by a shell command that implements the task for computers:

======================================================================
map-reduce task 1: corpus.txt => wordCount
 - +-------------------- explanation --------------------
 - |  read corpus.txt with lines
 - |  flatten to words
 - |  group to wordCount
 - +-------------------- commands --------------------
 - |  python longer-wordcount.py --view=wordCount --do=doGroupMap < corpus.txt | LC_COLLATE=C sort -k1 | python longer-wordcount.py --view=wordCount --do=doStoreRows > gpig_views/wordCount.gp

If you just want the commands you can use the command: python longer-wordcount.py --plan wordCount And in fact, if you typed

python longer-wordcount.py --plan wordCount | sh

this would be equivalent to python longer-wordcount.py --store wordCount, modulo some details about how errors are reported.

Looking at the command is actually useful:

python longer-wordcount.py --view=wordCount --do=doGroupMap < corpus.txt \
    | LC_COLLATE=C sort -k1 \
    | python longer-wordcount.py --view=wordCount --do=doStoreRows \
    > gpig_views/wordCount.gp

This is how GuineaPig works: the view definition (a data structure) is converted to a plan (a shell script), and the shell script is then executed, starting up some new processes while it executes. Going back to the example: The final result of the 'wordCount' plan to store the view wordCountis to create a file called wordCount.gp in a certain directory, the GuineaPig viewdir, which is by default gpig_views.

These new processes invoke additional copies of python longer-wordcount.py with special arguments. If you look at the plan for the view 'words', for instance, you will see:

python longer-wordcount.py --view=words --do=doStoreRows < corpus.txt > words.gp

which tell Python perform smaller-scale operations associated with individual views, as steps in the overall plan. You can invoke these substeps yourself if you want, and this is sometimes useful for testing: for instance, if you could type:

% python longer-wordcount.py --view=words --do=doStoreRows
this is a test:
hello world!
^D

To see what the 'words' step actually does on a sample input.

As another example,here's what the output of the doGroupMap operation looks like:

wcohen@shell2:~/pyhack/guineapig/tutorial$ head -1 corpus.txt 
 Quite a difference. Think it has anything to do with all that coke?
wcohen@shell2:~/pyhack/guineapig/tutorial$ python longer-wordcount.py --view=wordCount --do=doGroupMap < corpus.txt| head -10
'quite' 'quite'
'a'     'a'
'difference.'   'difference.'
'think' 'think'
'it'    'it'
'has'   'has'
'anything'      'anything'
'to'    'to'
'do'    'do'
'with'  'with'
Traceback (most recent call last):
  File "longer-wordcount.py", line 19, in <module>
  ...
  [Python's usual complaint's about writing to a "head" command]
  ...
IOError: [Errno 32] Broken pipe

The steps in a plan do not correspond one-to-one with the steps in the view definition: some optimization happens. We'll talk about this a little more below when we discuss debugging techniques.

Using Hadoop

The motivation for doing all this is because this sort of process can also be distributed across a cluster using Hadoop streaming. On a machine with hadoop installed, you can easily generate an alternative plan that uses Hadoop streaming:

% python longer-wordcount.py --plan wordCount --opts target:hadoop,viewdir:/user/wcohen/gpig_views

This produces a messier-looking plan that will store wordCount on HDFS using a series of Hadoop streaming jobs. To do this planning (let alone execution), we'll need to manually do a little setup: in particular, we need to first create a HDFS location for GuineaPig, and give a complete, non-relative path to that location with the viewdir option. You also need to tell GuineaPig where to find the Hadoop streaming jar, via the streamJar option or the GP_STREAMJAR environment variable.

To summarize, you should be able to do something like

% hadoop fs -mkdir /user/wcohen/gpig_views
% export GP_STREAMJAR=/some/place/on/your/hadoop/installation
% hadoop fs -copyFromLocal corpus.txt /user/wcohen
% python longer-wordcount.py --store wordCount --opts target:hadoop,viewdir:/user/wcohen/gpig_views

to run the wordcount example on Hadoop.

Note: as of version 1.3.2: values of keys given in the --opts command are assumed to be URL-escaped, so it's now possible to pass in a viewdir location that contains a colon. For example, you can pass in the view directory hdfs://localhost:9000/users/wcohen/gpig_views with --opts viewdir:hdfs%3A//localhost%3A9000/users/wcohen/gpig_views.

For the GHC cluster the right location for GP_STREAMJAR is /usr/local/hadoop/contrib/streaming/hadoop-streaming-1.0.1.jar.

Passing options in to Hadoop

GuineaPig will generally generate all the Hadoop options that are needed for your code to run, but you may sometimes want to pass additional options in to Hadoop. You can do this in several ways.

  • You can use the command-line option --hopts. This takes as an argument one string, which will be passed in to Hadoop more or less intact whenever it is called. (Actually, it will be split by spaces and the resulting list will be joined by spaces and then passed in.)
  • You can modify the planner object with the hopts method. This takes a list of strings, which will be joined by spaces and passed in to Hadoop.
  • You can modify a view with the method opts(hopts=foo). The argument foo is again a list of strings, and these arguments will be passed in to Hadoop for the invocations that are associated specifically with the generation of this view. The view-specific options will be placed after the planner-level options, so they will override planner-level options.

Some Hadoop versions require special placement for "definition" options, which are those that are of the form -D param=value. If you need to do use these options, you can use the hdefs instead of hopts in any of the ways listed above.

The file smallvoc-tfidf.py contains some examples of Hadoop options, in this case, used to turn on and off compression of temporary output.

Debugging Tips

If you're code's not working you can use any Python tools to debug it. You should not print any debug messages to standard output, but you can print them to standard error. This is true for "normal" as well as Hadoop runs.

If you want to step through the various stages of your view definition, it's quite easy to do, by storing the intermediate results and inspecting them. To make this a little more convenient, there's another command-line option, --cat, which stores a view and then prints it. The cat command only works in local (non-Hadoop mode). So, to step through the wordcount program view-by-view you could type:

% python longer-wordcount.py --cat lines | head
% python longer-wordcount.py --cat words | head
% python longer-wordcount.py --cat wordCount | head

If you'd like to speed up the later steps by re-using the results of previous steps, you can do that with

% python longer-wordcount.py --cat lines  | head
% python longer-wordcount.py --cat words --reuse lines | head
% python longer-wordcount.py --cat wordCount --reuse lines words | head

This will re-use previously-computed views, if they are present in the viewdir directory and marked as complete. You can also use any filename with write basename instead of the view name as the argument to --reuse, so it usually works to say something like:

% python longer-wordcount.py --cat wordCount --reuse gpig_views/*.gp | head

As noted above, if you want to get even further into the details of what's happening, you can generate the plan and start running lines from it (or parts of them) one-by-one.

Debugging Hadoop

When you run on Hadoop, the logs contain a web page with job status (the Tracking URL). If your job fails, the Python error stack is not very interesting - it does say which Hadoop command failed, but not where in the Python program it failed. To see that, you need to go to the Tracking URL and navigate through to the error logs for a failing job.

hadoop -input ... -output ...  -mapper ...
...
14/08/07 03:49:24 INFO streaming.StreamJob: Running job: job_201407141501_0266
14/08/07 03:49:24 INFO streaming.StreamJob: To kill this job, run:
14/08/07 03:49:24 INFO streaming.StreamJob: /usr/local/hadoop-1.0.1/libexec/../bin/hadoop job  -Dmapred.job.tracker=ghc81.ghc.andrew.cmu.edu:9001 -kill job_201407141501_0266
14/08/07 03:49:24 INFO streaming.StreamJob: Tracking URL: http://ghc81.ghc.andrew.cmu.edu:50030/jobdetails.jsp?jobid=job_201407141501_0266
14/08/07 03:49:25 INFO streaming.StreamJob:  map 0%  reduce 0%
14/08/07 03:50:15 INFO streaming.StreamJob:  map 100%  reduce 100%
14/08/07 03:50:15 INFO streaming.StreamJob: To kill this job, run:
14/08/07 03:50:15 INFO streaming.StreamJob: /usr/local/hadoop-1.0.1/libexec/../bin/hadoop job  -Dmapred.job.tracker=ghc81.ghc.andrew.cmu.edu:9001 -kill job_201407141501_0266
14/08/07 03:50:15 INFO streaming.StreamJob: Tracking URL: http://ghc81.ghc.andrew.cmu.edu:50030/jobdetails.jsp?jobid=job_201407141501_0266
14/08/07 03:50:15 ERROR streaming.StreamJob: Job not successful. ...
 ...
Traceback (most recent call last):
  File "gpig/extra.py", line 115, in <module>
    wc.main(sys.argv)
  File "/afs/andrew.cmu.edu/usr3/pompraka/gpig/guineapig.py", line 1106, in main
    self.runMain(argv)
  File "/afs/andrew.cmu.edu/usr3/pompraka/gpig/guineapig.py", line 1139, in runMain
    plan.execute(self, echo=self.opts['echo'])
  File "/afs/andrew.cmu.edu/usr3/pompraka/gpig/guineapig.py", line 776, in execute
    subprocess.check_call(shellcom,shell=True)
  File "/usr/local/lib/python2.7/subprocess.py", line 511, in check_call
    raise CalledProcessError(retcode, cmd)
subprocess.CalledProcessError: Command 'hadoop jar ....' returned non-zero exit status 1

On the Gates cluster, one way to open the Tracking URL is to ssh -X in to the cluster, and open a browser. (You can't open those URLs with an external browser for security reasons).

If you think you've fixed the problem, there's also a shortcut way of testing it, without running the whole workflow. You can also cut-and-paste the specific Hadoop command that failed (after subprocess.CalledProcessError: , manually clear the -output HDFS location, and re-run the failing command. If it succeeds and gives the right output, you can go on to the next bug.

A More Complicated Example With Joins

Here's a more complex problem: finding words that are much more frequent in one corpus than another. We have an ad hoc scoring function, and we will run two copies of the wordcount pipeline and compare the results with a Join.

from guineapig import *
import sys
import math

def tokens(line):
    for tok in line.split(): yield tok.lower()

def score(n1,n2):
    return math.log((n1 + 1.0)/(n1 + n2 + 2.0))

#always subclass Planner                                                                                                                    
class WordCmp(Planner):

    def wcPipe(fileName):
        return ReadLines(fileName) | Flatten(by=tokens) | Group(by=lambda x:x, reducingTo=ReduceToCount())

    wc1 = wcPipe('bluecorpus.txt')
    wc2 = wcPipe('redcorpus.txt')

    cmp = Join( Jin(wc1, by=lambda(word,n):word), Jin(wc2, by=lambda(word,n):word) ) \
          | ReplaceEach(by=lambda((word1,n1),(word2,n2)):(word1, score(n1,n2)))

    result = Format(cmp, by=lambda(word,blueScore):'%6.4f %s' % (blueScore,word))

# always end like this                                                                                                                      
if __name__ == "__main__":
    WordCmp().main(sys.argv)

There are several new things here, so let's discuss them one-by-one.

First, since the wordcount pipeline is just part of a data structure, we can write a "macro" by simply using a Python function which returns the data structure. The wcPipe uses this trick to save some typing in building two copies of the original wordcount pipeline.

Second, we're using unpacking to access rows which are tuples. This is a common pattern in Python, often used, e.g., to return multiple values from a function, as in

def divide(x, y):
    quotient = x/y
    remainder = x % y
    return (quotient, remainder)  

(q, r) = divide(22, 7)

In particular, the output of the wordcount pipelines are tuples: (word,count). A lambda function for these rows has the form shown: lambda (word,n):word returns the first element of the tuple. Alternatively, I could have written lambda x:x[0]. An important point: a lambda function does NOT have parenthesis around it's arguments, so lambda (a,b):... is quite different from lambda a,b:.... The former takes ONE argument, which must be a tuple or list with exactly two elements, and binds a to the first and b to the second. The second takes TWO arguments. If you used a defined function, you use any of these to get the same effect:

def getWord((word,n)): return word
def getWord(row): return row[0]

Third, we're using a Join view. The input of a join is defined by a substructure Jin which specifies a view and a "by" function. Joining k views will produce a k-tuple, with one component for each row of the input views, so we follow the join with a ReplaceEach that cleans up the structure and computes the final score. Again we're using argument unpacking to access the newly-created element, which is now a tuple-of-tuples of the form ((word1,n1),(word2,n2)) where, of course, word1 and word2 are the same. Python's argument unpacking handles this nicely. (Note that argument unpacking is not quite unification, if you know what that is: e.g., you could not use ((word,n),(word,n)) here.)

Fourth, Format is almost exactly like a ReplaceEach: the only difference is that when the view is stored, instead of printing the repr of each row, the str for each row is printed. This is used for the final output of a pipeline.

A More Complex Example Using Group

There are two important variants of the Group view. One which we discussed above is the "reducingTo" argument. The value of this should be an instance of a class called ReduceTo, which will will be applied pairwise to all the elements of the rows in the group. Specifically, each Reduce instance has two parts: a baseType, and a "by" function. The baseType is usually a standard Python class like int, float, or list. The "by" takes two arguments, an accumulator and the next value from the list, and returns a new accumulator value. The initial accumulator value is provided by baseType.

When called on a list a1....an, what happens is the following.

  1. The initial value of the accumulator is set as accum0 = baseType().
  2. The "by" function f is called to produce the next accumulator: accum1 = by(accum0,a1)
  3. The "by" function f is called to produce the next accumulator: accum2 = by(accum0,a2)
  4. ... and so on.

You can create a custom reduce-to instance, by providing these arguments. For instance, to aggregate the word counts in wc by prefix, you could use:

   pc1 = Group(wc, by=lambda (word,count):word[:k], reducingTo=ReduceTo(int, lambda accum,(word,count): accum+count))

Here the "by" function for the ReduceTo takes a stream of (word,count) pairs, produced by grouping on the first k letters of the word, and adds up the counts.

A slightly more efficient alternative is to use the retaining option to Group. This is a function that's called before the rows in a group are sent to the ReduceTo class. It can be used to simplify the rows, so that the ReduceTo's "by" function is shorter. For instance, here we can just keep the counts, and discard the word part of each pair in a Group:

   pc2 = Group(wc, by=lambda (word,count):word[:k], retaining=lambda (word,count):count, reducingTo=ReduceTo(int, lambda accum,val:accum+val))

ReduceTo(int, lambda accum,val:accum+val) is actually included in GuineaPig as ReduceToSum(), so in the code below, the three implementations are equivalent.

from guineapig import *
import sys

def tokens(line):
    for tok in line.split():
        yield tok.lower()
k = 2

class PrefCount(Planner):
    wc = ReadLines('corpus.txt') | Flatten(by=tokens) | Group(by=lambda x:x, reducingTo=ReduceToCount())
    pc1 = Group(wc, by=lambda (word,count):word[:k], reducingTo=ReduceTo(int, lambda accum,(word,count): accum+count))
    pc2 = Group(wc, by=lambda (word,count):word[:k], retaining=lambda (word,count):count, reducingTo=ReduceTo(int, lambda accum,val:accum+val))
    pc3 = Group(wc, by=lambda (word,count):word[:k], retaining=lambda (word,count):count, reducingTo=ReduceToSum())

if __name__ == "__main__":
    PrefCount().main(sys.argv)

Using Combiners for Group

The Group view also supports <a href="https://wiki.apache.org/hadoop/HadoopMapReduce">Hadoop combiners</a>, via an additional combiningTo keyword. For example, another implementation of wordcount could be

class WordCount(Planner):

    wc = ReadLines('corpus.txt') | Flatten(by=tokens) \
        | Group(retaining=lambda x:1, reducingTo=ReduceToSum(), combiningTo=ReduceToSum())

Combiners are basically used in the same way as reducers, but in two phases: first the retained elements in a group are combined, then the combined values are reduced. When you're using Hadoop as a target, combiners are run on the local worker machine, and the combined data is then sent off to a potentially different worker for reducing. So using combiners can lower network traffic and make the program more efficient. Combiners are not used in local mode.

Parameters and Programmatically Accessing Guinea Pig

In Pig, parameters can be passed in from the command line, and used in Pig programs, where they are marked as parameters with a dollar sign prefix. In Guinea Pig, you can pass in a set of parameters with the option --params key1:val1,kay2:val2,.... To access them, you use the special function GPig.getArgvParams(), which returns a Python dict mapping strings to strings. For instance, this program

from guineapig import *
import sys
import os

def tokens(line): 
    for tok in line.split(): 
        yield tok.lower()

class WordCount(Planner):

    D = GPig.getArgvParams()
    wc = ReadLines(D['corpus']) | Flatten(by=tokens) | Group(reducingTo=ReduceToCount())

if __name__ == "__main__":
    WordCount().main(sys.argv)

could be invoked with

python param-wordcount.py --params corpus:bluecorpus.txt --store wc 

Of course, sometimes you'll want a little more control over a Guinea Pig script. In Pig, there are APIs for Java and Python which let you can create one (or more) Pig script objects, which can be parameterized and run. This is hard to do with the mechanisms I've described so far: a Guinea Pig script isn't actually an object, but a specially-configured subclass of Planner.

However, you can also create a Guinea Pig script by configuring an instance of a Planner object. The process is the same, conceptually, but a little less concise. Here's an instance version of the WordCount program---note you have to complete the instance by calling setup() after all views have been added.

from guineapig import *
import sys

def wordCountScript():
    def tokens(line): 
        for tok in line.split(): 
            yield tok.lower()
    p = Planner()
    p.lines = ReadLines('corpus.txt')
    p.words = Flatten(p.lines,by=tokens)
    p.wordCount = Group(p.words, by=lambda x:x, reducingTo=ReduceToCount())
    p.setup() #always end with this
    return p

# always end like this
if __name__ == "__main__":
    wordCountScript().main(sys.argv)

I also moved the definition of tokens to a less globally-accessible place. Planner instances are treated the same as planner subclasses with these exceptions:

  • You need to call setup explicitly after configuring an Planner instance (i.e., after defining all the views.)
  • Planner instances have local parameters, which are accessible via the instance variable param (also a Python dict). The local params can be set in two ways:
    • Via the --params command line argument.
    • In the Planner() constructor, by adding keyword arguments. In case of conflict, the constructor's values take precedence.

To illustrate this a little more, here's an even more complex wordcount. This one supports the command-line usage

% python multi-wordcount.py pref corpus1 corpus2 ...  

and it will build a wordcount file for all the words in each corpus which start with the prefix pref, saving them in files named wc-for-corpus1, ..., etc.

First, we'll write some code to generate an appropriate planner instance. This one takes two arguments -- the corpus name, and a prefix -- and returns a configured Planner() object.

def wc(corpus=None,prefix=None):
    """Return a planner instance that can word-count a corpus."""
    p = Planner(corpus=corpus,prefix=prefix)
    #self.param is automatically constructed
    p.r = ReadLines(p.param['corpus'])  
    def tokens(line): 
        for tok in line.split(): 
            yield tok.lower()
    p.s = Flatten(p.r, by=tokens) 
    if p.param['prefix']: 
        p.f = Filter(p.s, by=lambda w:w.startswith(p.param['prefix']))
    else: 
        p.f = p.s
    p.t = Group(p.f, reducingTo=ReduceToCount())
    p.setup()  #must call AFTER you finish adding views
    return p

This should be pretty familiar: notice the the configuration can be conditional on the paramers (we're using that here to not introduce a Filter for a null prefix). Also note we close by calling setup() on the new planner.

Now let's show you the main. When a step within a plan is invoked, you need to dispatch to the right main procedure, so the first step is see if this is the case. If not, then you invoke a main which parses out the command-line arguments pref corpus1 corpus2 .... This code will iterate over the corpus arguments, positions 2:N in the sys.argv list, and create a planner instance using the function wc() above:

if __name__ == "__main__":
    #run subplan step, if necessary
    if Planner.partOfPlan(sys.argv):
        wc().main(sys.argv)  
    else:
        #run any main you want - this one's usage is python mult-wordcount.py prefix corpus1 corpus2 ....
        prefix = sys.argv[1]
        for a in sys.argv[2:]:
            #create the planner instance
            planner = wc(corpus=a,prefix=prefix)
            #decide where to store the wordcount data
            v = planner.getView('t')
            targetFile = 'wc-for-'+os.path.basename(a)
            print 'storing this view in',targetFile
            v.pprint()                         #print the view
            v.storagePlan().execute(planner)   #store it
            #move the stored file to its final location
            os.rename(v.storedFile(), targetFile)

The code gives several examples of things you can do with a view: grab a particular view, print it, and store it by executing the storage plan for it.

Views that Include Custom Data Structures

The examples so far have used built-in Python types as rows. It's fine to use your own types, but if you do, GuineaPig needs to know how to save them to disk (serialize them) and deserialize them. By default, it will serialize objects with Python's repr function, and deserialize them by running a broken version of eval - in fact a version that has been very deliberately broken, for security reasons, to not call any Python functions at all. If you want to use your own data structures, you need to provide a less broken eval routine to deserialize the rows. Specifically, you should do the following (in version 1.3 and up).

if __name__ == "__main__":
    wc = WordCount().setEvaluator(GPig.SafeEvaluator({'WordStat':WordStat}))
    wc.main(sys.argv)

where the argument to SafeEvaluator is an dictionary that maps the class names you'd like to evaluate to the appropriate, locally-defined classes. (More generally, you could pass any object tat implements an eval method to setEvaluator, but you should do it this way.)

Here's a complete example that uses the Python collections.namedtuple package to define a custom data structure, which is used to find the most common words in a corpus.

from guineapig import *
import sys
import math
import collections

def tokens(line):
    for tok in line.split(): yield tok.lower()

WordStat = collections.namedtuple('WordStat','word count')

class WordCount(Planner):

    wcTuple = ReadLines('corpus.txt') | Flatten(by=tokens) | Group(by=lambda x:x, reducingTo=ReduceToCount())
    wcNamedTuple = ReplaceEach(wcTuple, by=lambda (w,n):WordStat(word=w,count=n)).opts(stored=True)
    commonWords = Filter(wcNamedTuple, by=lambda ws:ws.count>=200) | Group(by=WordStat.count.fget)

# always end like this                                                                                                                                           
if __name__ == "__main__":
    wc = WordCount().setEvaluator(GPig.SafeEvaluator({'WordStat':WordStat}))
    wc.main(sys.argv)

Side Views in GuineaPig

GuineaPig's view definitions are composed incrementally in a way that is efficient for large views. However, if you want to use a small view, one that can be easily loaded into memory, the view definitions are sometimes a clumsy way of doing this. Sometimes these sorts of additional view inputs to a pipeline are called side views.

GuineaPig contains a view class called Augment that lets you

  1. construct a series of side views, using the planner
  2. bundle them together into whatever data structure you like - let's call this the side information
  3. add this side information to every row of an existing view - specifically by constructing a new pair (r,s) where r is the old row and s is the side information.

Usually you follow this with some other operation, say a ReplaceEach, which will actually use the side information to do something.

Note that the side views are (usually) loaded only once (or if using Hadoop, once per mapper), when the view data structure is created, and only one copy of the side information is created - i.e., generally s will be a pointer to a data structure that is shared across the different rows. Modifying s will have unpredictable results, so don't do it.

Here's a simple example.

from guineapig import *
import sys
import math
import logging

def tokens(line):
    for tok in line.split(): yield tok.lower()

class WordProb(Planner):

    wc = ReadLines('corpus.txt') | Flatten(by=tokens) | Group(by=lambda x:x, reducingTo=ReduceToCount())

    total = Group(wc, by=lambda x:'ANY',reducingTo=ReduceTo(float,lambda accum,(word,count):accum+count)) | ReplaceEach(by=lambda (dummy,n):n)

    wcWithTotal = Augment(wc, sideview=total, loadedBy=lambda v:GPig.onlyRowOf(v))

    prob = ReplaceEach(wcWithTotal, by=lambda ((word,count),n): (word,count,n,float(count)/n))

# always end like this                                                                                                                               
if __name__ == "__main__":
    WordProb().main(sys.argv)

The total view simply adds up all the wordcount tuples using a Group. Before wcWithTotals is stored, the system will first make sure that total is stored, and copy that stored view into the working directory. Then it will call the function associated with the wcWithTotals's "loadedBy" argument. (If you run this on Hadoop, it will upload the preloaded view's stored version appropriately to the worker nodes).

In general the usage allows a list of preloaded views: just replace the argument sideview with sideviews. The "loadedBy" function is called the appropriate number of views as arguments. If you want to read in the contents of the views, you're encouraged to use the utility functions GPig.rowsOf(view) or GPig.onlyRowOf(view). If you want to access the file directly, you should open the file view.distributableFile().

How it works: The way side views are implemented is worth discussing, because it's not invisible to you. A plan that includes an Augment view will include a step that copies the side view's contents into your working directory (i.e., the directory that pwd prints out). In Hadoop, this will involve a hadoop fs -getmerge... operation. In local mode, this will just involve a copy. In Hadoop, the side view will then be distributed to worker processes via a -file option.

Review and Discussion of Guinea Pig vs Pig

GuineaPig is like Pig, but embedded in Python, not a new language. So if you know Python, there's less to learn. If not, at least you're learning something elegant and stable.

GuineaPig has two backends: shell commands, and Hadoop streaming. Pig only supports Hadoop. This means that you can test programs more quickly in GuineaPig- there's not the overhead of starting up Hadoop jobs.

In Pig (and relational algebra) operators like Join, etc take fields as arguments. Here, there's nothing exactly like a 'field' in relational algebra. Instead of a fields, we just use a function - usually a lambda function - that takes a row object and returns some component of the row, and any object can serve as a "row".

GuineaPig does not have any special-purpose mechanism for macros, as Pig does - but it's easy to get that effect using Python functions which return views.

Using Guinea Pig on EC2

Setting up EC2

I'm old-school, so I like the command line interface (CLI), and will use EC2-Classic. Follow these instructions to set up the CLI tools on your machine. Currently this includes

  • downloading the right CLI code package;
  • having a local installation of Java;
  • setting the environment variable EC2_HOME appropriately;
  • putting $EC2_HOME/bin on your path;
  • creating a AWS access key, which has a public and private part, by
    • allocating an access key, if you haven't done that yet, and
    • defining the environment variables AWS_ACCESS_KEY and AWS_SECRET_KEY

I define the secret key by sourcing a bash file that contains a line export AWS_SECRET_KEY=...., and of course, keep that file readable only to me, by setting the permissions with chmod 600 .... Otherwise, it's not secret.

If you've done all that right, then the command

% ec2-describe-regions

should work properly. Next you need to prepare to we will follow the instructions to launch an instance by:

  • using the ec2-create-keypair command to set up a keypair, which is stored in another secret (protected) file - mine is named gpig-key-pair.pem
  • creating a security group, e.g., with ec2-create-group gpig-security-group -d "Gpig security group"

Now, authorize access from the machine you're working on:

ec2-authorize gpig-security-group -p 22 -s 128.2.211.59/32

where 128.2.211.59 is replaced by your IP address.

Reference Guide

Basic Concepts and Command-Line Usage

A guinea pig view - below abbreviated rel, r, s, t, - can be "stored" (materialized) to produce an unordered bag of "rows". A "row" is just an arbitrary Python data structure (with a few constraints on how it can be serialized, see below). Views can be defined by classes like 'ReadLines', 'ReplaceEach', 'Flatten', 'Group', 'Join', etc.

Aside from debugging aids, about only thing that Guinea Pig does is to allow you to 'store' a view, which means to materialize it on disk.

The Planner subclass you write should be defined in a separate script file, which should end with code that creates a single instance and calls its main method with sys.argv, like this.

if __name__ == "__main__":
   MyPlanner().main(sys.argv)

An instance of a Planner class knows, via inheritance, how to create a "storage plan" for any view. For instance, if the script above is defined in a file ugp.py, you would use this command to produce a storage plan for the view foo.

   python ugp.py --plan foo

The storage plan is a shell script, so to execute it you'd use one of these:

   python ugp.py --plan foo | sh
   python ugp.py --store foo

When the view foo is saved, it is saved in a file named foo.gp. So to see some of the output you'd use

   python ugp.py --plan foo | sh ; head foo.gp

Or as a shortcut you can say

   python ugp.py --cat foo | head

Views are saved on disk, which leads to some constraints on what they can be. A row must be something that can be stored and retrieved by the RowSerializer, which uses 'repr' and 'str', and it cannot contain newlines in its serialized form.

You can specify certain options to the system using the command-line option --opts, whose value is a comma-separated list of colon-separated option,value pairs. If you invoke the planner main without any command-line arguments, it will list these options for you. The current options are:

  1. echo [0]: if non-zero, echo shell commands.
  2. viewdir [gpig_views]: directory where stored views will be placed.
  3. parallel[5]: number of parallel tasks in Hadoop.
  4. streamJar[environment var $GP_STREAMJAR or /usr/lib/hadoop/contrib/streaming/hadoop-streaming-1.2.0.1.3.0.0-107.jar]: location of Hadoop streaming Jar file.
  5. target[shell]: the other valid option is hadoop

You can specify user parameters using the command-line option --params which has values in the same format. You can access these using the function GPig.getArgvParams(), which returns a Python dict mapping strings to strings.

Known Bugs and Gotchas

  • Storing null python values (i.e., "None") or rows containing them is unpredictable, in particular when they are used as keys for joining or grouping.

View Types

In almost all of the views below, the first argument is a view, which is omitted when on the right-hand-side of a pipe operator.

Augment

Augment: loads one or more views from disk, and produces an object that encapsulates the information in those views. This object is then paired with the rows of the primary view.

a = Augment(x, sideviews=[y,z], loadedBy=lambda v1,v2:...)
a = Augment(x, sideview=y, loadedBy=lambda v:...)

The loadedBy argument is passed the sideview objects, and is called before the view is executed. It returns some Python object s. The result of the view is a stream of pairs (r,s) where r is from the view that is the first argument of Augment (x in the example).

Distinct

Distinct: removes duplicates from a view.

d = Distinct(x)

Filter

Filter...by: like ReplaceEach, but extracts a subset of the rows of a relation, instead of transforming them. The following example retains only the rows r where the first element is a string that ends with ".txt".

filtered = Filter(t, by=lambda(row):row[0].endswith(".txt") )

Flatten aka FlatMap

Flatten...by: Like ReplaceEach, but allows you to use a Python function that yields multiple values. This is pretty much the same as a map-reduce Mapper.

def idwordGen(row):
   for w in row['words']: yield (row['id'],w)
x = Flatten(y, by=idwordGen(row))

FlatMap is a synonym for Flatten.

Format

Format...by: a special form of ReplaceEach where the 'by' function produces a string, and this string is NOT serialized normally when stored, but just printed. You can use this to get the final output.

f = Format(z, by=lambda(a,b):'a is %s, b is %s' % (a,b))

Group

Group...by: produces a set of tuples (key, [row1,...,rowN]) where the first element is the 'key' extracted using the 'by' function, and [row1,...,rowN] is a list of rows from t that share this key.

u = Group(t, by=lambda row:row[1], ... )

Group...by...retaining: produces a set of tuples (key, [row1',...,rowN']) where the first element is the 'key' extracted using the 'by' function, and [row1',...,rowN'] is a list of rows from t that share this key, after being transformed by the retaining function.

u = Group(t, by=lambda row:row[1], retaining=lambda row:row[2]  )

Group...by..reducingTo: reducingTo's value is is a GuineaPig 'ReduceTo' object. A ReduceTo is created with two arguments:

  1. a basetype that provides the first value of the accumulator, when called withiut arguments (e.g., the basetype might be int, as int() returns a 0.
  2. a "by" function which takes two arguments, an accumulator and the next value from the list, and produces the next value of the accumulator.

There are also some standard cases: ReduceToSum(), ReduceToCount(), and ReduceToList().

The output is the result of reducing the list for rows using that object - so here the result would be words plus document-frequencies, rather than words plus inverted indices. This is much like a map-reduce reducer.

u = Group(t, by=lambda row:row[1], reducingTo=ReduceToCount())

Group...by..retaining..reducingTo: these options can all be combined, in which case the "retaining" function is applied before the "reducingTo" process.

Join

Join: joins are multiway, and the argument is a list of "join input" objects, denoted Jin. A join input is a view v plus a field-extracting function f. The result of a join is a tuple of the form (row1,...,rowk) where rowi is a row in the i-th input of the join.

j = Join(Jin(view1,by=getId), Jin(view2,by=getId))

For two-way joins only, you can specify outer joins (left outer, right outer, and full outer) by using the "outer" parameter of the Jin constructor. The tuple for a left outer join for an unmatched row row1 will have the form (row1,None).

j = Join(Jin(view1,by=getId), Jin(view2,by=getId,outer=True))  #right outer join
j = Join(Jin(view1,by=getId,outer=True), Jin(view2,by=getId))  #left outer join
j = Join(Jin(view1,by=getId,outer=True), Jin(view2,by=getId,outer=True))  #full outer join

JoinTo...by: like Join, but designed to be used in a pipeline. It has a single join-input as its first argument, and uses a "by" argument to specify the field of the primary (pipeline) input to be joined in.

j =  Filter(t, by=lambda(row):getId(row)>0 ) | JoinTo( Jin(view2,by=getId), by=getId)

ReadCSV

ReadCSV: The rows of this view are just lists, each one of which is produced by parsing a line from the file as a comma-separated value string. Any keyword arguments will be passed in to the initializer of a csv.reader object that's used to convert the file.

r = ReadCSV('mytest.csv',delimiter='|')

ReadLines

ReadLines: The rows of this view are just strings from the file, in the first case, and python objects produced by running 'eval' on the strings in the second.

r = ReadLines('mytest.txt')

ReplaceEach aka Map

ReplaceEach...by: Replaces each row with the result of applying some function to that row (i.e., similar to a map.) In this example parseLine is a function, and each row of this view is produced by calling parseLine(x) for some row x of view r.

parsed = ReplaceEach(r, by=parseLine)

Map is a synonym for ReplaceEach.

Union

Union: Combines the rows in one or more views, removing any duplicates. Since duplicates are removed, Union(x) has the same effect as Distinct(x).

all = Union(v, w, x)

UnionTo is a variant of Union that can be used on the right hand side of a pipe.

all = v | UnionTo(w, x)

View Options

You can attach options to a view like this

 parsed = ReplaceEach(r, by=parseLine).opts(stored=True)

Ie, the method "v.opts()" returns a copy of v with some options set. There are only a few options.

Eager storing: Eager storing is turned on with the option stored=True.. Storing works like this: if a view v is marked as 'stored=True', then the first time a plan needs it, it will be stored explicitly. If the same plan needs v again, the values will be read from the stored file. By default a view is eager-stored if it is used more than once. (Also, a view may be marked for eager storage to suppress certain classes of optimizations, as well: in particular, the inner view of an Augment view will be marked for eager storage, and also views that are part of a chain of map-reduce operations, like consecutive joins.) You can discourage eager storing for a view with opts(stored=False) as well as force it with opts(stored=True).

External storing: Setting the option storedAt=path/to/file will turn on eager storing, and also specify that the file will be stored at the given location. This is usually used to export output from GuineaPig.

Parallelism: Setting parallel=K will modify the number of reducers used to create the view, in Hadoop mode.

Stored views aren't deleted after the plan completes, so you can also inspect them for debugging purposes if you like. They are also NOT reused from planning episode to planning episode, unless you ask them to be with the --reuse option, below.

Hadoop options: Again in Hadoop mode, setting the options hopts=foo or hdefs=foo will pass options in to the Hadoop commands used to build this view.

Reusing previously computed views

Re-use: If you add the option --reuse to a plan, like this

       python ugp.py --plan foo --reuse v1.gp v2.gp

then instead of building a new plan for storing v1 and v2, GuineaPig will reuse any existing files (which should be previously stored views). So, in local mode, if you always do

       python ugp.py --plan foo --reuse gpig_views/*.gp

then GuineaPig will have a sort of incremental, make-like behavior: it will reuse everything it can.

Aside: if you try to reuse a view that doesn't exist, you get an error only when the plan executes, and can't find a file or HDFS input that it expects. GuineaPig will not check that the views you plan to --reuse actually exist (because on Hadoop, doing those checks are somewhat expensive.)

Aside: you can't use globbing (i.e., "*.gp") to get a list of completed views if you're working with Hadoop, but you can achieve a similar effect with a little more work, for instance, like this:

       python ugp.py --plan foo  ... --reuse `hadoop fs -ls /user/wcohen/gpig_views | cut -d/ -f5"`

But be careful, gpig_views/*.gp will always give a list of the locally-stored views, even if you're using target:hadoop.

Under the Hood

Guinea pig tries to optimize its plan by stitching together view definitions when possible. Generally, operations like Flatten and ReplaceEach, which just transform a sequence of rows, will be combined together, but operations which require a sorting step like Join and Group will not.

When you use Hadoop - and even if you don't! - the context in which your "by" functions work is pretty limited. In particular, they are executed by many different unknown processes, in general. So if you try anything fancy in terms of communication (e.g., using a ReplaceEach view to collect information in a process-local variable X and then using X in some subsequent view) it probably won't work.

In terms of implementation, every view has an associated checkpoint, which is the latest file that is need to create that view, and a rowGenerator, which is an iterator that produces the rows, one by one, assuming that's it run in a context where the checkpoint is available on stdin.

The checkpoint plan for a view produces the checkpoint. The storage plan stores the view.

Abstract view types are are Reader views, Transformation views (like ReplaceEach), or MapReduce views. The checkpoint for a reader is the file it reads. The checkpoint for a Transformation is the checkpoint for the view its transforming. The checkpoint for a MapReduce is the reducer input.