Guinea Pig

Quick Start

to add:

  • example of macros
  • discuss lambda's versus fields
  • sort view definitions

--Wcohen (talk) 17:37, 9 May 2014 (EDT)

Running wordcount.py

Set up a directory that contains the file gp.py and a second script called wordcount.py which contains this code:

# always start like this
from gp import *
import sys

# supporting routines can go here
def tokens(line):
    for tok in line.split():
        yield tok.lower()

#always subclass Planner
class WordCount(Planner):

    wc = ReadLines('corpus.txt') | Flatten(by=tokens) | Group(by=lambda x:x, reducingTo=ReduceToCount())

# always end like this
if __name__ == "__main__":
    WordCount().main(sys.argv)

Then type the command:

% python tutorial/wordcount.py --store wc

After a couple of seconds it will return, and you can see the wordcounts with

% head wc.gp
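
Each line of wc.gp is one serialized row of the view - here a (token, count) tuple. The counts below are invented for illustration; the real values depend on what's in corpus.txt:

('a', 103)
('about', 12)
('the', 241)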

Understanding the wordcount example

There's also a less concise but easier-to-explain wordcount file, longer-wordcount.py, with this view definition:

class WordCount(Planner):
    lines = ReadLines('corpus.txt')
    words = Flatten(lines,by=tokens)
    wordCount = Group(words, by=lambda x:x, reducingTo=ReduceToCount())

If you type

% python longer-wordcount.py

you'll get a brief usage message:

usage: --[store|pprint|plan|cat] view [--echo] [--target hadoop] [--reuse foo.gp bar.gp ...] [other options]
       --list

Typing

% python longer-wordcount.py --list

will list the views that are defined in the file: lines, words, and wordCount. If you pprint one of these, say wordCount, you can see what it essentially is: a Python data structure with several named subparts (like words):

wordCount = Group(words,by=<function <lambda> at 0x10497aa28>,reducingTo=<guineapig.ReduceToCount object at 0x104979190>)
| words = Flatten(lines, by=<function tokens at 0x1048965f0>).opts(cached=True)
| | lines = ReadLines("corpus.txt")

The "pipe" notation is just a shortcut for nested views: words = ReadLines('corpus.txt') | Flatten(by=tokens) is equivalent to words = Flatten(ReadLines('corpus.txt') , by=tokens) or to the two separate definitions for lines and words above. The named variables (like words) can be used, via devious pythonic tricks, to access the data structures from the command line. Hence the --store command above.

To store a view, GuineaPig will first convert a view structure into a plan for storing the view. To see a plan, you can type something like

% python longer-wordcount.py --plan wordCount

If you typed

% python longer-wordcount.py --plan wordCount | sh

this would be equivalent to python longer-wordcount.py --store wordCount, modulo some details about how errors are reported.

Notice how this works: the view definition (a data structure) is converted to a plan (a shell script), and the shell script is then executed, starting up some new processes while it executes. These new processes invoke additional copies of python longer-wordcount.py with special arguments, like

python longer-wordcount.py --view=words --do=doStoreRows < corpus.txt > words.gp

which tell Python to perform smaller-scale operations associated with individual views, as steps in the overall plan. Here the words view is stored for later processing.
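
Putting the steps together, the whole plan for wordCount is roughly a short pipeline of such invocations (a simplified sketch - the exact steps and bookkeeping vary):

python longer-wordcount.py --view=words --do=doStoreRows < corpus.txt > words.gp
python longer-wordcount.py --view=wordCount --do=doGroupMap < words.gp | sort -k1 > wordCount.gpri
python longer-wordcount.py --view=wordCount --do=doStoreRows < wordCount.gpri > wordCount.gp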

The motivation for doing all this is that the same sort of process can also be distributed across a cluster using Hadoop streaming. If you're working on a machine that has Hadoop installed, you can generate an alternative plan that uses Hadoop streaming:

% python longer-wordcount.py --plan wordCount --target hadoop

This produces a messier-looking plan that will store wordCount on HDFS using a series of Hadoop streaming jobs.

Debugging Tips

Another command is --cat, which stores a view and then prints it. So, to step through the program view-by-view, you could type:

% python longer-wordcount.py --cat lines | head
% python longer-wordcount.py --cat words | head
% python longer-wordcount.py --cat wordCount | head

If you'd like to speed up the later steps by re-using the results of previous steps, you can do that with

% python longer-wordcount.py --cat lines  | head
% python longer-wordcount.py --cat words --reuse lines.gp | head
% python longer-wordcount.py --cat wordCount --reuse lines.gp words.gp | head

If you want to get even further into the details, you can generate the plan and run its lines (or parts of them) one by one.

Guinea Pig vs Pig

GuineaPig is like Pig, but embedded in Python, not a new language. So there's less to learn.

In Pig (and relational algebra), operators like Join take fields as arguments. In GuineaPig there's nothing exactly like a relational 'field'. Instead of a field, we just use a function - usually a lambda function - that takes a row and returns some component of the row.
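
For example, here is a sketch of a Pig-style "group by field", assuming rows are (id, score) tuples:

byId = lambda row: row[0]      # plays the role of a field named 'id'
grouped = Group(t, by=byId)    # group the rows of t on their first component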

GuineaPig has two backends: shell commands and Hadoop streaming. Pig only supports the Hadoop backend. This means you can test GuineaPig programs more quickly - there's no overhead of starting up Hadoop jobs.

User Guide

Basic Concepts

A guinea pig view - usually abbreviated rel, r, s, or t - can be "stored" (materialized) to produce an unordered bag of "rows". A "row" is just an arbitrary python data structure (with a few constraints on how it can be serialized, see below). Views can be defined by classes like 'ReadLines', 'ReplaceEach', 'Flatten', 'Group', 'Join', etc.
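
For example, any of these would be a legal row (a sketch; see the serialization constraints below):

row1 = 'a plain string'
row2 = ('doc1', 'some text')                       # a tuple
row3 = {'id': 'doc1', 'words': ['some', 'text']}   # a dict with a nested list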

Aside from debugging aids, about the only thing that guinea pig does is allow you to 'store' a view, which means to materialize it on disk.

The Planner subclass you write should be defined in a separate script file, which should end with code that creates a single instance and calls its main method with sys.argv, like this:

if __name__ == "__main__":
   MyPlanner().main(sys.argv)

An instance of a Planner class knows, via inheritance, how to create a "storage plan" for any view. For instance, if the script above is defined in a file ugp.py, you would use this command to produce a storage plan for the view foo.

   python ugp.py --plan foo

The storage plan is a shell script, so to execute it you'd use

   python ugp.py --plan foo | sh

When the view foo is saved, it is saved in a file named foo.gp. So to see some of the output you'd use

   python ugp.py --plan foo | sh ; head foo.gp

Or as a shortcut you can say

   python ugp.py --cat foo | head

Views are saved on disk, which leads to some constraints on what rows can be. A row must be something that can be stored and retrieved by the RowSerializer, which uses 'repr' and 'str', and it cannot contain newlines in its serialized form.
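
In other words, a row must survive a repr/eval round trip, and its serialized form must fit on one line. A sketch of the constraint:

row = ('word', 42)
assert eval(repr(row)) == row       # fine: repr gives "('word', 42)"
bad = 'line one\nline two'          # not fine: serializes to two lines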

View Types

In almost all of the views below, the first argument is a view; it is omitted when the view being defined appears on the right-hand side of a pipe operator.

TODO: sort these.--Wcohen (talk) 17:35, 9 May 2014 (EDT)

ReadLines: The rows of this view are just strings, one for each line of the file. (To recover python objects from a file of serialized rows, you can follow ReadLines with a ReplaceEach that runs 'eval' on each line.)

r = ReadLines('mytest.txt')

ReplaceEach...by: Here parseLine is a function, and each row of this view is produced by applying parseLine(x) to some row x of the view r.

parsed = ReplaceEach(r, by=parseLine)
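
(parseLine isn't defined above; a hypothetical version that splits tab-separated lines into (id, text) tuples might look like this.)

def parseLine(line):
    # split a tab-separated line into an (id, text) tuple
    parts = line.strip().split('\t')
    return (parts[0], parts[1])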

Filter...by: like ReplaceEach, but extracts a subset of the rows of a relation, instead of transforming them.

filtered = Filter(t, by=lambda row: row[0].endswith(".txt"))

Flatten...by: Like ReplaceEach, but allows you to use a python function that yields multiple values. This is pretty much the same as a map-reduce Mapper.

def idwordGen(row):
   for w in row['words']: yield (row['id'],w)
x = Flatten(y, by=idwordGen)

Group...by: produces a set of tuples (key, [row1,...,rowN]) where the first element is the 'key' extracted using the 'by' function, and [row1,...,rowN] is a list of rows from t that share this key.

u = Group(t, by=lambda row: row[1])

Group...by..reducingTo: as above, but reducingTo is a GuineaPig 'reducer' object. The output is the result of reducing the list of rows using that object - so here the result would be words plus document-frequencies, rather than words plus inverted indices. This is much like a map-reduce reducer.

u = Group(t, by=lambda row: row[1], reducingTo=ReduceToCount())
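
A worked sketch of the difference, assuming t contains the three rows ('a',1), ('b',1), ('c',2):

u1 = Group(t, by=lambda row: row[1])
# u1 contains (1, [('a',1), ('b',1)]) and (2, [('c',2)])
u2 = Group(t, by=lambda row: row[1], reducingTo=ReduceToCount())
# u2 contains (1, 2) and (2, 1)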

Join: joins are multiway, and the argument is a list of "join input" objects, denoted Jin. A join input is a view v plus a field-extracting function f. The result of a join is a tuple of the form (row1,...,rowk) where rowi is a row in the i-th input of the join.

j = Join(Jin(view1,by=getId), Jin(view2,by=getId))
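
For example (a sketch, with a hypothetical accessor getId and one-row inputs):

getId = lambda row: row[0]
j = Join(Jin(view1, by=getId), Jin(view2, by=getId))
# if view1 contains ('id1', 'x') and view2 contains ('id1', 'y'),
# then j contains the single row (('id1', 'x'), ('id1', 'y'))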

JoinTo...by: like Join, but designed to be used in a pipeline. It has a single join-input as its first argument, and uses a "by" argument to specify the field of the primary (pipeline) input to be joined in.

j = Filter(t, by=lambda row: getId(row) > 0) | JoinTo(Jin(view2, by=getId), by=getId)

Wrap: creates a view from any iterable python object. Mainly for debugging purposes.

w = Wrap( ['this is a test', 'here is another test'] )

Distinct: removes duplicates from a view.

d = Distinct(x)

Format...by: a special form of ReplaceEach where the 'by' function produces a string, and this string is NOT serialized normally when stored, but just printed. You can use this to get the final output.

f = Format(z, by=lambda ab: 'a is %s, b is %s' % ab)

Details

   You can attach options to a view like this:

   parsed = ReplaceEach(r, by=parseLine).opts(cached=True)

I.e., the method "v.opts()" returns a copy of v with some options set.

Caching: Right now caching is the only option. Caching works like this: if a view v is marked as 'cached', then the first time a plan needs it, it will be stored explicitly. If the same plan needs v again, the values will be read from the cached file. By default a view is cached if it is used more than once (or if it's part of a certain type of chain of map-reduces, which needs caching to make planning tractable.) You can discourage caching for a view with opts(cached=False) or force it with opts(cached=True).

Cached views aren't deleted after the plan completes, so you can also inspect them for debugging purposes if you like. They are also NOT reused from planning episode to planning episode, unless you ask them to be with the --reuse option, below.

Re-use: If you add the option --reuse to a plan, like this

       python ugp.py --plan foo --reuse v1.gp v2.gp

then instead of building a new plan for storing v1 and v2, GuineaPig will reuse the existing files (which should be previously cached or stored views). So, if you always do

       python ugp.py --plan foo --reuse *.gp

then GuineaPig will have a sort of incremental, make-like behavior: it will reuse everything it can.

Under the Hood

Plans are constructed collaboratively by the View instances and the Planner instance. The steps of the plan typically involve invoking the script with arguments like --view=foo and --do=bar, e.g. like these:

   python ugp.py --view=u --do=doGroupMap < mytest.txt | sort -k1 > u.gpri
   python ugp.py --view=u --do=doStoreRows < u.gpri > u.gp

Guinea pig tries to optimize its plan by stitching together view definitions when possible, but it may need to create 'checkpoint' files, like the 'u.gpri' file above (the extension is for "guinea pig reducer input").

This approach means that the context in which your field-extraction functions work is pretty limited---they are executed by many different unknown processes, in general. So if you try anything fancy in terms of communication (e.g., using a ReplaceEach view to collect information in a process-local variable X and then using X in some subsequent view) it probably won't work.
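
For instance, this sort of thing will not work as you might hope (a sketch of the anti-pattern):

seen = set()
def remember(row):
    seen.add(row)       # 'seen' lives only in whichever process runs this step
    return row
v = ReplaceEach(u, by=remember)
# later views can't rely on 'seen' being populated - the step may run
# in a different process, or on another machine under Hadoop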

Way Under the Hood

Every view has an associated checkpoint, which is the last checkpoint file that is needed to create that view, and a rowGenerator, which is an iterator that produces the rows one by one, assuming it's run in a context where the checkpoint is available from stdin.

The checkpoint plan for a view produces the checkpoint. The storage plan stores the view. Guinea pig tries to minimize checkpoints.

Abstract view types are Reader views, Transformation views (like ReplaceEach), and MapReduce views. The checkpoint for a Reader is the file it reads. The checkpoint for a Transformation is the checkpoint of the view it transforms. The checkpoint for a MapReduce is its reducer input.

In planning, the planner keeps track of which cached views will be materialized to avoid regenerating them. So,

  • when returning a storage plan, it checks to see if the file has been 'created' by a previous subplan, and if not, marks the file as one that will be created by the plan.
  • when you --reuse a view, the planner marks it as having been previously 'created', so even the first subplan will be replaced with a null plan.
  • at run time, when the rowGenerator is called, it checks if the cache file actually exists to determine if it should generate or reuse the cached data.