Guinea Pig

From Cohen Courses

Quick Start

TODO:

  • tutorial.tgz download and links

What you should know before you start

Guinea Pig is similar to Pig, but intended to be easier to learn and debug. It's probably a good idea to be somewhat familiar with Pig before you start this tutorial. I also assume a reading knowledge of Python.

Running wordcount.py

Set up a directory that contains the file guineapig.py, a sample document corpus.txt, and a second script called wordcount.py which contains this code:

# always start like this
from guineapig import *
import sys

# supporting routines can go here
def tokens(line):
    for tok in line.split():
        yield tok.lower()

#always subclass Planner
class WordCount(Planner):

    wc = ReadLines('corpus.txt') | Flatten(by=tokens) | Group(by=lambda x:x, reducingTo=ReduceToCount())

# always end like this
if __name__ == "__main__":
    WordCount().main(sys.argv)

Then type the command:

% python tutorial/wordcount.py --store wc

After a couple of seconds it will return, and you can see the wordcounts with

% head wc.gp

Understanding the wordcount example

A longer example

In the example above, wc is a GuineaPig view. The tutorial directory also has a less concise but easier-to-explain wordcount file, longer-wordcount.py, with this view definition:

class WordCount(Planner):
    lines = ReadLines('corpus.txt')
    words = Flatten(lines,by=tokens)
    wordCount = Group(words, by=lambda x:x, reducingTo=ReduceToCount())

Functions instead of fields

The Group view is actually quite flexible: the "by" clause is a lambda that extracts an arbitrary value that defines the groups, and in the absence of a "reducingTo" clause, the result of grouping is a tuple (key,[row1,...,rowN]) where the rowi's are the rows that have the indicated "key" (as extracted by the "by" clause). The "reducingTo" argument is an optimization, which you can define with an instance of the ReduceTo class. For instance, instead of using the ReduceToCount() subclass, you could have used

ReduceTo(int,by=lambda accum,val:accum+1)

where the first argument is the type of the output (and defines the initial value of the accumulator, which here will be int(), or zero) and the second is the function that is used to reduce values pairwise.
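
To see what this reducer computes, here is the same pairwise reduction written out in plain Python. This only illustrates the semantics; it is not GuineaPig's internal code:

```python
from functools import reduce  # a builtin in Python 2; in functools in Python 3

# the rows that share some key, e.g. the word 'the'
rows = ['the', 'the', 'the']

# ReduceTo(int, by=lambda accum,val:accum+1) starts from int() == 0
# and folds the rows in pairwise, ignoring the values themselves
count = reduce(lambda accum, val: accum + 1, rows, int())
print(count)  # 3
```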

Note the use of functions as parameters in Group and Flatten. Guinea Pig has no notion of records: rows can be any python object (although they are often tuples). So if you want to do the equivalent of extracting a field from a record, you should use a function. Python's lambda functions and argument unpacking features make this fairly convenient, as the examples below show.

Using the pipeline

To use this pipeline: if you type

% python longer-wordcount.py

you'll get a brief usage message:

usage: --[store|pprint|plan|cat] view [--echo] [--target hadoop] [--reuse foo.gp bar.gp ...] [other options]
       --list

Typing

% python longer-wordcount.py --list

will list the views that are defined in the file: lines, words, and wordCount. If you pprint one of these, say wordCount, you can see what it essentially is: a Python data structure, with several named subparts (like words), which pprint will present as a tree. (The vertical bars in this printout just indicate the indent level, by the way, so don't confuse them with the "pipe" notation used for chaining together views.)

wordCount = Group(words,by=<function <lambda> at 0x10497aa28>,reducingTo=<guineapig.ReduceToCount object at 0x104979190>)
| words = Flatten(lines, by=<function tokens at 0x1048965f0>).opts(stored=True)
| | lines = ReadLines("corpus.txt")

The "pipe" notation is just a shortcut for nested views: words = ReadLines('corpus.txt') | Flatten(by=tokens) is equivalent to words = Flatten(ReadLines('corpus.txt') , by=tokens) or to the two separate definitions for lines and words above. The named variables (like words) can be used, via devious pythonic tricks, to access the data structures from the command line. Hence the --store command above.
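
The pipe shortcut itself is ordinary Python operator overloading: a view class can define __or__ so that v1 | v2 plugs v1 in as v2's missing input. A minimal sketch of the idea, with hypothetical classes rather than GuineaPig's actual implementation:

```python
class View(object):
    """A view whose input may be left unset until it appears on the right of a pipe."""
    def __init__(self, inner=None):
        self.inner = inner
    def __or__(self, rightView):
        # v1 | v2 plugs v1 in as rightView's input and returns rightView
        rightView.inner = self
        return rightView

a = View()
b = View()
c = a | b
print(c is b, c.inner is a)  # True True
```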

To store a view, GuineaPig will first convert a view structure into a plan for storing the view. To see a plan, you can type something like

% python longer-wordcount.py --plan wordCount

If you typed

% python longer-wordcount.py --plan wordCount | sh

this would be equivalent to python longer-wordcount.py --store wordCount, modulo some details about how errors are reported.

Notice how this works: the view definition (a data structure) is converted to a plan (a shell script), and the shell script is then executed, starting up some new processes while it executes. These new processes invoke additional copies of python longer-wordcount.py with special arguments, like

python longer-wordcount.py --view=words --do=doStoreRows < corpus.txt > words.gp

which tell Python to perform smaller-scale operations associated with individual views, as steps in the overall plan. Here the words view is computed and stored for later processing.
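
Each such step is just a Unix filter: it reads rows (one per line) from stdin, applies the view's logic, and writes rows to stdout. A self-contained sketch of what a step like the one above does (this is an illustration, not GuineaPig's code; in a real plan step the streams are sys.stdin and sys.stdout):

```python
from io import StringIO

def tokens(line):
    for tok in line.split():
        yield tok.lower()

def doFlattenStep(instream, outstream):
    # read input rows line by line, write one output row per token
    for line in instream:
        for row in tokens(line):
            outstream.write(row + '\n')

# in a real plan step this would be wired to stdin/stdout, e.g.
#   python longer-wordcount.py --view=words --do=doStoreRows < corpus.txt > words.gp
out = StringIO()
doFlattenStep(StringIO('This is A test\n'), out)
print(out.getvalue())
```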

The motivation for all this is that the same sort of process can also be distributed across a cluster using Hadoop streaming. If you're working on a machine that has Hadoop installed, you can generate an alternative plan that uses Hadoop streaming:

% python longer-wordcount.py --plan wordCount --target hadoop

This produces a messier-looking plan that will store wordCount on HDFS using a series of Hadoop streaming jobs.

Debugging Tips

Another useful command is --cat, which stores a view and then prints it. So, to step through the program view-by-view you could type:

% python longer-wordcount.py --cat lines | head
% python longer-wordcount.py --cat words | head
% python longer-wordcount.py --cat wordCount | head

If you'd like to speed up the later steps by re-using the results of previous steps, you can do that with

% python longer-wordcount.py --cat lines  | head
% python longer-wordcount.py --cat words --reuse lines.gp | head
% python longer-wordcount.py --cat wordCount --reuse lines.gp words.gp | head

If you want to get even further into the details, you can generate the plan and run lines from it (or parts of them) one by one.

A More Complicated Example

Here's a more complex problem: finding words that are much more frequent in one corpus than another. We have an ad hoc scoring function, and we will run two copies of the wordcount pipeline and compare the results with a Join. Each input of a join is defined by a substructure Jin which specifies a view and a key. Joining k views will produce a k-tuple, with one component for each row of the input views, so we follow the join with a ReplaceEach that cleans up the structure and computes the final score.

from guineapig import *
import sys
import math

def tokens(line):
    for tok in line.split(): yield tok.lower()

def score(n1,n2):
    return math.log((n1 + 1.0)/(n1 + n2 + 2.0))

#always subclass Planner                                                                                                                    
class WordCmp(Planner):

    def wcPipe(fileName):
        return ReadLines(fileName) | Flatten(by=tokens) | Group(by=lambda x:x, reducingTo=ReduceToCount())

    wc1 = wcPipe('bluecorpus.txt')
    wc2 = wcPipe('redcorpus.txt')

    cmp = Join( Jin(wc1, by=lambda(word,n):word), Jin(wc2, by=lambda(word,n):word) ) \
          | ReplaceEach(by=lambda((word1,n1),(word2,n2)):(word1, score(n1,n2)))

    result = Format(cmp, by=lambda(word,blueScore):'%6.4f %s' % (blueScore,word))

# always end like this                                                                                                                      
if __name__ == "__main__":
    WordCmp().main(sys.argv)
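
The score here is a smoothed log of the fraction of a word's occurrences that come from the first (blue) corpus, so it is close to log(1) = 0 for words mostly in the blue corpus and strongly negative for words mostly in the red one. A quick check of the arithmetic:

```python
import math

def score(n1, n2):
    return math.log((n1 + 1.0) / (n1 + n2 + 2.0))

# a word seen 9 times in blue and once in red scores higher
# (closer to zero) than one seen once in blue and 9 times in red
print(score(9, 1))
print(score(1, 9))
```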

Note that since the wordcount pipeline is just part of a data structure, we can write a "macro" by simply using a Python function which returns the data structure. (Of course, we can only store components of the macro if they are bound to some variables in the top-level Planner subclass you're defining.)


Parameters and Programmatically Accessing Guinea Pig

In Pig, parameters can be passed in from the command line and used in Pig programs, where they are marked as parameters with a dollar-sign prefix. In Guinea Pig, you can pass in a set of parameters with the option --params key1:val1,key2:val2,.... To access them, you use the special function GPig.getArgvParams(), which returns a Python dict mapping strings to strings. For instance, this program

from guineapig import *
import sys
import os

def tokens(line): 
    for tok in line.split(): 
        yield tok.lower()

class WordCount(Planner):

    D = GPig.getArgvParams()
    wc = ReadLines(D['corpus']) | Flatten(by=tokens) | Group(reducingTo=ReduceToCount())

if __name__ == "__main__":
    WordCount().main(sys.argv)

could be invoked with

python param-wordcount.py --params corpus:bluecorpus.txt --store wc 
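
The --params value is just a comma-separated list of key:value pairs, so GPig.getArgvParams() returns something equivalent to what this hypothetical helper computes (shown for illustration only; it is not GuineaPig's actual code):

```python
def parseParams(paramString):
    """Parse 'key1:val1,key2:val2' into a dict mapping strings to strings."""
    d = {}
    for pair in paramString.split(','):
        key, val = pair.split(':', 1)  # split on the first colon only
        d[key] = val
    return d

print(parseParams('corpus:bluecorpus.txt,prefix:th'))
```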

Of course, sometimes you'll want a little more control over a Guinea Pig script. In Pig, there are APIs for Java and Python which let you create one (or more) Pig script objects, which can be parameterized and run. This is hard to do with the mechanisms I've described so far: a Guinea Pig script isn't actually an object, but a specially-configured subclass of Planner.

However, you can also create a Guinea Pig script by configuring an instance of a Planner object. The process is the same, conceptually, but a little less concise. Here's an instance version of the WordCount program---note you have to complete the instance by calling setup() after all views have been added.

from guineapig import *
import sys

def wordCountScript():
    def tokens(line): 
        for tok in line.split(): 
            yield tok.lower()
    p = Planner()
    p.lines = ReadLines('corpus.txt')
    p.words = Flatten(p.lines,by=tokens)
    p.wordCount = Group(p.words, by=lambda x:x, reducingTo=ReduceToCount())
    p.setup() #always end with this
    return p

# always end like this
if __name__ == "__main__":
    wordCountScript().main(sys.argv)

I also moved the definition of tokens to a less globally-accessible place. Planner instances are treated the same as planner subclasses with these exceptions:

  • You need to call setup explicitly after configuration.
  • They have local parameters, which are accessible via the instance variable param (also a Python dict). The local params can be set in two ways:
    • Via the --params command line argument.
    • In the Planner() constructor, by adding keyword arguments.

To illustrate this, here's an even more complex wordcount. This one supports the command-line usage

% python multi-wordcount.py pref corpus1 corpus2 ...  

and it will build a wordcount file for all the words in each corpus which start with the prefix pref, saving them in files named wc-for-corpus1, ..., etc.

First, we'll write some code to generate an appropriate planner instance. This one takes two arguments -- the corpus name, and a prefix -- and returns a configured Planner() object.

def wc(corpus=None,prefix=None):
    """Return a planner instance that can word-count a corpus."""
    p = Planner(corpus=corpus,prefix=prefix)
    #self.param is automatically constructed
    p.r = ReadLines(p.param['corpus'])  
    def tokens(line): 
        for tok in line.split(): 
            yield tok.lower()
    p.s = Flatten(p.r, by=tokens) 
    if p.param['prefix']: 
        p.f = Filter(p.s, by=lambda w:w.startswith(p.param['prefix']))
    else: 
        p.f = p.s
    p.t = Group(p.f, reducingTo=ReduceToCount())
    p.setup()  #must call AFTER you finish adding views
    return p

This should be pretty familiar: notice that the configuration can be conditional on the parameters (we're using that here to avoid introducing a Filter for a null prefix). Also note we close by calling setup() on the new planner.

Now let's look at the main. When a step within a plan is invoked, you need to dispatch to the right main procedure, so the first step is to see if this is the case. If not, then you invoke a main which parses out the command-line arguments pref corpus1 corpus2 .... This code will iterate over the corpus arguments, positions 2:N in the sys.argv list, and create a planner instance using the function wc().

if __name__ == "__main__":
    #run subplan step, if necessary (note: this snippet also needs os and sys imported)
    if Planner.partOfPlan(sys.argv):
        wc().main(sys.argv)  
    else:
        #run any main you want - this one's usage is python mult-wordcount.py prefix corpus1 corpus2 ....
        prefix = sys.argv[1]
        for a in sys.argv[2:]:
            #create the planner instance
            planner = wc(corpus=a,prefix=prefix)
            #decide where to store the wordcount data
            v = planner.getView('t')
            targetFile = 'wc-for-'+os.path.basename(a)
            print 'storing this view in',targetFile
            v.pprint()                         #print the view
            v.storagePlan().execute(planner)   #store it
            #move the stored file to its final location
            os.rename(v.storedFile(), targetFile)

The code gives several examples of things you can do with a view: grab a particular view, print it, and store it by executing the storage plan for it.

Review, and Comments on Guinea Pig vs Pig

GuineaPig is like Pig, but embedded in Python, not a new language. So there's less to learn.

In Pig (and relational algebra) operators like Join take fields as arguments. Here, there's nothing exactly like a 'field' in relational algebra. Instead of a field, we just use a function - usually a lambda function - that takes a row and returns some component of the row.

GuineaPig has two backends: shell commands and Hadoop streaming. Pig only supports Hadoop. This means you can test GuineaPig programs more quickly - there's not the overhead of starting up Hadoop jobs.

GuineaPig doesn't have any special-purpose mechanism for macros, as Pig does - but it's easy to get that effect using Python functions which return views.

GuineaPig doesn't have any special-purpose mechanism for command-line parameters. However, you can get a similar effect using shell variables and Python's os.environ feature.
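
For example, assuming you export a shell variable named CORPUS (the variable name here is just an illustration), a script could pick up its input file like this:

```python
import os

# fall back to a default file if the shell variable isn't set;
# invoke as, e.g.:  CORPUS=bluecorpus.txt python wordcount.py --store wc
corpusFile = os.environ.get('CORPUS', 'corpus.txt')
print(corpusFile)
```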

User Guide

Basic Concepts

A guinea pig view - usually abbreviated rel, r, s, t, - can be "stored" (materialized) to produce an unordered bag of "rows". A "row" is just an arbitrary python data structure (with a few constraints on how it can be serialized, see below). Views can be defined by classes like 'ReadLines', 'ReplaceEach', 'Flatten', 'Group', 'Join', etc.

Aside from debugging aids, about the only thing that guinea pig does is allow you to 'store' a view, which means to materialize it on disk.

The Planner subclass you write should be defined in a separate script file, which should end with code that creates a single instance and calls its main method with sys.argv, like this.

if __name__ == "__main__":
   MyPlanner().main(sys.argv)

An instance of a Planner class knows, via inheritance, how to create a "storage plan" for any view. For instance, if the script above is defined in a file ugp.py, you would use this command to produce a storage plan for the view foo.

   python ugp.py --plan foo

The storage plan is a shell script, so to execute it you'd use

   python ugp.py --plan foo | sh

When the view foo is saved, it is saved in a file named foo.gp. So to see some of the output you'd use

   python ugp.py --plan foo | sh ; head foo.gp

Or as a shortcut you can say

   python ugp.py --cat foo | head

Views are saved on disk, which leads to some constraints on what they can be. A row must be something that can be stored and retrieved by the RowSerializer, which uses 'repr' and 'str', and it cannot contain newlines in its serialized form.
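
Roughly speaking, a row is safe to use if evaluating its repr reproduces it, and the repr fits on one line. A quick way to check a candidate row (this sketches the constraint; it is not the RowSerializer's actual code):

```python
def isSerializableRow(row):
    """Check that a row survives a repr/eval round trip on a single line."""
    s = repr(row)
    try:
        return '\n' not in s and eval(s) == row
    except SyntaxError:
        return False

print(isSerializableRow(('the', 3)))   # tuples of strings and ints round-trip fine
print(isSerializableRow(object()))     # a bare object's repr can't be eval'd back
```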

View Types

In almost all of the views below, the first argument is a view, and it is omitted when the view being defined appears on the right-hand side of a pipe operator.

Distinct

Distinct: removes duplicates from a view.

d = Distinct(x)


Filter

Filter...by: like ReplaceEach, but extracts a subset of the rows of a relation, instead of transforming them.

filtered = Filter(t, by=lambda(row):row[0].endswith(".txt") )

Flatten

Flatten...by: like ReplaceEach, but allows you to use a python function that yields multiple values. This is pretty much the same as a map-reduce Mapper.

def idwordGen(row):
   for w in row['words']: yield (row['id'],w)
x = Flatten(y, by=idwordGen)

Format

Format...by: a special form of ReplaceEach where the 'by' function produces a string, and this string is NOT serialized normally when stored, but just printed. You can use this to get the final output.

f = Format(z, by=lambda(a,b):'a is %s, b is %s' % (a,b))

Group

Group...by: produces a set of tuples (key, [row1,...,rowN]) where the first element is the 'key' extracted using the 'by' function, and [row1,...,rowN] is a list of rows from t that share this key.

u = Group(t, by=lambda(row):row[1], ... )

Group...by...reducingTo: as above, but reducingTo is a GuineaPig 'reducer' object. The output is the result of reducing the list of rows using that object - so here the result would be words plus document-frequencies, rather than words plus inverted indices. This is much like a map-reduce reducer.

u = Group(t, by=lambda(row):row[1], reducingTo=ReduceToCount())
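
In plain Python, the two forms of Group compute something like the following (an illustration of the semantics only, with the reducer passed as an (initializer, function) pair rather than a ReduceTo object):

```python
def group(rows, by, reducingTo=None):
    """Group rows by a key; optionally fold each group with (init, reducer)."""
    groups = {}
    for row in rows:
        groups.setdefault(by(row), []).append(row)
    if reducingTo is None:
        # the (key, [row1, ..., rowN]) form
        return sorted(groups.items())
    init, reducer = reducingTo
    result = []
    for key, groupRows in sorted(groups.items()):
        accum = init()
        for row in groupRows:
            accum = reducer(accum, row)
        result.append((key, accum))
    return result

words = ['a', 'rose', 'is', 'a', 'rose']
print(group(words, by=lambda x: x))
print(group(words, by=lambda x: x, reducingTo=(int, lambda accum, val: accum + 1)))
```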

Join

Join: joins are multiway, and the argument is a list of "join input" objects, denoted Jin. A join input is a view v plus a field-extracting function f. The result of a join is a tuple of the form (row1,...,rowk) where rowi is a row in the i-th input of the join.

j = Join(Jin(view1,by=getId), Jin(view2,by=getId))
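
Ignoring the map-reduce machinery, a two-input join computes something like this (an illustration of the semantics, not GuineaPig's implementation):

```python
def join(input1, input2):
    """Each input is a (rows, keyFunction) pair; output rows are (row1, row2) tuples."""
    rows1, by1 = input1
    rows2, by2 = input2
    return [(r1, r2) for r1 in rows1 for r2 in rows2 if by1(r1) == by2(r2)]

# two word-count views keyed on the word
wc1 = [('the', 3), ('rose', 2)]
wc2 = [('the', 5), ('thorn', 1)]
getWord = lambda row: row[0]
print(join((wc1, getWord), (wc2, getWord)))
# -> [(('the', 3), ('the', 5))]
```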

JoinTo...by: like Join, but designed to be used in a pipeline. It has a single join-input as its first argument, and uses a "by" argument to specify the field of the primary (pipeline) input to be joined in.

j =  Filter(t, by=lambda(row):getId(row)>0 ) | JoinTo( Jin(view2,by=getId), by=getId)

ReadLines

ReadLines: the rows of this view are just the lines of the file, as strings.

r = ReadLines('mytest.txt')

ReplaceEach

ReplaceEach...by: here parseLine is a function, and each row of this view is produced by evaluating parseLine(x) for some row x of view r.

parsed = ReplaceEach(r, by=parseLine)

Wrap

Wrap: creates a view from any iterable python object. Mainly for debugging purposes....

w = Wrap( ['this is a test', 'here is another test'] )

Details

You can attach options to a view like this

   parsed = ReplaceEach(r, by=parseLine).opts(stored=True)

Ie, the method "v.opts()" returns a copy of v with some options set.

Eager storing: right now the only option is stored=X. Storing works like this: if a view v is marked with 'stored=True', then the first time a plan needs it, it will be stored explicitly. If the same plan needs v again, the values will be read from the stored file. By default a view is stored if it is used more than once (or if it's part of a certain type of chain of map-reduces, which needs caching to make planning tractable). You can discourage eager storing for a view with opts(stored=False), as well as force it with opts(stored=True).

Stored views aren't deleted after the plan completes, so you can also inspect them for debugging purposes if you like. They are also NOT reused from planning episode to planning episode, unless you ask them to be with the --reuse option, below.

Re-use: If you add the option --reuse to a plan, like this

       python ugp.py --plan foo --reuse v1.gp v2.gp

then instead of building a new plan for storing v1 and v2, GuineaPig will reuse any existing files (which should be previously stored views). So, if you always do

       python ugp.py --plan foo --reuse *.gp

then GuineaPig will have a sort of incremental, make-like behavior: it will reuse everything it can.

Under the Hood

Plans are constructed collaboratively by View instances and the Planner instance. The steps of the plan typically involve invoking the script with arguments like --view=foo and --do=bar, e.g. like these:

   python ugp.py --view=u --do=doGroupMap < mytest.txt | sort -k1 > u.gpri
   python ugp.py --view=u --do=doStoreRows < u.gpri > u.gp

Guinea pig tries to optimize its plan by stitching together view definitions when possible, but it may need to create 'checkpoint' files, like the 'u.gpri' file above (the extension is for "guinea pig reducer input").

This approach means that the context in which your field-extraction functions work is pretty limited---they are executed by many different unknown processes, in general. So if you try anything fancy in terms of communication (e.g., using a ReplaceEach view to collect information in a process-local variable X and then using X in some subsequent view) it probably won't work.

Way Under the Hood

Every view has an associated checkpoint, which is the last checkpoint file that is needed to create that view, and a rowGenerator, which is an iterator that produces the rows one by one, assuming it's run in a context where the checkpoint is available on stdin.

The checkpoint plan for a view produces the checkpoint. The storage plan stores the view. Guinea pig tries to minimize checkpoints.

Abstract view types are Reader views, Transformation views (like ReplaceEach), and MapReduce views. The checkpoint for a Reader is the file it reads. The checkpoint for a Transformation is the checkpoint of the view it's transforming. The checkpoint for a MapReduce is the reducer input.

In planning, the planner keeps track of which views will be materialized to avoid regenerating them. So,

  • when returning a storage plan, it checks to see if the file has already been created by a previous subplan, and if not, marks the file as one that will be created by the plan.
  • when you --reuse a view, the planner marks this as having been previously 'created', so even the first subplan will be replaced with a null plan.
  • at run time, when the rowGenerator is called, it checks if the cache file actually exists to determine if it should generate or reuse the stored data.