Guinea Pig
Quick Start
to add:
- example of macros
- discuss lambda's versus fields
- sort view definitions
- caching in quick start
- reducer definition
--Wcohen (talk) 17:37, 9 May 2014 (EDT)
Running wordcount.py
Set up a directory that contains the file gp.py and a second script called wordcount.py which contains this code:
<pre>
# always start like this
from gp import *
import sys

# supporting routines can go here
def tokens(line):
    for tok in line.split():
        yield tok.lower()

# always subclass Planner
class WordCount(Planner):
    wc = ReadLines('corpus.txt') | Flatten(by=tokens) | Group(by=lambda x:x, reducingTo=ReduceToCount())

# always end like this
if __name__ == "__main__":
    WordCount().main(sys.argv)
</pre>
Then type the command:
% python tutorial/wordcount.py --store wc
After a couple of seconds it will return, and you can see the wordcounts with
% head wc.gp
Understanding the wordcount example
There's also a less concise but easier-to-explain wordcount file, longer-wordcount.py, with this view definition:
<pre>
class WordCount(Planner):
    lines = ReadLines('corpus.txt')
    words = Flatten(lines, by=tokens)
    wordCount = Group(words, by=lambda x:x, reducingTo=ReduceToCount())
</pre>
The Group view is actually quite flexible: the "by" clause is a lambda that extracts an arbitrary value that defines the groups, and in the absence of a "reducingTo" clause, the result of grouping is a tuple (key,[row1,...,rowN]) where the rowi's are the rows that have the indicated "key" (as extracted by the "by" clause). The "reducingTo" argument is an optimization, which you can define with an instance of the ReduceTo class. For instance, instead of using the ReduceToCount() subclass, you could have used
ReduceTo(int, by=lambda accum,val: accum+1)
where the first argument is the type of the output (and defines the initial value of the accumulator, which here will be int(), or zero) and the second is the function that is used to reduce values pairwise.
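To make "reduce values pairwise" concrete, here is a small plain-Python sketch (not GuineaPig code) of what such a reducer computes for a single group, using the same initial value and pairwise function as ReduceTo(int, by=lambda accum,val:accum+1); the example rows are made up for illustration:
<pre>
# plain-Python sketch of what the reducer computes for one group of rows
rows = ['the', 'the', 'the']              # the rows that share some key, e.g. 'the'
reduceBy = lambda accum, val: accum + 1   # the pairwise reducing function
accum = int()                             # the initial accumulator value, here 0
for val in rows:
    accum = reduceBy(accum, val)
# accum is now 3, the count for this group
</pre>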
To use this pipeline: if you type
% python longer-wordcount.py
you'll get a brief usage message:
usage: --[store|pprint|plan|cat] view [--echo] [--target hadoop] [--reuse foo.gp bar.gp ...] [other options]
       --list
Typing
% python longer-wordcount.py --list
will list the views that are defined in the file: lines, words, and wordCount.
If you pprint one of these, say wordCount, you can see what it essentially is: a Python data structure, with several named subparts (like words):
<pre>
wordCount = Group(words,by=<function <lambda> at 0x10497aa28>,reducingTo=<guineapig.ReduceToCount object at 0x104979190>)
| words = Flatten(lines, by=<function tokens at 0x1048965f0>).opts(stored=True)
| | lines = ReadLines("corpus.txt")
</pre>
The "pipe" notation is just a shortcut for nested views:
words = ReadLines('corpus.txt') | Flatten(by=tokens)
is equivalent to
words = Flatten(ReadLines('corpus.txt'), by=tokens)
or to the two separate definitions for lines and words above. The named variables (like words) can be used, via devious pythonic tricks, to access the data structures from the command line. Hence the --store command above.
To store a view, GuineaPig will first convert a view structure into a plan for storing the view. To see a plan, you can type something like
% python longer-wordcount.py --plan wordCount
If you typed
% python longer-wordcount.py --plan wordCount | sh
this would be equivalent to python longer-wordcount.py --store wordCount, modulo some details about how errors are reported.
Notice how this works: the view definition (a data structure) is converted to a plan (a shell script), and the shell script is then executed, starting up some new processes while it executes. These new processes invoke additional copies of python longer-wordcount.py with special arguments, like
python longer-wordcount.py --view=words --do=doStoreRows < corpus.txt > words.gp
which tell Python to perform smaller-scale operations associated with individual views, as steps in the overall plan. Here the words view is stored for later processing.
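For a rough sense of what a full plan looks like, here is a hand-written sketch of the kind of shell script the planner emits for wordCount. This is only a sketch: the exact steps, flags, and intermediate file names may differ, and it simply combines the kinds of commands shown above and in the "Under the Hood" section below:
<pre>
# sketch only -- not the literal output of --plan wordCount
python longer-wordcount.py --view=words --do=doStoreRows < corpus.txt > words.gp
python longer-wordcount.py --view=wordCount --do=doGroupMap < words.gp | sort -k1 > wordCount.gpri
python longer-wordcount.py --view=wordCount --do=doStoreRows < wordCount.gpri > wordCount.gp
</pre>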
The motivation for doing all this is because this sort of process can also be distributed across a cluster using Hadoop streaming. If you're working on a machine that has Hadoop installed you can generate an alternative plan that uses Hadoop streaming:
% python longer-wordcount.py --plan wordCount --target hadoop
This produces a messier-looking plan that will store wordCount on HDFS using a series of Hadoop streaming jobs.
Debugging Tips
Another command is --cat, which stores a view and then prints it. So, to step through the program view-by-view you could type:
% python longer-wordcount.py --cat lines | head
% python longer-wordcount.py --cat words | head
% python longer-wordcount.py --cat wordCount | head
If you'd like to speed up the later steps by re-using the results of previous steps, you can do that with
% python longer-wordcount.py --cat lines | head
% python longer-wordcount.py --cat words --reuse lines.gp | head
% python longer-wordcount.py --cat wordCount --reuse lines.gp words.gp | head
If you want to get even further into the details, you can generate the plan and start running lines from it (or parts of them) one-by-one.
A More Complicated Example
Here's a more complex problem: finding words that are much more frequent in one corpus than another. We have an ad hoc scoring function, and we will run two copies of the wordcount pipeline and compare the results with a Join. A Join of k views will produce a k-tuple, with one component (a row) from each of the input views, so we follow the join with a ReplaceEach that cleans up the structure and computes the final score.
<pre>
from guineapig import *
import sys
import math

def tokens(line):
    for tok in line.split():
        yield tok.lower()

def score(n1,n2):
    return math.log((n1 + 1.0)/(n1 + n2 + 2.0))

# always subclass Planner
class WordCmp(Planner):

    def wcPipe(fileName):
        return ReadLines(fileName) | Flatten(by=tokens) | Group(by=lambda x:x, reducingTo=ReduceToCount())

    wc1 = wcPipe('bluecorpus.txt')
    wc2 = wcPipe('redcorpus.txt')

    cmp = Join( Jin(wc1, by=lambda(word,n):word), Jin(wc2, by=lambda(word,n):word) ) \
        | ReplaceEach(by=lambda((word1,n1),(word2,n2)):(word1, score(n1,n2)))

    result = Format(cmp, by=lambda(word,blueScore):'%6.4f %s' % (blueScore,word))

# always end like this
if __name__ == "__main__":
    WordCmp().main(sys.argv)
</pre>
Note that since the wordcount pipeline is just part of a data structure, we can write a "macro" by simply using a Python function which returns the data structure. (Of course, we can only store components of the macro if they are bound to some variables in the top-level Planner subclass you're defining.)
Guinea Pig vs Pig
GuineaPig is like Pig, but embedded in Python, not a new language. So there's less to learn.
In Pig (and in relational algebra) operators like Join take fields as arguments. GuineaPig has nothing exactly like a 'field'; instead of a field, we just use a function - usually a lambda function - that takes a row and returns some component of the row.
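As an illustration (a sketch only: wc and the (word, count) shape of its rows are assumptions carried over from the wordcount example, and the tuple-unpacking lambdas are Python 2 syntax, as used elsewhere on this page):
<pre>
# rows of wc look like (word, count); these functions play the role of 'fields'
getWord  = lambda (word,count): word
getCount = lambda (word,count): count

# 'group by the word field' is just 'group by the value getWord extracts'
byWord = Group(wc, by=getWord)
</pre>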
GuineaPig has two backends, shell and Hadoop streaming; Pig only supports Hadoop. This means that you can test GuineaPig programs more quickly, without the overhead of starting up Hadoop jobs.
User Guide
Basic Concepts
A guinea pig view - usually abbreviated rel, r, s, t, - can be "stored" (materialized) to produce an unordered bag of "rows". A "row" is just an arbitrary python data structure (with a few constraints on how it can be serialized, see below). Views can be defined by classes like 'ReadLines', 'ReplaceEach', 'Flatten', 'Group', 'Join', etc.
Aside from debugging aids, about the only thing that guinea pig does is to allow you to 'store' a view, which means to materialize it on disk.
The Planner subclass you write should be defined in a separate script file, which should end with code that creates a single instance and calls its main method with sys.argv, like this.
<pre>
if __name__ == "__main__":
    MyPlanner().main(sys.argv)
</pre>
An instance of a Planner class knows, via inheritance, how to create a "storage plan" for any view. For instance, if the script above is defined in a file ugp.py, you would use this command to produce a storage plan for the view foo.
python ugp.py --plan foo
The storage plan is a shell script, so to execute it you'd use
python ugp.py --plan foo | sh
When the view foo is saved, it is saved in a file named foo.gp. So to see some of the output you'd use
python ugp.py --plan foo | sh ; head foo.gp
Or as a shortcut you can say
python ugp.py --cat foo | head
Views are saved on disk, which leads to some constraints on what they can be. A row must be something that can be stored and retrieved by the RowSerializer, which uses 'repr' and 'str', and it cannot contain newlines in its serialized form.
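For instance (an illustrative plain-Python sketch of the constraint, with made-up row values):
<pre>
# a row like this round-trips through repr()/eval() and serializes to one line, so it's fine
row = ('doc1', ['the', 'cat', 'sat'])
assert eval(repr(row)) == row

# a row like this is a problem: its str() form contains a newline
bad = ('doc1', 'first line\nsecond line')
</pre>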
View Types
In almost all of the views below, the first argument is a view, which is omitted when on the right-hand-side of a pipe operator.
ReadLines: The rows of this view are just strings from the file, in the first case, and python objects produced by running 'eval' on the strings in the second.
r = ReadLines('mytest.txt')
ReplaceEach...by: Here parseLine is a function, and each row of this view is produced by parseLine(x) for some row x of view r.
parsed = ReplaceEach(r, by=parseLine)
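For example, parseLine might look like this (a hypothetical helper, not part of GuineaPig; any function that maps one row to one new row will do):
<pre>
def parseLine(line):
    # hypothetical: turn a tab-separated line into an (id, list-of-fields) row
    parts = line.strip().split('\t')
    return (parts[0], parts[1:])
</pre>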
Filter...by: like ReplaceEach, but extracts a subset of the rows of a relation, instead of transforming them.
filtered = Filter(t, by=lambda(row):row[0].endswith(".txt") )
Flatten...by: Like ReplaceEach, but allows you to use a python function that yields multiple values. This is pretty much the same as a map-reduce Mapper.
<pre>
def idwordGen(row):
    for w in row['words']:
        yield (row['id'], w)

x = Flatten(y, by=idwordGen)
</pre>
Group...by: produces a set of tuples (key, [row1,...,rowN]) where the first element is the 'key' extracted using the 'by' function, and [row1,...,rowN] is a list of rows from t that share this key.
u = Group(t, by=lambda(row):row[1], ... )
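For instance (an illustrative sketch; the (doc, word) shape of t's rows is an assumption, and the tuple-unpacking lambda is Python 2 syntax):
<pre>
# if t contains rows like ('d1','cat'), ('d2','cat'), ('d1','dog'),
# then grouping by the second component yields rows like
#   ('cat', [('d1','cat'), ('d2','cat')])
#   ('dog', [('d1','dog')])
u = Group(t, by=lambda (doc,word): word)
</pre>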
Group...by...reducingTo: as above, but reducingTo is a GuineaPig 'reducer' object. The output is the result of reducing the list of rows using that object - so here the result would be words plus document-frequencies, rather than words plus inverted indices. This is much like a map-reduce reducer.
u = Group(t, by=lambda(row):row[1], reducingTo=ReduceToCount())
Join: joins are multiway, and the argument is a list of "join input" objects, denoted Jin. A join input is a view v plus a field-extracting function f. The result of a join is a tuple of the form (row1,...,rowk) where rowi is a row in the i-th input of the join.
j = Join(Jin(view1,by=getId), Jin(view2,by=getId))
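For example (an illustrative sketch; view1, view2, and the row shapes are assumptions):
<pre>
# if view1 contains the row (1,'a') and view2 contains the row (1,'b'),
# then the join contains the 2-tuple ((1,'a'), (1,'b'))
def getId(row):
    return row[0]

j = Join(Jin(view1, by=getId), Jin(view2, by=getId))
</pre>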
JoinTo...by: like Join, but designed to be used in a pipeline. It has a single join-input as its first argument, and uses a "by" argument to specify the field of the primary (pipeline) input to be joined in.
j = Filter(t, by=lambda(row):getId(row)>0 ) | JoinTo( Jin(view2,by=getId), by=getId)
Wrap: creates a view from any iterable python object. Mainly for debugging purposes....
w = Wrap( ['this is a test', 'here is another test'] )
Distinct: removes duplicates from a view.
d = Distinct(x)
Format...by: a special form of ReplaceEach where the 'by' function produces a string, and this string is NOT serialized normally when stored, but just printed. You can use this to get the final output.
f = Format(z, by=lambda(a,b):'a is %s, b is %s' % (a,b))
Details
You can attach options to a view like this
parsed = ReplaceEach(r, by=parseLine).opts(stored=True)
I.e., the method "v.opts()" returns a copy of v with some options set.
Eager storing: Right now the only option is stored=X. Storing works like this: if a view v is marked as 'stored=True', then the first time a plan needs it, it will be stored explicitly. If the same plan needs v again, the values will be read from the stored file. By default a view is stored if it is used more than once (or if it's part of a certain type of chain of map-reduces, which needs caching to make planning tractable). You can discourage eager storing for a view with opts(stored=False) as well as force it with opts(stored=True).
Stored views aren't deleted after the plan completes, so you can also inspect them for debugging purposes if you like. They are also NOT reused from planning episode to planning episode, unless you ask them to be with the --reuse option, below.
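As a sketch of when eager storing kicks in (hypothetical view names; parseLine and r are carried over from the ReplaceEach example above):
<pre>
parsed = ReplaceEach(r, by=parseLine)      # used by two views below, so stored eagerly by default
short  = Filter(parsed, by=lambda row: len(row) < 3)
counts = Group(parsed, by=lambda row: row[0], reducingTo=ReduceToCount())

# to discourage the eager store of the shared view:
parsed2 = ReplaceEach(r, by=parseLine).opts(stored=False)
</pre>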
Re-use: If you add the option --reuse to a plan, like this
python ugp.py --plan foo --reuse v1.gp v2.gp
then instead of building a new plan for storing v1 and v2, GuineaPig will reuse any existing files (which should be previously stored views). So, if you always do
python ugp.py --plan foo --reuse *.gp
then GuineaPig will have a sort of incremental, make-like behavior: it will reuse everything it can.
Under the Hood
Plans are constructed collaboratively by View instances and the GuineaPig class instance. The steps of the plan typically involve invoking the script with arguments like --view=foo and --do=bar, e.g. like these:
python ugp.py --view=u --do=doGroupMap < mytest.txt | sort -k1 > u.gpri
python ugp.py --view=u --do=doStoreRows < u.gpri > u.gp
Guinea pig tries to optimize its plan by stitching together view definitions when possible, but it may need to create 'checkpoint' files, like the 'u.gpri' file above (the extension is for "guinea pig reducer input").
This approach means that the context in which your field-extraction functions work is pretty limited---they are executed by many different unknown processes, in general. So if you try anything fancy in terms of communication (e.g., using a ReplaceEach view to collect information in a process-local variable X and then using X in some subsequent view) it probably won't work.
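For example, something like the following will not do what you might hope, because the ReplaceEach steps may run in a different process from the one that finally inspects the list (a sketch of the anti-pattern, with hypothetical names):
<pre>
seen = []                          # process-local variable in the script

def collect(row):
    seen.append(row)               # appends inside whatever worker process runs this step
    return row

class Bad(Planner):
    x = ReadLines('corpus.txt') | ReplaceEach(by=collect)

# after storing x, 'seen' in the process you launched is, in general, still empty
</pre>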
Way Under the Hood
Every view has an associated checkpoint, which is the last checkpoint file that is needed to create that view, and a rowGenerator, which is an iterator that produces the rows, one by one, assuming that it's run in a context where the checkpoint is available from stdin.
The checkpoint plan for a view produces the checkpoint. The storage plan stores the view. Guinea pig tries to minimize checkpoints.
Abstract view types are Reader views, Transformation views (like ReplaceEach), or MapReduce views. The checkpoint for a Reader is the file it reads. The checkpoint for a Transformation is the checkpoint of the view it's transforming. The checkpoint for a MapReduce is the reducer input.
In planning, the planner keeps track of which views will be materialized to avoid regenerating them. So,
- when returning a storage plan, it checks to see if the file has 'been created' by a previous subplan, and if not, marks the file as one that will be created by the plan.
- when you --reuse a view, the planner marks this as having been previously 'created', so even the first subplan will be replaced with a null plan.
- at run time, when the rowGenerator is called, it checks if the cache file actually exists to determine if it should generate or reuse the stored data.