Guinea Pig
Contents
- 1 Quick Start
- 1.1 Aside: versions
- 1.2 What you should know before you start
- 1.3 Running wordcount.py
- 1.4 Understanding the wordcount example
- 1.5 Debugging Tips
- 1.6 A More Complicated Example With Joins
- 1.7 A More Complex Example Using Group
- 1.8 Parameters and Programmatically Accessing Guinea Pig
- 1.9 Views that Include Custom Data Structures
- 1.10 Side Views in GuineaPig
- 1.11 Review and Discussion of Guinea Pig vs Pig
- 2 Using Guinea Pig on EC2
- 3 Reference Guide
Quick Start
Aside: versions
The default, and most recent, version of GuineaPig is imported when you use import guineapig. Most of the changes between 1.1 and 1.3 are not API changes, so they should not affect the tutorial. If you want to use a particular version of Guinea Pig, then import it explicitly.
What you should know before you start
Guinea Pig is similar to Pig, but intended to be easier to learn and debug. It's probably a good idea to be somewhat familiar with Pig before you start this tutorial.
I also assume a reading knowledge of Python and core Unix commands. If you're
going to use Hadoop, you should also be somewhat familiar with (at least) the Hadoop file system and the hadoop fs
subcommands.
Running wordcount.py
Download the tutorial tarball and unpack it.
It contains a few small corpora and the file wordcount.py, which has this code:
# always start like this
from guineapig import *
import sys

# supporting routines can go here
def tokens(line):
    for tok in line.split():
        yield tok.lower()

# always subclass Planner
class WordCount(Planner):

    wc = ReadLines('corpus.txt') | Flatten(by=tokens) | Group(by=lambda x:x, reducingTo=ReduceToCount())

# always end like this
if __name__ == "__main__":
    WordCount().main(sys.argv)
Then type the command:
% python wordcount.py --store wc
After a couple of seconds it will return, and you can see the wordcounts with
% head gpig_views/wc.gp
Understanding the wordcount example
A longer example
In the example above, wc is a GuineaPig view. I use the term "view" for both (1) a procedural definition of how to process data, using a particular set of operators, and (2) the data produced by such a definition. The rest of this tutorial will help you understand views and how to write them. To start, the tutorial directory also has a less concise but easier-to-explain wordcount file, longer-wordcount.py, with this view definition:
class WordCount(Planner):

    lines = ReadLines('corpus.txt')
    words = Flatten(lines,by=tokens)
    wordCount = Group(words, by=lambda x:x, reducingTo=ReduceToCount())
You see this is actually defined in three phases: one to read the lines from a file, one to convert them into a stream of tokens, and one to count the tokens. I'll use this example to explain some of the key ideas in GuineaPig.
Objects instead of records, and functions instead of fields
GuineaPig has no notion of records or fields. Instead, views contain sets of Python objects. Conceptually the sets are unordered and can only be streamed through sequentially. These objects can be anything: e.g., the output of ReadLines is a stream of Python strings, one per line. Usually the objects are tuples. Here I'll use the term row for an object in a view.
To access part of a row, as when you 'Group' by a field, you use a function that takes the object and returns the appropriate subpart. Python lets you construct an unnamed function on-the-fly with the lambda keyword. The 'by' argument here is just the identity function, so here we're grouping by an entire row.
The Group view is actually quite flexible, since the "by" clause is an arbitrary function that defines the groups. In the absence of a "reducingTo" clause, the result of grouping is a tuple (key,[row1,...,rowN]) where the rowi's are the rows that have the indicated "key" (as extracted by the "by" clause).
The "reducingTo" argument is an optimization, which you can define with an instance of the ReduceTo class. For instance, ReduceToCount is such a subclass, and ReduceToCount() is defined as
ReduceTo(int,by=lambda accum,val:accum+1)
where the first argument is the type of the output (and defines the initial value of the accumulator, which here will be the output of int(), or zero) and the second is the function that is used to reduce values pairwise.
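To make the shape of Group's output concrete, here is a plain-Python sketch of the semantics just described. It does not use GuineaPig at all: the function group and its reducing_to pair are hypothetical stand-ins for Group and ReduceTo, and the sorting is only there to make the printed result deterministic (views are conceptually unordered).

```python
from collections import defaultdict

def group(rows, by, reducing_to=None):
    """Hypothetical stand-in for the semantics of Group (not GuineaPig code).

    reducing_to, if given, is a (base_type, fn) pair that plays the role
    of ReduceTo(base_type, by=fn).
    """
    buckets = defaultdict(list)
    for row in rows:
        buckets[by(row)].append(row)
    if reducing_to is None:
        # no reducer: each output row is (key, [row1, ..., rowN])
        return sorted(buckets.items())
    base_type, fn = reducing_to
    out = []
    for key, members in sorted(buckets.items()):
        accum = base_type()           # e.g. int() is 0
        for val in members:
            accum = fn(accum, val)    # fold the group's rows one at a time
        out.append((key, accum))
    return out

words = ['a', 'b', 'a', 'c', 'a']
grouped = group(words, by=lambda x: x)
counted = group(words, by=lambda x: x,
                reducing_to=(int, lambda accum, val: accum + 1))
print(grouped)
print(counted)
```

The reduced form never needs to hold the whole list of rows for a key, which is presumably why the tutorial calls "reducingTo" an optimization.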
Once again: note the use of functions as parameters in Group and Flatten. Guinea Pig has no notion of records: rows can be any Python object (although they are often tuples). So if you want to do the equivalent of extracting a field from a record, you should use a function. Python's lambda functions and argument unpacking features make this fairly convenient, as the examples below show.
Using the pipeline
To use this pipeline: if you type
% python longer-wordcount.py
you'll get a brief usage message:
usage: --[store|pprint|plan|cat] view [--opts key:val,...] [--params key:val,...] --reuse view1 view2 ...]
       --[list]
current legal keys for "opts", with default values:
  echo:0
  viewdir:gpig_views
  parallel:5
  streamJar:/usr/lib/hadoop/contrib/streaming/hadoop-streaming-1.2.0.1.3.0.0-107.jar
  target:shell
There's more help at http://curtis.ml.cmu.edu/w/courses/index.php/Guinea_Pig
Typing
% python longer-wordcount.py --list
will list the views that are defined in the file: lines, words, and wordCount. If you pprint one of these, say wordCount, you can see what it essentially is: a Python data structure, with several named subparts (like words), which pprint will present as a tree. (The vertical bars in this printout are just to indicate the indent level, by the way, so don't confuse them with the "pipe" notation used for chaining together views.)
wordCount = Group(words,by=<function <lambda> at 0x10497aa28>,reducingTo=<guineapig.ReduceToCount object at 0x104979190>)
| words = Flatten(lines, by=<function tokens at 0x1048965f0>).opts(stored=True)
| | lines = ReadLines("corpus.txt")
Returning to the previous wordcount example, the "pipe" notation is just a shortcut for nested views: words = ReadLines('corpus.txt') | Flatten(by=tokens) is equivalent to words = Flatten(ReadLines('corpus.txt') , by=tokens) or to the two separate definitions for lines and words above. The named variables (like words) can be used, via devious Pythonic tricks, to access the data structures from the command line. Hence the --store command above.
To store a view, GuineaPig will first convert a view structure into a plan for storing the view. To see this plan, you can type something like
% python longer-wordcount.py --plan wordCount
If you typed
% python longer-wordcount.py --plan wordCount | sh
this would be equivalent to python longer-wordcount.py --store wordCount, modulo some details about how errors are reported.
Currently the plan has these steps:
# echo map/reduce wordCount/wordCount: read corpus.txt with lines...flatten to words...group to wordCount
python longer-wordcount.py --view=wordCount --do=doGroupMap < corpus.txt \
  | sort -k1 | python longer-wordcount.py --view=wordCount --do=doStoreRows \
  > gpig_views/wordCount.gp
The plan reveals how GuineaPig works: the view definition (a data structure) is converted to a plan (a shell script),
and the shell script is then executed, starting up some new processes while it executes. These new processes
invoke additional copies of python longer-wordcount.py
with special arguments, like
python longer-wordcount.py --view=words --do=doStoreRows < corpus.txt > words.gp
which tell Python to perform smaller-scale operations associated with individual views, as steps in the overall plan.
The final result of the plan to store the view wordCount is to create a file called wordCount.gp in a certain directory, the GuineaPig viewdir, which is by default gpig_views.
If you like, you can see what these steps do by invoking them, or parts of them, yourself: for instance, here's what the output of the doGroupMap operation looks like:
wcohen@shell2:~/pyhack/guineapig/tutorial$ head -1 corpus.txt
Quite a difference. Think it has anything to do with all that coke?
wcohen@shell2:~/pyhack/guineapig/tutorial$ python longer-wordcount.py --view=wordCount --do=doGroupMap < corpus.txt| head -10
'quite' 'quite'
'a' 'a'
'difference.' 'difference.'
'think' 'think'
'it' 'it'
'has' 'has'
'anything' 'anything'
'to' 'to'
'do' 'do'
'with' 'with'
Traceback (most recent call last):
  File "longer-wordcount.py", line 19, in <module>
    ... [Python's usual complaints about writing to a "head" command] ...
IOError: [Errno 32] Broken pipe
The steps in the plan do not correspond one-to-one with the steps in the view definition: some optimization happens. We'll talk about this a little more below when we discuss debugging techniques.
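The map, sort, reduce shape of that plan can be imitated in a few lines of plain Python. This is only a sketch of the idea: I'm assuming that the doGroupMap step emits one key/value pair per token (which matches the output shown above) and that the final step folds adjacent equal keys into counts. The function names map_phase and reduce_phase are made up for illustration and are not part of GuineaPig.

```python
from itertools import groupby

def tokens(line):
    for tok in line.split():
        yield tok.lower()

def map_phase(lines):
    # emit one (key, value) pair per token; with the identity "by", key == value
    for line in lines:
        for tok in tokens(line):
            yield (tok, tok)

def reduce_phase(sorted_pairs):
    # after sorting, equal keys are adjacent, so one sequential pass is enough
    for key, pairs in groupby(sorted_pairs, key=lambda kv: kv[0]):
        yield (key, sum(1 for _ in pairs))

lines = ['Quite a difference.', 'quite a lot']
# sorted() plays the role of "sort -k1" between the two Python processes
word_counts = list(reduce_phase(sorted(map_phase(lines))))
print(word_counts)
```

Because the reduce step only ever looks at adjacent rows, the same two functions could run as separate processes connected by a sort, which is what lets the plan be handed to Hadoop streaming.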
Using Hadoop
The motivation for all this is that this sort of process can also be distributed across a cluster using Hadoop streaming. On a machine with Hadoop installed, you can easily generate an alternative plan that uses Hadoop streaming:
% python longer-wordcount.py --plan wordCount --opts target:hadoop,viewdir:/user/wcohen/gpig_views
This produces a messier-looking plan that will store wordCount
on HDFS using a series of Hadoop streaming jobs.
To do this planning (let alone execution), we'll need to manually do a little setup: in particular, we need to first create an HDFS location for GuineaPig, and give a complete, non-relative path to that location with the viewdir option. You also need to tell GuineaPig where to find the Hadoop streaming jar, via the streamJar option or the GP_STREAMJAR environment variable.
To summarize, you should be able to do something like
% hadoop fs -mkdir /user/wcohen/gpig_views
% export GP_STREAMJAR=/some/place/on/your/hadoop/installation
% hadoop fs -copyFromLocal corpus.txt /user/wcohen
% python longer-wordcount.py --store wordCount --opts target:hadoop,viewdir:/user/wcohen/gpig_views
to run the wordcount example on Hadoop.
Note: as of version 1.3.2, values of keys given in the --opts option are assumed to be URL-escaped, so it's now possible to pass in a viewdir location that contains a colon. For example, you can pass in the view directory hdfs://localhost:9000/users/wcohen/gpig_views with --opts viewdir:hdfs%3A//localhost%3A9000/users/wcohen/gpig_views.
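If you'd rather not escape the value by hand, Python's standard library can do it. The sketch below produces the escaped viewdir from the note above, and then shows why the escaping is needed, using a hypothetical parse_opts function. That parser is my own illustration of a "comma-separated list of key:value pairs" format and is not GuineaPig's actual code.

```python
try:
    from urllib.parse import quote, unquote   # Python 3
except ImportError:
    from urllib import quote, unquote         # Python 2

def escape_opt_value(value):
    # ':' and ',' act as separators in an opts string, so they get escaped;
    # '/' is left alone to keep paths readable
    return quote(value, safe='/')

def parse_opts(arg):
    """Hypothetical parser for 'key:val,key:val' strings (illustration only)."""
    opts = {}
    for pair in arg.split(','):
        key, val = pair.split(':', 1)
        opts[key] = unquote(val)
    return opts

viewdir = 'hdfs://localhost:9000/users/wcohen/gpig_views'
escaped = escape_opt_value(viewdir)
parsed = parse_opts('target:hadoop,viewdir:' + escaped)
print(escaped)
print(parsed['viewdir'])
```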
For the GHC cluster, the right location for GP_STREAMJAR is /usr/local/hadoop/contrib/streaming/hadoop-streaming-1.0.1.jar.
Debugging Tips
If your code's not working you can use any Python tools to debug it. You should not print any debug messages to standard output, but you can print them to standard error. This is true for "normal" as well as Hadoop runs.
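For example, a supporting routine can report what it sees on standard error while still yielding its normal results. This is a minimal sketch; the message format is arbitrary. My understanding is that the restriction exists because standard output is what carries rows from one step of the plan to the next, so anything extra printed there would be mistaken for data.

```python
from __future__ import print_function
import sys

def tokens(line):
    for tok in line.split():
        # debug output goes to stderr, so stdout still carries only rows
        print('token:', repr(tok), file=sys.stderr)
        yield tok.lower()

result = list(tokens('Quite a difference.'))
```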
If you want to step through the various stages of your view definition, it's quite easy to do, by storing the intermediate results and inspecting them.
To make this a little more convenient, there's another command-line option, --cat, which stores a view and then prints it.
The cat command only works in local (non-Hadoop) mode.
So, to step through the wordcount program view-by-view you could type:
% python longer-wordcount.py --cat lines | head
% python longer-wordcount.py --cat words | head
% python longer-wordcount.py --cat wordCount | head
If you'd like to speed up the later steps by re-using the results of previous steps, you can do that with
% python longer-wordcount.py --cat lines | head
% python longer-wordcount.py --cat words --reuse lines | head
% python longer-wordcount.py --cat wordCount --reuse lines words | head
This will re-use previously-computed views, if they are present in the viewdir
directory and marked as complete.
You can also use any filename with the right basename instead of the view name as the argument to --reuse, so it usually works to say something like:
% python longer-wordcount.py --cat wordCount --reuse gpig_views/*.gp | head
As noted above, if you want to get even further into the details of what's happening, you can generate the plan and start running lines from it (or parts of them) one-by-one.
Debugging Hadoop
When you run on Hadoop, the logs contain a web page with job status (the Tracking URL). If your job fails, the Python error stack is not very interesting - it does say which Hadoop command failed, but not where in the Python program it failed. To see that, you need to go to the Tracking URL and navigate through to the error logs for a failing job.
hadoop -input ... -output ... -mapper ...
...
14/08/07 03:49:24 INFO streaming.StreamJob: Running job: job_201407141501_0266
14/08/07 03:49:24 INFO streaming.StreamJob: To kill this job, run:
14/08/07 03:49:24 INFO streaming.StreamJob: /usr/local/hadoop-1.0.1/libexec/../bin/hadoop job -Dmapred.job.tracker=ghc81.ghc.andrew.cmu.edu:9001 -kill job_201407141501_0266
14/08/07 03:49:24 INFO streaming.StreamJob: Tracking URL: http://ghc81.ghc.andrew.cmu.edu:50030/jobdetails.jsp?jobid=job_201407141501_0266
14/08/07 03:49:25 INFO streaming.StreamJob: map 0% reduce 0%
14/08/07 03:50:15 INFO streaming.StreamJob: map 100% reduce 100%
14/08/07 03:50:15 INFO streaming.StreamJob: To kill this job, run:
14/08/07 03:50:15 INFO streaming.StreamJob: /usr/local/hadoop-1.0.1/libexec/../bin/hadoop job -Dmapred.job.tracker=ghc81.ghc.andrew.cmu.edu:9001 -kill job_201407141501_0266
14/08/07 03:50:15 INFO streaming.StreamJob: Tracking URL: http://ghc81.ghc.andrew.cmu.edu:50030/jobdetails.jsp?jobid=job_201407141501_0266
14/08/07 03:50:15 ERROR streaming.StreamJob: Job not successful.
...
...
Traceback (most recent call last):
  File "gpig/extra.py", line 115, in <module>
    wc.main(sys.argv)
  File "/afs/andrew.cmu.edu/usr3/pompraka/gpig/guineapig.py", line 1106, in main
    self.runMain(argv)
  File "/afs/andrew.cmu.edu/usr3/pompraka/gpig/guineapig.py", line 1139, in runMain
    plan.execute(self, echo=self.opts['echo'])
  File "/afs/andrew.cmu.edu/usr3/pompraka/gpig/guineapig.py", line 776, in execute
    subprocess.check_call(shellcom,shell=True)
  File "/usr/local/lib/python2.7/subprocess.py", line 511, in check_call
    raise CalledProcessError(retcode, cmd)
subprocess.CalledProcessError: Command 'hadoop jar ....' returned non-zero exit status 1
On the Gates cluster, one way to open the Tracking URL is to ssh -X in to the cluster, and open a browser. (You can't open those URLs with an external browser, for security reasons.)
If you think you've fixed the problem, there's also a shortcut way of testing it, without running the whole workflow. You can cut-and-paste the specific Hadoop command that failed (it appears after subprocess.CalledProcessError:), manually clear the -output HDFS location, and re-run the failing command. If it succeeds and gives the right output, you can go on to the next bug.
A More Complicated Example With Joins
Here's a more complex problem: finding words that are much more frequent in one corpus than another. We have an ad hoc scoring function, and we will run two copies of the wordcount pipeline and compare the results with a Join
.
from guineapig import *
import sys
import math

def tokens(line):
    for tok in line.split():
        yield tok.lower()

def score(n1,n2):
    return math.log((n1 + 1.0)/(n1 + n2 + 2.0))

#always subclass Planner
class WordCmp(Planner):

    def wcPipe(fileName):
        return ReadLines(fileName) | Flatten(by=tokens) | Group(by=lambda x:x, reducingTo=ReduceToCount())

    wc1 = wcPipe('bluecorpus.txt')
    wc2 = wcPipe('redcorpus.txt')

    cmp = Join( Jin(wc1, by=lambda(word,n):word), Jin(wc2, by=lambda(word,n):word) ) \
          | ReplaceEach(by=lambda((word1,n1),(word2,n2)):(word1, score(n1,n2)))

    result = Format(cmp, by=lambda(word,blueScore):'%6.4f %s' % (blueScore,word))

# always end like this
if __name__ == "__main__":
    WordCmp().main(sys.argv)
There are several new things here, so let's discuss them one-by-one.
First, since the wordcount pipeline is just part of a data structure, we can write a "macro" by simply using a Python function which returns the data structure.
The wcPipe function uses this trick to save some typing in building two copies of the original wordcount pipeline.
Second, we're using unpacking to access rows which are tuples. This is a common pattern in Python, often used, e.g., to return multiple values from a function, as in
def divide(x, y):
    quotient = x/y
    remainder = x % y
    return (quotient, remainder)

(q, r) = divide(22, 7)
In particular, the outputs of the wordcount pipelines are tuples: (word,count). A lambda function for these rows has the form shown: lambda (word,n):word returns the first element of the tuple. Alternatively, I could have written lambda x:x[0].
An important point: a lambda function does NOT have parentheses around its arguments, so lambda (a,b):... is quite different from lambda a,b:.... The former takes ONE argument, which must be a tuple or list with exactly two elements, and binds a to the first and b to the second. The latter takes TWO arguments. If you use a defined function instead, either of these has the same effect as lambda (word,n):word:
def getWord((word,n)):
    return word

def getWord(row):
    return row[0]
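One caveat if you try these snippets outside the tutorial's environment: the tutorial's code is written in Python 2 syntax, and unpacking a tuple in a parameter list (as in lambda (word,n):word) was removed from the language in Python 3. The sketch below shows equivalent spellings that work in both versions; the names get_word, get_count and combine are only for illustration.

```python
# Python 2 only:   lambda (word, n): word
# Works in both Python 2 and Python 3: index into the single argument
get_word = lambda row: row[0]

def get_count(row):
    word, n = row          # unpack inside the body instead of in the signature
    return n

def combine(pair):
    # nested unpacking of a tuple-of-tuples, e.g. the output of a join
    (word1, n1), (word2, n2) = pair
    return (word1, n1 + n2)

row = ('the', 12)
print(get_word(row), get_count(row))
print(combine((('the', 12), ('the', 3))))
```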
Third, we're using a Join view. The input of a join is defined by a substructure Jin which specifies a view and a "by" function. Joining k views will produce a k-tuple, with one component (a row) from each of the input views, so we follow the join with a ReplaceEach that cleans up the structure and computes the final score. Again we're using argument unpacking to access the newly-created element, which is now a tuple-of-tuples of the form ((word1,n1),(word2,n2)) where, of course, word1 and word2 are the same. Python's argument unpacking handles this nicely.
(Note that argument unpacking is not quite unification, if you know what that is: e.g., you could not use ((word,n),(word,n)) here.)
Fourth, Format is almost exactly like a ReplaceEach: the only difference is that when the view is stored, instead of printing the repr of each row, the str for each row is printed. This is used for the final output of a pipeline.
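Here is a plain-Python walk through the same three steps (join, replace, format) on two tiny hand-made wordcount views. It is a sketch of the data shapes only, not of how GuineaPig executes a Join: the join function is hypothetical, and I'm assuming inner-join behavior (words missing from either side are dropped), which the tutorial doesn't spell out.

```python
import math

def score(n1, n2):
    return math.log((n1 + 1.0) / (n1 + n2 + 2.0))

def join(rows1, by1, rows2, by2):
    """Hypothetical inner join: yields (row1, row2) for rows with equal keys."""
    index = {}
    for r2 in rows2:
        index.setdefault(by2(r2), []).append(r2)
    for r1 in rows1:
        for r2 in index.get(by1(r1), []):
            yield (r1, r2)

wc1 = [('red', 1), ('blue', 9)]      # stands in for the bluecorpus counts
wc2 = [('blue', 1), ('green', 4)]    # stands in for the redcorpus counts

# each joined row is a tuple-of-tuples ((word1, n1), (word2, n2))
joined = list(join(wc1, lambda r: r[0], wc2, lambda r: r[0]))
# the ReplaceEach step flattens that to (word, score)
cmp_rows = [(r1[0], score(r1[1], r2[1])) for (r1, r2) in joined]
# the Format step turns each row into the string that gets written out
formatted = ['%6.4f %s' % (s, w) for (w, s) in cmp_rows]
print(joined)
print(formatted)
```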
A More Complex Example Using Group
There are two important variants of the Group view. One, which we discussed above, uses the "reducingTo" argument. The value of this should be an instance of a class called ReduceTo, which will be applied pairwise to all the elements of the rows in the group.
Specifically, each ReduceTo instance has two parts: a baseType, and a "by" function. The baseType is usually a standard Python class like int, float, or list. The "by" takes two arguments, an accumulator and the next value from the list, and returns a new accumulator value. The initial accumulator value is provided by baseType.
When called on a list a1....an, what happens is the following.
- The initial value of the accumulator is set as accum0 = baseType().
- The "by" function is called to produce the next accumulator: accum1 = by(accum0,a1)
- The "by" function is called to produce the next accumulator: accum2 = by(accum1,a2)
- ... and so on.
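This is an ordinary left fold, so the steps above can be sketched with Python's functools.reduce. The helper reduce_to is a hypothetical name for illustration; it mirrors the description of ReduceTo but is not GuineaPig's implementation.

```python
from functools import reduce

def reduce_to(base_type, by, values):
    # accum0 = base_type(); accum1 = by(accum0, a1); accum2 = by(accum1, a2); ...
    return reduce(by, values, base_type())

# counting: ignore the value, add one per row
count = reduce_to(int, lambda accum, val: accum + 1, ['a', 'a', 'a'])
# summing the counts of (word, count) rows
total = reduce_to(int, lambda accum, row: accum + row[1], [('the', 5), ('then', 2)])
# with list as the base type, the accumulator starts as []
collected = reduce_to(list, lambda accum, val: accum + [val], [1, 2])
print(count, total, collected)
```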
You can create a custom reduce-to instance, by providing these arguments. For instance, to aggregate the word counts in wc
by prefix,
you could use:
pc1 = Group(wc, by=lambda (word,count):word[:k], reducingTo=ReduceTo(int, lambda accum,(word,count): accum+count))
Here the "by" function for the ReduceTo
takes a stream of (word,count)
pairs, produced by
grouping on the first k letters of the word, and adds up the counts.
A slightly more efficient alternative is to use the retaining
option to Group. This is a function that's called
before the rows in a group are sent to the ReduceTo
class. It can be used to simplify the rows, so that the
ReduceTo
's "by" function is shorter. For instance, here we can just keep the counts, and discard the word
part of each pair in a Group:
pc2 = Group(wc, by=lambda (word,count):word[:k], retaining=lambda (word,count):count, reducingTo=ReduceTo(int, lambda accum,val:accum+val))
ReduceTo(int, lambda accum,val:accum+val)
is actually included in GuineaPig as ReduceToSum()
, so in the code below,
the three implementations are equivalent.
from guineapig import *
import sys

def tokens(line):
    for tok in line.split():
        yield tok.lower()

k = 2

class PrefCount(Planner):

    wc = ReadLines('corpus.txt') | Flatten(by=tokens) | Group(by=lambda x:x, reducingTo=ReduceToCount())

    pc1 = Group(wc, by=lambda (word,count):word[:k], reducingTo=ReduceTo(int, lambda accum,(word,count): accum+count))

    pc2 = Group(wc, by=lambda (word,count):word[:k], retaining=lambda (word,count):count, reducingTo=ReduceTo(int, lambda accum,val:accum+val))

    pc3 = Group(wc, by=lambda (word,count):word[:k], retaining=lambda (word,count):count, reducingTo=ReduceToSum())

if __name__ == "__main__":
    PrefCount().main(sys.argv)
Parameters and Programmatically Accessing Guinea Pig
In Pig, parameters can be passed in from the command line, and used in Pig programs, where they are marked as parameters with a dollar sign prefix. In Guinea Pig, you can pass in a set of parameters with the option --params key1:val1,key2:val2,.... To access them, you use the special function GPig.getArgvParams(), which returns a Python dict mapping strings to strings. For instance, this program
from guineapig import *
import sys
import os

def tokens(line):
    for tok in line.split():
        yield tok.lower()

class WordCount(Planner):

    D = GPig.getArgvParams()
    wc = ReadLines(D['corpus']) | Flatten(by=tokens) | Group(reducingTo=ReduceToCount())

if __name__ == "__main__":
    WordCount().main(sys.argv)
could be invoked with
python param-wordcount.py --params corpus:bluecorpus.txt --store wc
Of course, sometimes you'll want a little more control over a Guinea Pig script. In Pig, there are APIs for Java and Python which let you create one (or more) Pig script objects, which can be parameterized and run. This is hard to do with the mechanisms I've described so far: a Guinea Pig script isn't actually an object, but a specially-configured subclass of Planner.
However, you can also create a Guinea Pig script by configuring an
instance of a Planner object. The process is the same,
conceptually, but a little less concise. Here's an instance version of
the WordCount program---note you have to complete the instance by
calling setup()
after all views have been added.
from guineapig import *
import sys

def wordCountScript():

    def tokens(line):
        for tok in line.split():
            yield tok.lower()

    p = Planner()
    p.lines = ReadLines('corpus.txt')
    p.words = Flatten(p.lines,by=tokens)
    p.wordCount = Group(p.words, by=lambda x:x, reducingTo=ReduceToCount())
    p.setup() #always end with this
    return p

# always end like this
if __name__ == "__main__":
    wordCountScript().main(sys.argv)
I also moved the definition of tokens to a less globally-accessible place. Planner instances are treated the same as Planner subclasses with these exceptions:
- You need to call setup explicitly after configuring a Planner instance (i.e., after defining all the views).
- Planner instances have local parameters, which are accessible via the instance variable param (also a Python dict). The local params can be set in two ways:
  - Via the --params command line argument.
  - In the Planner() constructor, by adding keyword arguments. In case of conflict, the constructor's values take precedence.
To illustrate this a little more, here's an even more complex wordcount. This one supports the command-line usage
% python multi-wordcount.py pref corpus1 corpus2 ...
and it will build a wordcount file for all the words in each corpus
which start with the prefix pref
, saving them in
files named wc-for-corpus1
, ..., etc.
First, we'll write some code to generate an appropriate planner
instance. This one takes two arguments -- the corpus name, and a
prefix -- and returns a configured Planner()
object.
def wc(corpus=None,prefix=None):
    """Return a planner instance that can word-count a corpus."""
    p = Planner(corpus=corpus,prefix=prefix)
    #self.param is automatically constructed
    p.r = ReadLines(p.param['corpus'])
    def tokens(line):
        for tok in line.split():
            yield tok.lower()
    p.s = Flatten(p.r, by=tokens)
    if p.param['prefix']:
        p.f = Filter(p.s, by=lambda w:w.startswith(p.param['prefix']))
    else:
        p.f = p.s
    p.t = Group(p.f, reducingTo=ReduceToCount())
    p.setup() #must call AFTER you finish adding views
    return p
This should be pretty familiar: notice that the configuration can be conditional on the parameters (we're using that here to not introduce a Filter for a null prefix). Also note we close by calling setup() on the new planner.
Now let's look at the main. When a step within a plan is invoked, you need to dispatch to the right main procedure, so the first step is to see if this is the case. If not, then you invoke a main which parses out the command-line arguments pref corpus1 corpus2 .... This code will iterate over the corpus arguments, positions 2:N in the sys.argv list, and create a planner instance for each using the function wc() above:
if __name__ == "__main__":
    #run subplan step, if necessary
    if Planner.partOfPlan(sys.argv):
        wc().main(sys.argv)
    else:
        #run any main you want - this one's usage is python multi-wordcount.py prefix corpus1 corpus2 ....
        prefix = sys.argv[1]
        for a in sys.argv[2:]:
            #create the planner instance
            planner = wc(corpus=a,prefix=prefix)
            #decide where to store the wordcount data
            v = planner.getView('t')
            targetFile = 'wc-for-'+os.path.basename(a)
            print 'storing this view in',targetFile
            v.pprint() #print the view
            v.storagePlan().execute(planner) #store it
            #move the stored file to its final location
            os.rename(v.storedFile(), targetFile)
The code gives several examples of things you can do with a view: grab a particular view, print it, and store it by executing the storage plan for it.
Views that Include Custom Data Structures
The examples so far have used built-in Python types as rows. It's fine to use your own types, but if you do,
GuineaPig needs to know how to save them to disk (serialize them) and deserialize them. By default, it will serialize
objects with Python's repr
function, and deserialize them by running a broken version of eval - in
fact a version that has been very deliberately broken, for security reasons, to not call any Python functions at all.
If you want to use your own data structures, you need to provide a less broken eval
routine to deserialize the rows.
Specifically, you should do the following (in version 1.3 and up).
if __name__ == "__main__":
    wc = WordCount().setEvaluator(GPig.SafeEvaluator({'WordStat':WordStat}))
    wc.main(sys.argv)
where the argument to SafeEvaluator is a dictionary that maps the class names you'd like to evaluate to the appropriate, locally-defined classes.
(More generally, you could pass any object that implements an eval method to setEvaluator, but you should do it this way.)
Here's a complete example that uses Python's collections.namedtuple to define a custom data structure, which is used to find the most common words in a corpus.
from guineapig import *
import sys
import math
import collections

def tokens(line):
    for tok in line.split():
        yield tok.lower()

WordStat = collections.namedtuple('WordStat','word count')

class WordCount(Planner):

    wcTuple = ReadLines('corpus.txt') | Flatten(by=tokens) | Group(by=lambda x:x, reducingTo=ReduceToCount())
    wcNamedTuple = ReplaceEach(wcTuple, by=lambda (w,n):WordStat(word=w,count=n)).opts(stored=True)
    commonWords = Filter(wcNamedTuple, by=lambda ws:ws.count>=200) | Group(by=WordStat.count.fget)

# always end like this
if __name__ == "__main__":
    wc = WordCount().setEvaluator(GPig.SafeEvaluator({'WordStat':WordStat}))
    wc.main(sys.argv)
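To see why the evaluator needs to be told about WordStat, it helps to look at the round trip in isolation. The sketch below serializes a namedtuple row with repr and reads it back with Python's built-in eval, given an explicit table of allowed names. It only illustrates the idea: it is not GuineaPig's SafeEvaluator, and emptying __builtins__ like this should not be relied on as a security boundary.

```python
import collections

WordStat = collections.namedtuple('WordStat', 'word count')

row = WordStat(word='the', count=250)
serialized = repr(row)     # the one-line text form that would be written to disk
print(serialized)

# evaluate with no builtins and only the names we choose to expose
restored = eval(serialized, {'__builtins__': {}}, {'WordStat': WordStat})

# without the name table, the class name in the text can't be resolved
try:
    eval(serialized, {'__builtins__': {}}, {})
    failed = False
except NameError:
    failed = True
```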
Side Views in GuineaPig
GuineaPig's view definitions are composed incrementally in a way that is efficient for large views. However, if you want to use a small view, one that can be easily loaded into memory, the view definitions are sometimes a clumsy way of doing this. Sometimes these sorts of additional view inputs to a pipeline are called side views.
GuineaPig contains a view class called Augment that lets you
- construct a series of side views, using the planner
- bundle them together into whatever data structure you like - let's call this the side information
- add this side information to every row of an existing view - specifically by constructing a new pair (r,s) where r is the old row and s is the side information.
Usually you follow this with some other operation, say a ReplaceEach
, which will actually use the side information to do something.
Note that the side views are (usually) loaded only once (or if using Hadoop, once per mapper), when the view data structure is created, and
only one copy of the side information is created - i.e., generally s
will be a pointer to a data structure that is shared across the
different rows. Modifying s
will have unpredictable results, so don't do it.
Here's a simple example.
from guineapig import *
import sys
import math
import logging

def tokens(line):
    for tok in line.split():
        yield tok.lower()

class WordProb(Planner):

    wc = ReadLines('corpus.txt') | Flatten(by=tokens) | Group(by=lambda x:x, reducingTo=ReduceToCount())
    total = Group(wc, by=lambda x:'ANY',reducingTo=ReduceTo(float,lambda accum,(word,count):accum+count)) | ReplaceEach(by=lambda (dummy,n):n)
    wcWithTotal = Augment(wc, sideview=total, loadedBy=lambda v:GPig.onlyRowOf(v))
    prob = ReplaceEach(wcWithTotal, by=lambda ((word,count),n): (word,count,n,float(count)/n))

# always end like this
if __name__ == "__main__":
    WordProb().main(sys.argv)
The total view simply adds up the counts in all the wordcount tuples using a Group. Before wcWithTotal is stored, the system will first make sure that total is stored, and copy that stored view into the working directory. Then it will call the function associated with wcWithTotal's "loadedBy" argument. (If you run this on Hadoop, it will upload the preloaded view's stored version appropriately to the worker nodes.)
In general the usage allows a list of preloaded views: just replace the argument sideview with sideviews. The "loadedBy" function is called with the appropriate number of views as arguments.
If you want to read in the contents of the views, you're encouraged to use the utility functions GPig.rowsOf(view) or GPig.onlyRowOf(view).
If you want to access the file directly, you should open the file view.distributableFile().
How it works: The way side views are implemented is worth discussing, because it's not invisible to you. A plan that includes
an Augment
view will include a step that copies the side view's contents into your working directory (i.e., the directory
that pwd
prints out). In Hadoop, this will involve a hadoop fs -getmerge...
operation. In local mode,
this will just involve a copy. In Hadoop, the side view will then be distributed to worker processes via a -file
option.
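Setting the file copying aside, what Augment does to the rows themselves is simple, and can be sketched in plain Python. The augment function below is a hypothetical stand-in that mirrors the WordProb example on an in-memory list; it does not use GuineaPig.

```python
def augment(rows, side_info):
    # every output row is (r, s); s is the same shared object each time,
    # so it should be treated as read-only
    for r in rows:
        yield (r, side_info)

wc = [('a', 3), ('b', 1)]
# stands in for the one-row 'total' side view loaded by GPig.onlyRowOf
total = float(sum(count for (_, count) in wc))

with_total = list(augment(wc, total))
prob = [(word, count, n, count / n) for ((word, count), n) in with_total]
print(prob)
```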
Review and Discussion of Guinea Pig vs Pig
GuineaPig is like Pig, but embedded in Python, not a new language. So if you know Python, there's less to learn. If not, at least you're learning something elegant and stable.
GuineaPig has two backends: shell commands, and Hadoop streaming. Pig only supports Hadoop. This means that you can test programs more quickly in GuineaPig - there's not the overhead of starting up Hadoop jobs.
In Pig (and relational algebra) operators like Join, etc take fields as arguments. Here, there's nothing exactly like a 'field' in relational algebra. Instead of fields, we just use a function - usually a lambda function - that takes a row object and returns some component of the row, and any object can serve as a "row".
GuineaPig does not have any special-purpose mechanism for macros, as Pig does - but it's easy to get that effect using Python functions which return views.
Using Guinea Pig on EC2
Setting up EC2
Reference Guide
Basic Concepts and Command-Line Usage
A guinea pig view - below abbreviated rel, r, s, t, - can be "stored" (materialized) to produce an unordered bag of "rows". A "row" is just an arbitrary Python data structure (with a few constraints on how it can be serialized, see below). Views can be defined by classes like 'ReadLines', 'ReplaceEach', 'Flatten', 'Group', 'Join', etc.
Aside from debugging aids, about the only thing that Guinea Pig does is to allow you to 'store' a view, which means to materialize it on disk.
The Planner subclass you write should be defined in a separate script file, which should end with code that creates a single instance and calls its main method with sys.argv, like this.
if __name__ == "__main__": MyPlanner().main(sys.argv)
An instance of a Planner class knows, via inheritance, how to create a "storage plan" for any view. For instance, if the script above is defined in a file ugp.py, you would use this command to produce a storage plan for the view foo.
python ugp.py --plan foo
The storage plan is a shell script, so to execute it you'd use one of these:
python ugp.py --plan foo | sh
python ugp.py --store foo
When the view foo is saved, it is saved in a file named foo.gp in the view directory (gpig_views by default, see the viewdir option below). So to see some of the output you'd use
python ugp.py --plan foo | sh ; head gpig_views/foo.gp
Or as a shortcut you can say
python ugp.py --cat foo | head
Views are saved on disk, which leads to some constraints on what they can be. A row must be something that can be stored and retrieved by the RowSerializer, which uses 'repr' and 'str', and it cannot contain newlines in its serialized form.
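As a rough way to check these constraints before storing a view, the sketch below tests whether a row survives a single-line repr round trip. It is plain Python and does not call GuineaPig. The helper name is_storable is hypothetical, and reading the text back with ast.literal_eval is an assumption made for illustration, not a description of what RowSerializer actually does.

```python
import ast

def is_storable(row):
    """Return True if repr(row) fits on one line and can be read back unchanged."""
    text = repr(row)
    if "\n" in text:
        # a serialized row must occupy exactly one line of the stored file
        return False
    try:
        # assumption: the stored text is read back as a Python literal
        return ast.literal_eval(text) == row
    except (ValueError, SyntaxError):
        return False

ok_row = ("doc1", ["a", "b"], 3)   # tuples, lists, strings, numbers round-trip
bad_row = object()                 # default repr is not a readable literal
```

Rows built from tuples, lists, dicts, strings, and numbers pass this check; instances of arbitrary classes generally do not unless they define a repr that can be read back.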
You can specify certain options to the system using the command-line option --opts, whose value is a comma-separated list of colon-separated option,value pairs. If you invoke the planner main without any command-line arguments, it will list these options for you. The current options are:
- echo [0]: if non-zero, echo shell commands.
- viewdir [gpig_views]: directory where stored views will be placed.
- parallel [5]: number of parallel tasks in Hadoop.
- streamJar [environment var $GP_STREAMJAR or /usr/lib/hadoop/contrib/streaming/hadoop-streaming-1.2.0.1.3.0.0-107.jar]: location of Hadoop streaming Jar file.
- target [shell]: the other valid option is hadoop.
You can specify user parameters using the command-line option --params, which has values in the same format. You can access these using the function GPig.getArgvParams(), which returns a Python dict mapping strings to strings.
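To make the value format concrete, here is a minimal sketch of how a string of comma-separated, colon-separated pairs maps onto a dict of strings. The function parse_pairs is a hypothetical helper written for this illustration; it is not GuineaPig's own parser, and details such as how a malformed pair is handled are assumptions.

```python
def parse_pairs(spec):
    """Parse 'k1:v1,k2:v2' into a dict mapping strings to strings."""
    pairs = {}
    for item in spec.split(","):
        if not item:
            continue  # tolerate an empty string or a trailing comma
        # split on the first colon only, so values may themselves contain colons
        key, sep, value = item.partition(":")
        if not sep:
            raise ValueError("expected option:value, got %r" % item)
        pairs[key] = value
    return pairs

opts = parse_pairs("target:hadoop,parallel:10")
```

Note that every value stays a string, so a numeric parameter such as opts["parallel"] must be converted explicitly, for example with int().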
Known Bugs and Gotchas
- Storing null Python values (i.e., "None") or rows containing them is unpredictable, in particular when they are used as keys for joining or grouping.
View Types
In almost all of the views below, the first argument is a view, which is omitted when on the right-hand-side of a pipe operator.
Augment
Augment: loads one or more views from disk, and produces an object that encapsulates the information in those views. This object is then paired with the rows of the primary view.
a = Augment(x, sideviews=[y,z], loadedBy=lambda v1,v2:...)
a = Augment(x, sideview=y, loadedBy=lambda v:...)
The loadedBy argument is passed the sideview objects, and is called before the view is executed. It returns some Python object s. The result of the view is a stream of pairs (r,s) where r is from the view that is the first argument of Augment (x in the example).
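The pairing behavior can be modeled in a few lines of plain Python. This sketch does not use GuineaPig: the function augment and its argument names are hypothetical, and the side view is represented simply as a list of its rows, which is a simplification of the sideview objects that loadedBy really receives.

```python
def augment(rows, sideview_rows, loaded_by):
    """Model of Augment: build one side object, then pair it with every row."""
    s = loaded_by(sideview_rows)   # called once, before any row is produced
    for r in rows:
        yield (r, s)               # every row is paired with the same object s

stopwords = ["the", "a"]           # stands in for a stored side view
docs = ["the cat", "a dog"]        # stands in for the primary view x
pairs = list(augment(docs, stopwords, loaded_by=set))
```

Because s is built once and shared, this pattern suits small lookup structures (such as a stopword set) that every row needs to consult.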
Distinct
Distinct: removes duplicates from a view.
d = Distinct(x)
Filter
Filter...by: like ReplaceEach, but extracts a subset of the rows of a relation, instead of transforming them. The following example retains only the rows r where the first element is a string that ends with ".txt".
filtered = Filter(t, by=lambda row:row[0].endswith(".txt") )
Flatten aka FlatMap
Flatten...by: Like ReplaceEach, but allows you to use a Python function that yields multiple values. This is pretty much the same as a map-reduce Mapper.
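def idwordGen(row):
    # yield one (id, word) pair for each word in the row
    for w in row['words']:
        yield (row['id'],w)
# pass the generator function itself, not the result of calling it
x = Flatten(y, by=idwordGen)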
FlatMap is a synonym for Flatten.
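What Flatten computes can be sketched without GuineaPig as a nested loop over the generator's output. The function flatten below is a hypothetical plain-Python stand-in for the view, and the sample rows are made up for illustration.

```python
def flatten(rows, by):
    """Model of Flatten: emit every value that by(row) yields, for each row."""
    for row in rows:
        for out in by(row):
            yield out

def idwordGen(row):
    for w in row['words']:
        yield (row['id'], w)

y = [{'id': 1, 'words': ['a', 'b']}, {'id': 2, 'words': []}]
x = list(flatten(y, by=idwordGen))
# x == [(1, 'a'), (1, 'b')]; the row with no words contributes nothing
```

One input row can therefore produce zero, one, or many output rows, which is what distinguishes Flatten from ReplaceEach.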
Format
Format...by: a special form of ReplaceEach where the 'by' function produces a string, and this string is NOT serialized normally when stored, but just printed. You can use this to get the final output.
f = Format(z, by=lambda row:'a is %s, b is %s' % (row[0],row[1]))
Group
Group...by: produces a set of tuples (key, [row1,...,rowN]) where the first element is the 'key' extracted using the 'by' function, and [row1,...,rowN] is a list of rows from t that share this key.
u = Group(t, by=lambda row:row[1], ... )
Group...by...retaining: produces a set of tuples (key, [row1',...,rowN']) where the first element is the 'key' extracted using the 'by' function, and [row1',...,rowN'] is a list of rows from t that share this key, after being transformed by the retaining function.
u = Group(t, by=lambda row:row[1], retaining=lambda row:row[2] )
Group...by...reducingTo: reducingTo's value is a GuineaPig 'ReduceTo' object. A ReduceTo is created with two arguments:
- a basetype that provides the first value of the accumulator, when called without arguments (e.g., the basetype might be int, as int() returns 0).
- a "by" function which takes two arguments, an accumulator and the next value from the list, and produces the next value of the accumulator.
There are also some standard cases: ReduceToSum(), ReduceToCount(), and ReduceToList().
The output is the result of reducing the list of rows using that object - so here the result would be words plus document-frequencies, rather than words plus inverted indices. This is much like a map-reduce reducer.
u = Group(t, by=lambda row:row[1], reducingTo=ReduceToCount())
Group...by...retaining...reducingTo: these options can all be combined, in which case the "retaining" function is applied before the "reducingTo" process.
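The interaction of by, retaining, and the accumulator can be modeled in plain Python as follows. This is a sketch that does not use GuineaPig: the function group and its basetype and reduce_by arguments are hypothetical stand-ins for the two arguments of a ReduceTo object, and the output is ordered by first appearance of each key, whereas a real stored view is an unordered bag.

```python
from collections import OrderedDict

def group(rows, by, retaining=lambda row: row, basetype=list,
          reduce_by=lambda accum, val: accum + [val]):
    """Plain-Python model of Group...by...retaining...reducingTo."""
    accums = OrderedDict()
    for row in rows:
        key = by(row)
        if key not in accums:
            accums[key] = basetype()   # first accumulator value, e.g. int() is 0
        # 'retaining' transforms the row before the reducing step sees it
        accums[key] = reduce_by(accums[key], retaining(row))
    return list(accums.items())

t = [("d1", "cat"), ("d2", "cat"), ("d2", "dog")]
word = lambda row: row[1]

lists = group(t, by=word)                                    # default: collect rows
docs = group(t, by=word, retaining=lambda row: row[0])       # keep only doc ids
counts = group(t, by=word, basetype=int,
               reduce_by=lambda accum, val: accum + 1)       # count rows per key
```

Here docs resembles an inverted index (each word with the documents that contain it), while counts gives one number per word, which is the distinction the text draws between the two kinds of output.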
Join
Join: joins are multiway, and the argument is a list of "join input" objects, denoted Jin. A join input is a view v plus a field-extracting function f. The result of a join is a tuple of the form (row1,...,rowk) where rowi is a row in the i-th input of the join.
j = Join(Jin(view1,by=getId), Jin(view2,by=getId))
For two-way joins only, you can specify outer joins (left outer, right outer, and full outer) by using the "outer" parameter of the Jin constructor. The tuple for a left outer join for an unmatched row row1 will have the form (row1,None).
j = Join(Jin(view1,by=getId), Jin(view2,by=getId,outer=True)) #right outer join
j = Join(Jin(view1,by=getId,outer=True), Jin(view2,by=getId)) #left outer join
j = Join(Jin(view1,by=getId,outer=True), Jin(view2,by=getId,outer=True)) #full outer join
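The effect of the outer flags can be illustrated with a small plain-Python model of a two-way join. It does not use GuineaPig, and the function join2 and its argument names are hypothetical. The form (None,row2) for an unmatched row of the second input is an assumption made by symmetry with the (row1,None) case described above. Keys are assumed to be hashable.

```python
from collections import defaultdict

def join2(rows1, by1, rows2, by2, outer1=False, outer2=False):
    """Plain-Python model of a two-way Join with optional outer flags."""
    index2 = defaultdict(list)
    for r2 in rows2:
        index2[by2(r2)].append(r2)      # index the second input by its join key
    matched_keys = set()
    out = []
    for r1 in rows1:
        k = by1(r1)
        if k in index2:
            matched_keys.add(k)
            out.extend((r1, r2) for r2 in index2[k])
        elif outer1:
            out.append((r1, None))      # unmatched row kept from the first input
    if outer2:
        for k, rs in index2.items():
            if k not in matched_keys:
                # assumed form for unmatched rows of the second input
                out.extend((None, r2) for r2 in rs)
    return out

getId = lambda row: row[0]
view1 = [(1, "a"), (2, "b")]
view2 = [(2, "x"), (3, "y")]

inner = join2(view1, getId, view2, getId)
left = join2(view1, getId, view2, getId, outer1=True)
full = join2(view1, getId, view2, getId, outer1=True, outer2=True)
```

In this model the inner join keeps only id 2, the left outer join adds the unmatched row with id 1, and the full outer join also adds the unmatched row with id 3.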
JoinTo...by: like Join, but designed to be used in a pipeline. It has a single join-input as its first argument, and uses a "by" argument to specify the field of the primary (pipeline) input to be joined in.
j = Filter(t, by=lambda row:getId(row)>0 ) | JoinTo( Jin(view2,by=getId), by=getId)
ReadCSV
ReadCSV: The rows of this view are just lists, each one of which is produced by parsing a line from the file as a comma-separated value string. Any keyword arguments will be passed in to the initializer of a csv.reader object that's used to convert the file.
r = ReadCSV('mytest.csv',delimiter='|')
ReadLines
ReadLines: The rows of this view are just strings from the file, in the first case, and python objects produced by running 'eval' on the strings in the second.
r = ReadLines('mytest.txt')
ReplaceEach aka Map
ReplaceEach...by: Replaces each row with the result of applying some function to that row (i.e., similar to a map). In this example parseLine is a function, and each row of this view is produced by calling parseLine(x) for some row x of view r.
parsed = ReplaceEach(r, by=parseLine)
Map is a synonym for ReplaceEach.
Union
Union: Combines the rows in one or more views, removing any duplicates. Since duplicates are removed, Union(x) has the same effect as Distinct(x).
all = Union(v, w, x)
UnionTo is a variant of Union that can be used on the right hand side of a pipe.
all = v | UnionTo(w, x)
Details
You can attach options to a view like this:
parsed = ReplaceEach(r, by=parseLine).opts(stored=True)
I.e., the method "v.opts()" returns a copy of v with some options set.
Eager storing: Right now the only option is stored=X. Storing works like this: if a view v is marked as 'stored=True', then the first time a plan needs it, it will be stored explicitly. If the same plan needs v again, the values will be read from the stored file. By default a view is stored if it is used more than once. (Also, a view may be marked for eager storage to suppress certain classes of optimizations, as well: in particular, the inner view of an Augment view will be marked for eager storage, and also views that are part of a chain of map-reduce operations, like consecutive joins.) You can discourage eager storing for a view with opts(stored=False) as well as force it with opts(stored=True).
Stored views aren't deleted after the plan completes, so you can also inspect them for debugging purposes if you like. They are also NOT reused from planning episode to planning episode, unless you ask for that with the --reuse option, described below.
Re-use: If you add the option --reuse to a plan, like this
python ugp.py --plan foo --reuse v1.gp v2.gp
then instead of building a new plan for storing v1 and v2, GuineaPig will reuse any existing files (which should be previously stored views). So, in local mode, if you always do
python ugp.py --plan foo --reuse gpig_views/*.gp
then GuineaPig will have a sort of incremental, make-like behavior: it will reuse everything it can.
Aside: if you try to reuse a view that doesn't exist, you get an error only when the plan executes, and can't find a file or HDFS input that it expects. GuineaPig will not check that the views you plan to --reuse actually exist (because on Hadoop, doing those checks is somewhat expensive).
Aside: you can't use globbing (i.e., "*.gp") to get a list of completed views if you're working with Hadoop, but you can achieve a similar effect with a little more work, for instance, like this:
python ugp.py --plan foo ... --reuse `hadoop fs -ls /user/wcohen/gpig_views | cut -d/ -f5`
But be careful: gpig_views/*.gp will always give a list of the locally-stored views, even if you're using target:hadoop.
Under the Hood
Guinea Pig tries to optimize its plan by stitching together view definitions when possible. Generally, operations like Flatten and ReplaceEach, which just transform a sequence of rows, will be combined together, but operations which require a sorting step, like Join and Group, will not.
When you use Hadoop - and even if you don't! - the context in which your "by" functions work is pretty limited. In particular, they are executed by many different unknown processes, in general. So if you try anything fancy in terms of communication (e.g., using a ReplaceEach view to collect information in a process-local variable X and then using X in some subsequent view) it probably won't work.
In terms of implementation, every view has an associated checkpoint, which is the latest file that is needed to create that view, and a rowGenerator, which is an iterator that produces the rows, one by one, assuming that it's run in a context where the checkpoint is available on stdin.
The checkpoint plan for a view produces the checkpoint. The storage plan stores the view.
Abstract view types are Reader views, Transformation views (like ReplaceEach), or MapReduce views. The checkpoint for a reader is the file it reads. The checkpoint for a Transformation is the checkpoint for the view it's transforming. The checkpoint for a MapReduce is the reducer input.