Difference between revisions of "Guinea Pig"
(6 intermediate revisions by the same user not shown) | |||
Line 284: | Line 284: | ||
For the GHC cluster the right location for GP_STREAMJAR is <code>/usr/local/hadoop/contrib/streaming/hadoop-streaming-1.0.1.jar</code>. | For the GHC cluster the right location for GP_STREAMJAR is <code>/usr/local/hadoop/contrib/streaming/hadoop-streaming-1.0.1.jar</code>. | ||
+ | |||
+ | For the opencloud/stoat machines the location is <code>/opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/hadoop-streaming.jar</code> | ||
== Debugging and Working with Hadoop == | == Debugging and Working with Hadoop == | ||
Line 301: | Line 303: | ||
% python longer-wordcount.py --cat wordCount | head | % python longer-wordcount.py --cat wordCount | head | ||
</pre> | </pre> | ||
+ | (Note: by default <code>head</code> will echo the first 10 lines of <code>stdin</code>, and by default Python will throw an error when it tries to write the 11th line to the <code>head</code> process. Depending on the complexity of the view this error might be thrown while GuineaPig is echoing the commands it's executing, or when it's printing lines from the view.) | ||
If you'd like to speed up the later steps by re-using the results of previous steps, you can do that with | If you'd like to speed up the later steps by re-using the results of previous steps, you can do that with | ||
Line 364: | Line 367: | ||
The file <code>smallvoc-tfidf.py</code> contains some examples of Hadoop options, in this case, used to turn on and off compression of temporary output. | The file <code>smallvoc-tfidf.py</code> contains some examples of Hadoop options, in this case, used to turn on and off compression of temporary output. | ||
+ | |||
+ | === Multi-file Guinea Pig Programs === | ||
+ | |||
+ | To run your program on Hadoop worker machine, each worker needs access to all the code. In the examples above, the programs are all in a single file, and Guinea Pig will take care of distributing this single file. If your top-level planner code needs to use code in another file, you need to tell GuineaPig about that file using the <code>ship()</code> method. | ||
+ | |||
+ | For example, suppose your <code>tokenize</code> method for wordcount is in a separate file that you import. You might do this: | ||
+ | |||
+ | <pre> | ||
+ | from guineapig import * | ||
+ | import mytokenizer | ||
+ | import sys | ||
+ | |||
+ | class WordCount(Planner): | ||
+ | |||
+ | wc = ReadLines('wc-in/brown') \ | ||
+ | | Flatten(by=mytokenizer.tokens) \ | ||
+ | | Group(by=lambda x:x, reducingTo=ReduceToCount()) | ||
+ | |||
+ | if __name__ == "__main__": | ||
+ | planner = WordCount() | ||
+ | planner.ship('mytokenizer.py') | ||
+ | planner.main(sys.argv) | ||
+ | </pre> | ||
+ | |||
+ | I.e., after you create the planner, but before you invoke the <code>main</code> method, you specify the name of the file you want to ship. Guinea Pig will locate that file on your PYTHONPATH and on the worker machines, that file will be placed in the working directory of the worker process. You can use the same trick to distribute data files. | ||
=== Using Guinea Pig on EC2 === | === Using Guinea Pig on EC2 === | ||
Line 906: | Line 934: | ||
* There are several additional <code>ReadLines</code>-like views which can be used to read, for instance, blank-line-delimited blocks of text and CSV files. | * There are several additional <code>ReadLines</code>-like views which can be used to read, for instance, blank-line-delimited blocks of text and CSV files. | ||
* There are some views which can be used to log intermediate results -- i.e., print them to stderr as the computation happens. | * There are some views which can be used to log intermediate results -- i.e., print them to stderr as the computation happens. | ||
− | * There is a compiler for an alternative experiment backend for Guinea Pig called [[MRS Guinea Pig]]. | + | * There is a compiler for an alternative experiment backend for Guinea Pig called [[MRS Guinea Pig]] - which is discussed below. |
+ | |||
+ | === In-memory map-reduce with MRS Guinea Pig === | ||
MRS Guinea Pig is a sort of locally-running version of Hadoop streaming that uses process-level parallelism. It can either keep the shards on your local file system---which is only useful if you have a ''compute-bound'' process---or in memory, if you run it in server mode. | MRS Guinea Pig is a sort of locally-running version of Hadoop streaming that uses process-level parallelism. It can either keep the shards on your local file system---which is only useful if you have a ''compute-bound'' process---or in memory, if you run it in server mode. | ||
Line 940: | Line 970: | ||
<pre> | <pre> | ||
% python mrs-wordcount.py --store wc --opts target:mrs | % python mrs-wordcount.py --store wc --opts target:mrs | ||
− | </ | + | </pre> |
To use MRS in server mode, you need to | To use MRS in server mode, you need to | ||
Line 1,251: | Line 1,281: | ||
a context where the checkpoint is available on stdin. | a context where the checkpoint is available on stdin. | ||
− | The ''checkpoint plan'' for a view produces the checkpoint. The ''storage | + | The ''checkpoint plan'' for a view produces the checkpoint. The ''storage plan'' stores the view. |
− | plan'' stores the view. | ||
Abstract view types are are Reader views, Transformation views (like | Abstract view types are are Reader views, Transformation views (like | ||
ReplaceEach), or MapReduce views. The checkpoint for a reader is the file it | ReplaceEach), or MapReduce views. The checkpoint for a reader is the file it | ||
reads. The checkpoint for a Transformation is the checkpoint for the | reads. The checkpoint for a Transformation is the checkpoint for the | ||
− | view | + | view it is transforming. The checkpoint for a MapReduce is the reducer |
input. | input. |
Latest revision as of 12:01, 14 September 2017
Contents
- 1 Quick Start
- 2 Debugging and Working with Hadoop
- 3 Advanced GuineaPig
- 3.1 A More Complicated Example With Joins
- 3.2 A More Complex Example Using Group
- 3.3 Using Combiners for Group
- 3.4 Exploiting Local State with ReplaceEachPartition
- 3.5 Parameters and Programmatically Accessing Guinea Pig
- 3.6 Views that Include Custom Data Structures
- 3.7 Side Views in GuineaPig
- 3.8 Reusing previously computed views
- 3.9 Other Goodies Included in the Source
- 3.10 In-memory map-reduce with MRS Guinea Pig
- 4 Review and Discussion of Guinea Pig vs Pig
- 5 Reference Guide
Quick Start
What you should know before you start
Guinea Pig is similar to Pig, but intended to be easier to learn and debug. It's probably a good idea to be somewhat familiar with Pig before you start this tutorial.
I also assume a reading knowledge of Python and core Unix commands. If you're
going to use Hadoop, you should also be somewhat familiar with (at least) the Hadoop file system and the hadoop fs
subcommands.
Aside: versions
The most default, and most, recent version of GuineaPig is imported when you use import guineapig
. Most of the changes between 1.1 and 1.3 are not API changes, so they should not affect the tutorial. If you want to use a particular version of Guinea Pig, then import it explicitly.
Installation and Running wordcount.py
Download the tutorial tarball and unpack it. As an alternative: you can also get the code by cloning the git repository https://github.com/TeamCohen/GuineaPig.git and run it by simply putting the directory that contains the guineapig.py file on your PYTHONPATH.
The tutorial contains a contains a few small corpora and the file wordcount.py
which has this code:
from guineapig import * import sys # supporting routines can go here def tokens(line): for tok in line.split(): yield tok.lower() #always subclass Planner class WordCount(Planner): wc = ReadLines('corpus.txt') | Flatten(by=tokens) | Group(by=lambda x:x, reducingTo=ReduceToCount()) # always end like this if __name__ == "__main__": WordCount().main(sys.argv)
Then type the command:
% python wordcount.py --store wc
After a couple of seconds it will return, and you can see the wordcounts with
% head gpig_views/wc.gp
Understanding the wordcount example
A longer example
In the example above, wc
is a GuineaPig view. I use the term "view" for both (1) a procedural
definition of how to process data, using a particular set of operators and (2) data produced by a view. The rest of this tutorial will help you understand
views and how to write them. To start, the tutorial directory also has a less concise but easier-to-explain wordcount file,
longer-wordcount.py
, with this view definition:
class WordCount(Planner): lines = ReadLines('corpus.txt') words = Flatten(lines,by=tokens) wordCount = Group(words, by=lambda x:x, reducingTo=ReduceToCount())
You see this is actually defined in three phases:
- read the lines from a file,
- convert the stream of lines into a stream of tokens, and
- one to count the tokens.
I'll use this example to explain some of the key ideas in GuineaPig.
Objects instead of records, and functions instead of fields
GuineaPig has no notion of records or fields. Instead, views contain sets of Python objects. Conceptually the sets are unordered and can only be streamed through sequentially. These objects can be anything: e.g., the output of ReadLines is a stream of Python strings, one per line. (More usually the objects are tuples.) I will use the term row for an object in a view.
To access part of an row, as when you 'Flatten' or 'Group' by a field, you use a function that takes the object and
returns the appropriate subpart. Many of the view constructors in Guinea Pig include a keyword by=....
which returns part of a row. This part can be a pre-computed property of a row (like a field in a database setting), or any arbitrary function that it applied to the row. In the 'Flatten' view in this example, the 'by' keyword specifies the function tokens
,
which is a Python function that iterates over the tokens. The 'by' function for a 'Flatten' view should convert a single row into an iterator of objects, which are then spliced together, producing - in this case - a stream of tokens.
Python also lets you construct an unnamed
function on-the-fly with the lambda
key word. The 'by'
argument in the 'Group' view here is just the identity function, so here we're grouping by an
entire row (which at this point, is just a token).
The Group
view is quite flexible. In the absence of a "reducingTo" keyword,
the result of grouping is a tuple (key,[row1,...,rowN]) where the rowi's are the rows that have the
indicated "key" (as extracted by the "by" clause). So, if we had used the line
wordCount = Group(words, by=lambda x:x)
then the resulting rows would be tuples like ("aardvark", ["aardvark","aardvark",...]) where the list is the list of all occurrences in the corpus of the token. Instead here we used a "reduceTo" argument, which is an optimization. The value for a "reduceTo" keyword should be an instance of the ReduceTo class. As an example, the ReduceToCount() class is such a subclass, defined as
ReduceTo(int,by=lambda accum,val:accum+1)
More generally when you defined a "ReduceTo" object, the first argument is the type of the output (and defines the initial value of the accumulator, which here will be the output of int()
, or zero) and the second is the function that is used to reduce values pairwise.
Once again: note the use of functions as parameters in Group
and Flatten
. Guinea Pig has no notion of records: rows can be any python object (although they are often tuples). So if you want to do the equivalent of extracting a field from a record, you should use a function. Python's lambda functions and argument unpacking features make this fairly convenient, as the examples below show.
Using the pipeline
To use this pipeline: if you type
% python longer-wordcount.py
you'll get a brief usage message:
Guinea Pig 1.3.2 (c) William Cohen 2014,2015 usage: python longer-wordcount.py --(store|pprint|tasks|plan|cat) VIEW [OPTIONS] [PARAMS] --reuse VIEW1 VIEW2 ... python longer-wordcount.py --list Subcommands that take a VIEW as argument: --tasks VIEW: print the abstract map-reduce tasks needed to materialize the VIEW --pprint VIEW: print the data structure associated with the VIEW --plan VIEW: print the commands that invoke each abstract map-reduce task --store VIEW: materialize the named VIEW and store it in the view directory --cat VIEW: store the VIEW and then print each line to stdout The --list subcommand lists possible VIEWs defined by this program. OPTIONS are specified as "--opts key:value,...", where legal keys for "opts", with default values, are: echo:0 viewdir:gpig_views parallel:5 streamJar:/usr/lib/hadoop/contrib/streaming/hadoop-streaming-1.2.0.1.3.0.0-107.jar target:shell Values in the "opts" key/value pairs are assumed to be URL-escaped. PARAMS are specified as "--params key:value,..." and the associated dictionary is accessible to user programs via the function GPig.getArgvParams(). There's more help at http://curtis.ml.cmu.edu/w/courses/index.php/Guinea_Pig
Typing
% python longer-wordcount.py --list
will list the views that are defined in the
file: lines
, words
,
and wordCount
. If you pprint
one of these,
say wordCount
, you can see what it essentially is:
a Python data structure, with several named subparts
(like words
), which pprint
will present as a tree. (The vertical bars in this printout
are just to indicate the indent level, by the way, so don't confuse them with the "pipe" notation used for chaining together views.)
wordCount = Group(words,by=<function <lambda> at 0x10497aa28>,reducingTo=<guineapig.ReduceToCount object at 0x104979190>) | words = Flatten(lines, by=<function tokens at 0x1048965f0>).opts(stored=True) | | lines = ReadLines("corpus.txt")
Returning to the previous wordcount example, the "pipe" notation is just a shortcut for
nested views: words = ReadLines('corpus.txt') | Flatten(by=tokens)
is equivalent to words = Flatten(ReadLines('corpus.txt') , by=tokens)
or to the
two separate definitions for lines
and words
above. The named variables (like words
) can
be used, via devious Pythonic tricks, to access the data structures from the command line. Hence the --store
command
above.
To store a view, GuineaPig will first convert a view structure into a plan for storing the view. To see this plan, you can type something like
% python longer-wordcount.py --tasks wordCount
and the output will tell you the sequence of map-reduce tasks that are needed to produce the view wordCount
.
In this case, there's one task, which is described by a short "explanation" for humans, and by a shell command that implements the task
for computers:
====================================================================== map-reduce task 1: corpus.txt => wordCount - +-------------------- explanation -------------------- - | read corpus.txt with lines - | flatten to words - | group to wordCount - +-------------------- commands -------------------- - | python longer-wordcount.py --view=wordCount --do=doGroupMap < corpus.txt | LC_COLLATE=C sort -k1 | python longer-wordcount.py --view=wordCount --do=doStoreRows > gpig_views/wordCount.gp
If you just want the commands you can use the command:
python longer-wordcount.py --plan wordCount
And in fact, if you typed
python longer-wordcount.py --plan wordCount | sh
this would be equivalent to python longer-wordcount.py --store
wordCount
, modulo some details about how errors are reported.
Looking at the command is actually useful:
python longer-wordcount.py --view=wordCount --do=doGroupMap < corpus.txt \ | LC_COLLATE=C sort -k1 \ | python longer-wordcount.py --view=wordCount --do=doStoreRows \ > gpig_views/wordCount.gp
This is how GuineaPig works: the view definition (a data structure) is converted to a plan (a shell script),
and the shell script is then executed, starting up some new processes while it executes. Going back to the example: The final result of the 'wordCount' plan to store the view wordCount
is to create a file called
wordCount.gp
in a certain directory, the GuineaPig viewdir, which is by default gpig_views
.
These new processes
invoke additional copies of python longer-wordcount.py
with special arguments. If you
look at the plan for the view 'words', for instance, you will see:
python longer-wordcount.py --view=words --do=doStoreRows < corpus.txt > words.gp
which tell Python perform smaller-scale operations associated with individual views, as steps in the overall plan. You can invoke these substeps yourself if you want, and this is sometimes useful for testing: for instance, if you could type:
% python longer-wordcount.py --view=words --do=doStoreRows this is a test: hello world! ^D
To see what the 'words' step actually does on a sample input.
As another example,here's what the output of the doGroupMap operation looks like:
wcohen@shell2:~/pyhack/guineapig/tutorial$ head -1 corpus.txt Quite a difference. Think it has anything to do with all that coke? wcohen@shell2:~/pyhack/guineapig/tutorial$ python longer-wordcount.py --view=wordCount --do=doGroupMap < corpus.txt| head -10 'quite' 'quite' 'a' 'a' 'difference.' 'difference.' 'think' 'think' 'it' 'it' 'has' 'has' 'anything' 'anything' 'to' 'to' 'do' 'do' 'with' 'with' Traceback (most recent call last): File "longer-wordcount.py", line 19, in <module> ... [Python's usual complaint's about writing to a "head" command] ... IOError: [Errno 32] Broken pipe
The steps in a plan do not correspond one-to-one with the steps in the view definition: some optimization happens. We'll talk about this a little more below when we discuss debugging techniques.
Using Hadoop
The motivation for doing all this is because this sort of process can also be distributed across a cluster using Hadoop streaming. On a machine with hadoop installed, you can easily generate an alternative plan that uses Hadoop streaming:
% python longer-wordcount.py --plan wordCount --opts target:hadoop,viewdir:/user/wcohen/gpig_views
This produces a messier-looking plan that will store wordCount
on HDFS using a series of Hadoop streaming jobs.
To do this planning (let alone execution), we'll need to manually do a little setup: in particular, we need
to first create a HDFS location for GuineaPig, and give a complete, non-relative path to that location with the viewdir
option. You also need to tell GuineaPig where to find the Hadoop streaming jar, via the streamJar
option
or the GP_STREAMJAR environment variable.
To summarize, you should be able to do something like
% hadoop fs -mkdir /user/wcohen/gpig_views % export GP_STREAMJAR=/some/place/on/your/hadoop/installation % hadoop fs -copyFromLocal corpus.txt /user/wcohen % python longer-wordcount.py --store wordCount --opts target:hadoop,viewdir:/user/wcohen/gpig_views
to run the wordcount example on Hadoop.
Note: as of version 1.3.2: values of keys given in the --opts command are assumed to be URL-escaped, so it's now
possible to pass in a viewdir location that contains a colon. For example, you can pass in the view directory
hdfs://localhost:9000/users/wcohen/gpig_views
with --opts viewdir:hdfs%3A//localhost%3A9000/users/wcohen/gpig_views
.
For the GHC cluster the right location for GP_STREAMJAR is /usr/local/hadoop/contrib/streaming/hadoop-streaming-1.0.1.jar
.
For the opencloud/stoat machines the location is /opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/hadoop-streaming.jar
Debugging and Working with Hadoop
Debugging Tips
If you're code's not working you can use any Python tools to debug it. You should not print any debug messages to standard output, but you can print them to standard error. This is true for "normal" as well as Hadoop runs.
If you want to step through the various stages of your view definition, it's quite easy to do, by storing the intermediate results and inspecting them.
To make this a little more convenient, there's another command-line option, --cat
, which stores a view and then prints it.
The cat command only works in local (non-Hadoop mode).
So, to step through the wordcount program view-by-view you could type:
% python longer-wordcount.py --cat lines | head % python longer-wordcount.py --cat words | head % python longer-wordcount.py --cat wordCount | head
(Note: by default head
will echo the first 10 lines of stdin
, and by default Python will throw an error when it tries to write the 11th line to the head
process. Depending on the complexity of the view this error might be thrown while GuineaPig is echoing the commands it's executing, or when it's printing lines from the view.)
If you'd like to speed up the later steps by re-using the results of previous steps, you can do that with
% python longer-wordcount.py --cat lines | head % python longer-wordcount.py --cat words --reuse lines | head % python longer-wordcount.py --cat wordCount --reuse lines words | head
This will re-use previously-computed views, if they are present in the viewdir
directory and marked as complete.
You can also use any filename with the right basename instead of the view name as the argument to --reuse
, so
it usually works to say something like:
% python longer-wordcount.py --cat wordCount --reuse gpig_views/*.gp | head
As noted above, if you want to get even further into the details of what's happening, you can generate the plan and start running lines from it (or parts of them) one-by-one.
Debugging Hadoop
When you run on Hadoop, the logs contain a web page with job status (the Tracking URL). If your job fails, the Python error stack is not very interesting - it does say which Hadoop command failed, but not where in the Python program it failed. To see that, you need to go to the Tracking URL and navigate through to the error logs for a failing job.
hadoop -input ... -output ... -mapper ... ... 14/08/07 03:49:24 INFO streaming.StreamJob: Running job: job_201407141501_0266 14/08/07 03:49:24 INFO streaming.StreamJob: To kill this job, run: 14/08/07 03:49:24 INFO streaming.StreamJob: /usr/local/hadoop-1.0.1/libexec/../bin/hadoop job -Dmapred.job.tracker=ghc81.ghc.andrew.cmu.edu:9001 -kill job_201407141501_0266 14/08/07 03:49:24 INFO streaming.StreamJob: Tracking URL: http://ghc81.ghc.andrew.cmu.edu:50030/jobdetails.jsp?jobid=job_201407141501_0266 14/08/07 03:49:25 INFO streaming.StreamJob: map 0% reduce 0% 14/08/07 03:50:15 INFO streaming.StreamJob: map 100% reduce 100% 14/08/07 03:50:15 INFO streaming.StreamJob: To kill this job, run: 14/08/07 03:50:15 INFO streaming.StreamJob: /usr/local/hadoop-1.0.1/libexec/../bin/hadoop job -Dmapred.job.tracker=ghc81.ghc.andrew.cmu.edu:9001 -kill job_201407141501_0266 14/08/07 03:50:15 INFO streaming.StreamJob: Tracking URL: http://ghc81.ghc.andrew.cmu.edu:50030/jobdetails.jsp?jobid=job_201407141501_0266 14/08/07 03:50:15 ERROR streaming.StreamJob: Job not successful. ... ... Traceback (most recent call last): File "gpig/extra.py", line 115, in <module> wc.main(sys.argv) File "/afs/andrew.cmu.edu/usr3/pompraka/gpig/guineapig.py", line 1106, in main self.runMain(argv) File "/afs/andrew.cmu.edu/usr3/pompraka/gpig/guineapig.py", line 1139, in runMain plan.execute(self, echo=self.opts['echo']) File "/afs/andrew.cmu.edu/usr3/pompraka/gpig/guineapig.py", line 776, in execute subprocess.check_call(shellcom,shell=True) File "/usr/local/lib/python2.7/subprocess.py", line 511, in check_call raise CalledProcessError(retcode, cmd) subprocess.CalledProcessError: Command 'hadoop jar ....' returned non-zero exit status 1
On the Gates cluster, one way to open the Tracking URL is to ssh -X
in to the cluster, and open a browser. (You can't open those URLs with an external browser for security reasons).
If you think you've fixed the problem, there's also a shortcut way of testing it, without running the whole workflow. You can also cut-and-paste the specific Hadoop command that failed (after subprocess.CalledProcessError:
, manually clear the -output
HDFS location, and re-run the failing command. If it succeeds and gives the right output, you can go on to the next bug.
Hadoop Options
GuineaPig will generally generate all the Hadoop options that are needed for your code to run, but you may sometimes want to pass additional options in to Hadoop, for example to improve efficiency. You can do this in several ways.
- You can use the command-line option
--hopts
. This takes as an argument one string, which will be passed in to Hadoop more or less intact whenever it is called. (Actually, it will be split by spaces and the resulting list will be joined by spaces and then passed in.) - You can modify the planner object with the
hopts
method. This takes a list of strings, which will be joined by spaces and passed in to Hadoop. - You can modify a view with the method
opts(hopts=foo)
. The argumentfoo
is again a list of strings, and these arguments will be passed in to Hadoop for the invocations that are associated specifically with the generation of this view. The view-specific options will be placed after the planner-level options, so they will override planner-level options.
Some Hadoop versions require special placement for "definition" options, which are those that are of the form -D param=value
. If you need to do use these options, you can use the hdefs
instead of hopts
in any of the ways listed above.
The file smallvoc-tfidf.py
contains some examples of Hadoop options, in this case, used to turn on and off compression of temporary output.
Multi-file Guinea Pig Programs
To run your program on Hadoop worker machine, each worker needs access to all the code. In the examples above, the programs are all in a single file, and Guinea Pig will take care of distributing this single file. If your top-level planner code needs to use code in another file, you need to tell GuineaPig about that file using the ship()
method.
For example, suppose your tokenize
method for wordcount is in a separate file that you import. You might do this:
from guineapig import * import mytokenizer import sys class WordCount(Planner): wc = ReadLines('wc-in/brown') \ | Flatten(by=mytokenizer.tokens) \ | Group(by=lambda x:x, reducingTo=ReduceToCount()) if __name__ == "__main__": planner = WordCount() planner.ship('mytokenizer.py') planner.main(sys.argv)
I.e., after you create the planner, but before you invoke the main
method, you specify the name of the file you want to ship. Guinea Pig will locate that file on your PYTHONPATH and on the worker machines, that file will be placed in the working directory of the worker process. You can use the same trick to distribute data files.
Using Guinea Pig on EC2
Setting Up EC2
I'm old-school, so I like the command line interface (CLI), and will use EC2-Classic. Follow these instructions to set up the CLI tools on your machine. Currently this includes
- downloading the right CLI code package;
- having a local installation of Java;
- setting the environment variable EC2_HOME appropriately;
- putting $EC2_HOME/bin on your path;
- creating a AWS access key, which has a public and private part, by
- allocating an access key, if you haven't done that yet, and
- defining the environment variables AWS_ACCESS_KEY and AWS_SECRET_KEY
I define the secret key by sourcing a bash file that contains a line export AWS_SECRET_KEY=....
, and of course,
keep that file readable only to me, by setting the permissions with chmod 600 ...
. Otherwise, it's not secret.
If you've done all that right, then the command
% ec2-describe-regions
should work properly. Next you need to prepare to we will follow the instructions to launch an instance by:
- using the
ec2-create-keypair
command to set up a keypair, which is stored in another secret (protected) file - mine is namedgpig-key-pair.pem
- creating a security group, e.g., with
ec2-create-group gpig-security-group -d "Gpig security group"
Now, authorize access from the machine you're working on:
ec2-authorize gpig-security-group -p 22 -s 128.2.211.59/32
where 128.2.211.59 is replaced by your IP address.
Creating a Cluster
After EMR is set up you need to create a cluster. (You only need to set up EMR once).
The create-cluster command is very customizable but one that works would be
% aws emr create-cluster --ami-version 3.8.0 --ec2-attributes KeyName=MyKeyPair \ --instance-groups InstanceGroupType=MASTER,InstanceCount=1,InstanceType=m3.xlarge InstanceGroupType=CORE,InstanceCount=2,InstanceType=m3.xlarge \
The instance-groups stuff defines the cluster you want - this one is tiny, with three nodes. The KeyName, which should have the name of the keypair you created above, is how the new cluster will know whether or not to let you in. This will output something like:
{ "ClusterId": "j-JEX5UT60ELD5" }
which is the name of the cluster. It will take some time (10min?) to start up, and then you can log into the master using your keypair:
% aws emr ssh --cluster-id j-JEX5UT60ELD5 --key-pair-file MyKeyPair.pem
You can use your cluster just as you would any other Hadoop cluster, but when you are all done - TERMINATE IT. The meter keeps running until you do!
Advanced GuineaPig
A More Complicated Example With Joins
Here's a more complex problem: finding words that are much more frequent in one corpus than another. We have an ad hoc scoring function, and we will run two copies of the wordcount pipeline and compare the results with a Join
.
from guineapig import * import sys import math def tokens(line): for tok in line.split(): yield tok.lower() def score(n1,n2): return math.log((n1 + 1.0)/(n1 + n2 + 2.0)) #always subclass Planner class WordCmp(Planner): def wcPipe(fileName): return ReadLines(fileName) | Flatten(by=tokens) | Group(by=lambda x:x, reducingTo=ReduceToCount()) wc1 = wcPipe('bluecorpus.txt') wc2 = wcPipe('redcorpus.txt') cmp = Join( Jin(wc1, by=lambda(word,n):word), Jin(wc2, by=lambda(word,n):word) ) \ | ReplaceEach(by=lambda((word1,n1),(word2,n2)):(word1, score(n1,n2))) result = Format(cmp, by=lambda(word,blueScore):'%6.4f %s' % (blueScore,word)) # always end like this if __name__ == "__main__": WordCmp().main(sys.argv)
There are several new things here, so let's discuss them one-by-one.
First, since the wordcount pipeline is just part of a data structure, we can write a "macro" by simply using a Python function which returns the data structure.
The wcPipe
uses this trick to save some typing in building two copies of the original wordcount pipeline.
Second, we're using unpacking to access rows which are tuples. This is a common pattern in Python, often used, e.g., to return multiple values from a function, as in
def divide(x, y): quotient = x/y remainder = x % y return (quotient, remainder) (q, r) = divide(22, 7)
In particular, the output of the wordcount pipelines are tuples: (word,count)
. A lambda function for these rows has the form shown: lambda (word,n):word
returns the first element of the tuple. Alternatively, I could have written lambda x:x[0].
An important point: a lambda function does NOT have parenthesis around it's arguments, so lambda (a,b):...
is quite different from
lambda a,b:...
. The former takes ONE argument, which must be a tuple or list with exactly two elements, and binds a
to the first
and b
to the second. The second takes TWO arguments. If you used a defined function, you use any of these to get the same effect:
def getWord((word,n)): return word def getWord(row): return row[0]
Third, we're using a Join
view. The input of a join is defined by a substructure Jin
which specifies a view and a "by" function. Joining k views will produce a k-tuple, with one component for each row of the input views, so we follow the join with a ReplaceEach that cleans up the structure and computes the final score. Again we're using argument unpacking to access the newly-created element, which is now a tuple-of-tuples of the form ((word1,n1),(word2,n2))
where, of course, word1 and word2 are the same. Python's argument unpacking handles this nicely.
(Note that argument unpacking is not quite unification, if you know what that is: e.g., you could not use ((word,n),(word,n))
here.)
Fourth, Format
is almost exactly like a ReplaceEach
: the only difference is that when the view is stored, instead of printing the repr
of each row, the str
for each row is printed. This is used for the final output of a pipeline.
A More Complex Example Using Group
There are two important variants of the Group view. One which we discussed above is the "reducingTo" argument. The value of this
should be an instance of a class called ReduceTo
, which will will be applied pairwise to all the elements of the rows in the group.
Specifically, each Reduce
instance has two parts: a baseType
, and a "by" function. The baseType
is usually a standard Python class like int
, float
, or list
. The "by" takes two arguments,
an accumulator and the next value from the list, and returns a new accumulator value. The initial accumulator value is provided by baseType
.
When called on a list a1....an
, what happens is the following.
- The initial value of the accumulator is set as
accum0 = baseType()
. - The "by" function
f
is called to produce the next accumulator:accum1 = by(accum0,a1)
- The "by" function
f
is called to produce the next accumulator:accum2 = by(accum0,a2)
- ... and so on.
You can create a custom reduce-to instance, by providing these arguments. For instance, to aggregate the word counts in wc
by prefix,
you could use:
pc1 = Group(wc, by=lambda (word,count):word[:k], reducingTo=ReduceTo(int, lambda accum,(word,count): accum+count))
Here the "by" function for the ReduceTo
takes a stream of (word,count)
pairs, produced by
grouping on the first k letters of the word, and adds up the counts.
A slightly more efficient alternative is to use the retaining
option to Group. This is a function that's called
before the rows in a group are sent to the ReduceTo
class. It can be used to simplify the rows, so that the
ReduceTo
's "by" function is shorter. For instance, here we can just keep the counts, and discard the word
part of each pair in a Group:
pc2 = Group(wc, by=lambda (word,count):word[:k], retaining=lambda (word,count):count, reducingTo=ReduceTo(int, lambda accum,val:accum+val))
ReduceTo(int, lambda accum,val:accum+val)
is actually included in GuineaPig as ReduceToSum()
, so in the code below,
the three implementations are equivalent.
from guineapig import * import sys def tokens(line): for tok in line.split(): yield tok.lower() k = 2 class PrefCount(Planner): wc = ReadLines('corpus.txt') | Flatten(by=tokens) | Group(by=lambda x:x, reducingTo=ReduceToCount()) pc1 = Group(wc, by=lambda (word,count):word[:k], reducingTo=ReduceTo(int, lambda accum,(word,count): accum+count)) pc2 = Group(wc, by=lambda (word,count):word[:k], retaining=lambda (word,count):count, reducingTo=ReduceTo(int, lambda accum,val:accum+val)) pc3 = Group(wc, by=lambda (word,count):word[:k], retaining=lambda (word,count):count, reducingTo=ReduceToSum()) if __name__ == "__main__": PrefCount().main(sys.argv)
Using Combiners for Group
The Group
view also supports Hadoop combiners, via an additional combiningTo
keyword. For example, another implementation of wordcount could be
class WordCount(Planner): wc = ReadLines('corpus.txt') | Flatten(by=tokens) \ | Group(retaining=lambda x:1, reducingTo=ReduceToSum(), combiningTo=ReduceToSum())
Combiners are basically used in the same way as reducers, but in two phases: first the retained elements in a group are combined, then the combined values are reduced. When you're using Hadoop as a target, combiners are run on the local worker machine, and the combined data is then sent off to a potentially different worker for reducing. So using combiners can lower network traffic and make the program more efficient. Combiners are not used in local mode.
Exploiting Local State with ReplaceEachPartition
In a standard ReplaceEach
, the by
function is stateless - it
performs the same operation on each row. ReplaceEachPartition
is similar, but has instead a by
function which takes as input an iterator over rows, and yields some number of replacements. The intended use of
ReplaceEachPartition
is for mappers where it is necessary to have state. For example, a mapper might
use some expensive-to-set up object (say, an NLP parser) to perform its operation: in this case you don't want to read in
and set up the parser for every sentence.
The exploit local state, you need to construct an iterator that has state. Here's a simple example: a GuineaPig version of cat -n
, which replaces each line x
in an input file with a pair (n,x)
where n
is a line number....at least in local mode.
from guineapig import * import sys def MakeIdAdder(startId): #create a closure that keeps the local state def idAdder(lines): nextId = startId for line in lines: yield (nextId,line) nextId += 1 #return the closure return idAdder class AddIds(Planner): result = ReadLines('corpus.txt') | ReplaceEachPartition(by=MakeIdAdder(1)) if __name__ == "__main__": AddIds().main(sys.argv)
Note: this only works as described in local mode. Specifically, this will only provide line numbers relative to the "partition" -- i.e., the set of rows that a process iterates over. In local model, all rows will be part of the same "partition", but in Hadoop mode, every shard of data will be in a different "partition". In general you won't know how many partitions will exist when your code is run. So it's fine to use local state for efficiency, but you should be careful of relying on it for more.
Parameters and Programmatically Accessing Guinea Pig
In Pig, parameters can be passed in from the command line, and used in
Pig programs, where they are marked as parameters with a dollar sign
prefix. In Guinea Pig, you can pass in a set of parameters with the
option --params key1:val1,kay2:val2,...
. To access them,
you use the special function GPig.getArgvParams()
, which
returns a Python dict
mapping strings to strings. For
instance, this program
from guineapig import * import sys import os def tokens(line): for tok in line.split(): yield tok.lower() class WordCount(Planner): D = GPig.getArgvParams() wc = ReadLines(D['corpus']) | Flatten(by=tokens) | Group(reducingTo=ReduceToCount()) if __name__ == "__main__": WordCount().main(sys.argv)
could be invoked with
python param-wordcount.py --params corpus:bluecorpus.txt --store wc
GPig.getArgvParams()
has one optional parameter: you can specify a list of parameters that are required. For instance, if you instead used GPig.getArgvParams(required=['corpus'])
then if your program is invoked without the corpus
parameter set, it will immediately throw an error, like this one:
bash-3.2$ python param-wordcount.py Traceback (most recent call last): File "param-wordcount.py", line 11, in <module> class WordCount(Planner): File "param-wordcount.py", line 13, in WordCount D = GPig.getArgvParams(required=['corpus']) File "/Users/wcohen/Documents/code/GuineaPig/tutorial/guineapig.py", line 69, in getArgvParams assert p in result, '--params must contain a value for "'+p+"', e.g., --params "+p+":FOO" AssertionError: --params must contain a value for "corpus', e.g., --params corpus:FOO
Without the required
option, the code will throw an error anyway, as written, but not an especially meaningful one.
Of course, sometimes you'll want a little more control over a Guinea Pig script. In Pig, there are APIs for Java and Python which let you can create one (or more) Pig script objects, which can be parameterized and run. This is hard to do with the mechanisms I've described so far: a Guinea Pig script isn't actually an object, but a specially-configured subclass of Planner.
However, you can also create a Guinea Pig script by configuring an
instance of a Planner object. The process is the same,
conceptually, but a little less concise. Here's an instance version of
the WordCount program---note you have to complete the instance by
calling setup()
after all views have been added.
from guineapig import * import sys def wordCountScript(): def tokens(line): for tok in line.split(): yield tok.lower() p = Planner() p.lines = ReadLines('corpus.txt') p.words = Flatten(p.lines,by=tokens) p.wordCount = Group(p.words, by=lambda x:x, reducingTo=ReduceToCount()) p.setup() #always end with this return p # always end like this if __name__ == "__main__": wordCountScript().main(sys.argv)
I also moved the definition of tokens
to a less
globally-accessible place. Planner instances are treated the same as
planner subclasses with these exceptions:
- You need to call
setup
explicitly after configuring an Planner instance (i.e., after defining all the views.) - Planner instances have local parameters, which are accessible via the instance variable
param
(also a Pythondict
). The local params can be set in two ways:- Via the
--params
command line argument. - In the
Planner()
constructor, by adding keyword arguments. In case of conflict, the constructor's values take precedence.
- Via the
To illustrate this a little more, here's an even more complex wordcount. This one supports the command-line usage
% python multi-wordcount.py pref corpus1 corpus2 ...
and it will build a wordcount file for all the words in each corpus
which start with the prefix pref
, saving them in
files named wc-for-corpus1
, ..., etc.
First, we'll write some code to generate an appropriate planner
instance. This one takes two arguments -- the corpus name, and a
prefix -- and returns a configured Planner()
object.
def wc(corpus=None,prefix=None): """Return a planner instance that can word-count a corpus.""" p = Planner(corpus=corpus,prefix=prefix) #self.param is automatically constructed p.r = ReadLines(p.param['corpus']) def tokens(line): for tok in line.split(): yield tok.lower() p.s = Flatten(p.r, by=tokens) if p.param['prefix']: p.f = Filter(p.s, by=lambda w:w.startswith(p.param['prefix'])) else: p.f = p.s p.t = Group(p.f, reducingTo=ReduceToCount()) p.setup() #must call AFTER you finish adding views return p
This should be pretty familiar: notice the the configuration can be
conditional on the parameters (we're using that here to not introduce a
Filter
for a null prefix). Also note we close by calling
setup()
on the new planner.
Now let's show you the main
. When a step within a plan
is invoked, you need to dispatch to the right main procedure, so the
first step is see if this is the case. If not, then you invoke a main
which parses out the command-line arguments pref corpus1 corpus2
...
. This code will iterate over the corpus arguments,
positions 2:N in the sys.argv
list, and create a
planner instance using the function wc()
above:
if __name__ == "__main__": #run subplan step, if necessary if Planner.partOfPlan(sys.argv): wc().main(sys.argv) else: #run any main you want - this one's usage is python mult-wordcount.py prefix corpus1 corpus2 .... prefix = sys.argv[1] for a in sys.argv[2:]: #create the planner instance planner = wc(corpus=a,prefix=prefix) #decide where to store the wordcount data v = planner.getView('t') targetFile = 'wc-for-'+os.path.basename(a) print 'storing this view in',targetFile v.pprint() #print the view v.storagePlan().execute(planner) #store it #move the stored file to its final location os.rename(v.storedFile(), targetFile)
The code gives several examples of things you can do with a view: grab a particular view, print it, and store it by executing the storage plan for it.
A warning: When your script executes a plan, it is executed by repeatedly invoking that script itself as a shell command, with special arguments. If your script's main code doesn't support that, the script won't work. So programmatic access of this sort is limited: for instance, it would be impossible (or at least tricky) to have more than one Planner
object in your script, because the main wouldn't know which one to invoke when Planner.partOfPlan(sys.argv)
is true.
Views that Include Custom Data Structures
The examples so far have used built-in Python types as rows. It's fine to use your own types, but if you do,
GuineaPig needs to know how to save them to disk (serialize them) and deserialize them. By default, it will serialize
objects with Python's repr
function, and deserialize them by running a broken version of eval
- in
fact a version that has been very deliberately broken, for security reasons, to not call any Python functions at all.
If you want to use your own data structures, you need to provide a less broken eval
routine to deserialize the rows.
Specifically, you should do the following (in version 1.3 and up).
if __name__ == "__main__": wc = WordCount().setEvaluator(GPig.SafeEvaluator({'WordStat':WordStat})) wc.main(sys.argv)
where the argument to SafeEvaluator
is an dictionary that maps the class names you'd like to evaluate to the appropriate, locally-defined classes.
(More generally, you could pass any object tat implements an eval
method to setEvaluator
, but you should
do it this way.)
Here's a complete example that uses the Python collections.namedtuple
package to
define a custom data structure, which is used to find the most common words in a corpus.
from guineapig import * import sys import math import collections def tokens(line): for tok in line.split(): yield tok.lower() WordStat = collections.namedtuple('WordStat','word count') class WordCount(Planner): wcTuple = ReadLines('corpus.txt') | Flatten(by=tokens) | Group(by=lambda x:x, reducingTo=ReduceToCount()) wcNamedTuple = ReplaceEach(wcTuple, by=lambda (w,n):WordStat(word=w,count=n)).opts(stored=True) commonWords = Filter(wcNamedTuple, by=lambda ws:ws.count>=200) | Group(by=WordStat.count.fget) # always end like this if __name__ == "__main__": wc = WordCount().setEvaluator(GPig.SafeEvaluator({'WordStat':WordStat})) wc.main(sys.argv)
Side Views in GuineaPig
GuineaPig's view definitions are composed incrementally in a way that is efficient for large views. However, if you want to use a small view, one that can be easily loaded into memory, the view definitions are sometimes a clumsy way of doing this. Sometimes these sorts of additional view inputs to a pipeline are called side views.
GuineaPig contains a view class called Augment
that lets you
- construct a series of side views, using the planner
- bundle them together into whatever data structure you like - let's call this the side information
- add this side information to every row of an existing view - specifically by constructing a new pair
(r,s)
wherer
is the old row ands
is the side information.
Usually you follow this with some other operation, say a ReplaceEach
, which will actually use the side information to do something.
Note that the side views are (usually) loaded only once (or if using Hadoop, once per mapper), when the view data structure is created, and
only one copy of the side information is created - i.e., generally s
will be a pointer to a data structure that is shared across the
different rows. Modifying s
will have unpredictable results, so don't do it.
Here's a simple example.
from guineapig import * import sys import math import logging def tokens(line): for tok in line.split(): yield tok.lower() class WordProb(Planner): wc = ReadLines('corpus.txt') | Flatten(by=tokens) | Group(by=lambda x:x, reducingTo=ReduceToCount()) total = Group(wc, by=lambda x:'ANY',reducingTo=ReduceTo(float,lambda accum,(word,count):accum+count)) | ReplaceEach(by=lambda (dummy,n):n) wcWithTotal = Augment(wc, sideview=total, loadedBy=lambda v:GPig.onlyRowOf(v)) prob = ReplaceEach(wcWithTotal, by=lambda ((word,count),n): (word,count,n,float(count)/n)) # always end like this if __name__ == "__main__": WordProb().main(sys.argv)
The total
view
simply adds up all the wordcount tuples using a Group
.
Before wcWithTotals
is stored, the system will first
make sure that total
is stored, and copy that stored view
into the working directory. Then it will call the function
associated with the wcWithTotals
's "loadedBy" argument. (If you run
this on Hadoop, it will upload the preloaded view's stored
version appropriately to the worker nodes).
In general the usage allows a list of preloaded views: just replace the argument sideview
with
sideviews
. The "loadedBy" function is called with the appropriate number of views as arguments - so if you
had two views, you might say something like
Augment(...., sideviews=[v1,v2], loadedBy=lambda v1,v2:...)
If you want to read in the contents of the views, you're encouraged to use the utility functions GPig.rowsOf(view) or GPig.onlyRowOf(view). If you want to access the file directly, you should open the file view.distributableFile().
How it works: The way side views are implemented is worth discussing, because it's not invisible to you. A plan that includes
an Augment
view will include a step that copies the side view's contents into your working directory (i.e., the directory
that pwd
prints out). In Hadoop, this will involve a hadoop fs -getmerge...
operation. In local mode,
this will just involve a copy. In Hadoop, the side view will then be distributed to worker processes via a -file
option.
Reusing previously computed views
Re-use: If you add the option --reuse to a plan, like this
python ugp.py --plan foo --reuse v1.gp v2.gp
then instead of building a new plan for storing v1 and v2, GuineaPig will reuse any existing files (which should be previously stored views). So, in local mode, if you always do
python ugp.py --plan foo --reuse gpig_views/*.gp
then GuineaPig will have a sort of incremental, make-like behavior: it will reuse everything it can.
Aside: if you try to reuse
a view that doesn't
exist, you get an error only when the plan executes, and can't
find a file or HDFS input that it expects. GuineaPig will not
check that the views you plan to --reuse
actually
exist (because on Hadoop, doing those checks are somewhat expensive.)
Aside: you can't use globbing (i.e., "*.gp") to get a list of completed views if you're working with Hadoop, but you can achieve a similar effect with a little more work, for instance, like this:
python ugp.py --plan foo ... --reuse `hadoop fs -ls /user/wcohen/gpig_views | cut -d/ -f5"`
But be careful, gpig_views/*.gp
will always give a list of
the locally-stored views, even if you're using target:hadoop
.
Other Goodies Included in the Source
If you clone the github repository, you will find some additional files not used in the tutorial. These are mostly not as well tested, but might be of use.
gpextras.py
contains some useful extensions to Guinea Pig. To use them, put gpextras.py
on your PYTHONPATH
and include the line import gpextras
in your Guinea Pig file. Briefly,
- There are several additional
ReadLines
-like views which can be used to read, for instance, blank-line-delimited blocks of text and CSV files. - There are some views which can be used to log intermediate results -- i.e., print them to stderr as the computation happens.
- There is a compiler for an alternative experiment backend for Guinea Pig called MRS Guinea Pig - which is discussed below.
In-memory map-reduce with MRS Guinea Pig
MRS Guinea Pig is a sort of locally-running version of Hadoop streaming that uses process-level parallelism. It can either keep the shards on your local file system---which is only useful if you have a compute-bound process---or in memory, if you run it in server mode.
To try out the MRS you should first read the wiki page for it, then you need to add a couple of lines to your Guinea Pig file: specifically you need to import gpextras.py
, and then call the method planner.registerCompiler('mrs',gpextras.MRSCompiler)
on your Planner instance: for example,
from guineapig import * import sys import gpextras def tokens(line): ... class WordCount(Planner): wc = ReadLines('corpus.txt') | Flatten(by=tokens) | Group(by=lambda x:x, reducingTo=ReduceToCount()) if __name__ == "__main__": p = WordCount() p.registerCompiler('mrs',gpextras.MRSCompiler) p.main(sys.argv)
To use the MRS backend, you need to do two things. First, arrange it so that the shell variable GP_MRS_COMMAND
invokes the mrs_gp.py
, eg
% export GP_MRS_COMMAND="python /path/to/mrs_gp.py"
Second, you invoke your program with the option target:mrs
.
% python mrs-wordcount.py --store wc --opts target:mrs
To use MRS in server mode, you need to
- invoke MRS with the
--task
option, - make sure that a server is running when you run Guinea Pig
- tell Guinea Pig to store views in the server, not on the file system.
To accomplish this, you might do something like this (remember %3A will be decoded to a colon):
% export GP_MRS_COMMAND="python /path/to/mrs_gp.py --task" % python /path/to/mrs_gp.py --serve & % python mrs-wordcount.py --store wc --opts target:mrs,viewdir:gpfs%3A
The outputs will be stored on the server, and you can access them with the --fs
subcommands
for mrs_gp
. Alternatively, you could move them out of the server's memory with the appropriate
mrs_gp
map-reduce commands, specifying local files as the output.
Review and Discussion of Guinea Pig vs Pig
GuineaPig is like Pig, but embedded in Python, not a new language. So if you know Python, there's less to learn. If not, at least you're learning something elegant and stable.
GuineaPig has two backends: shell commands, and Hadoop streaming. Pig only supports Hadoop. This means that you can test programs more quickly in GuineaPig- there's not the overhead of starting up Hadoop jobs.
In Pig (and relational algebra) operators like Join, etc take fields as arguments. Here, there's nothing exactly like a 'field' in relational algebra. Instead of a fields, we just use a function - usually a lambda function - that takes a row object and returns some component of the row, and any object can serve as a "row".
GuineaPig does not have any special-purpose mechanism for macros, as Pig does - but it's easy to get that effect using Python functions which return views.
Reference Guide
Basic Concepts and Command-Line Usage
A guinea pig view - below abbreviated rel, r, s, t, - can be "stored" (materialized) to produce an unordered bag of "rows". A "row" is just an arbitrary Python data structure (with a few constraints on how it can be serialized, see below). Views can be defined by classes like 'ReadLines', 'ReplaceEach', 'Flatten', 'Group', 'Join', etc.
Aside from debugging aids, about only thing that Guinea Pig does is to allow you to 'store' a view, which means to materialize it on disk.
The Planner subclass you write should be defined in a separate script file, which should end with code that creates a single instance and calls its main method with sys.argv, like this.
if __name__ == "__main__": MyPlanner().main(sys.argv)
An instance of a Planner class knows, via inheritance, how to create a "storage plan" for any view. For instance, if the script above is defined in a file ugp.py, you would use this command to produce a storage plan for the view foo.
python ugp.py --plan foo
The storage plan is a shell script, so to execute it you'd use one of these:
python ugp.py --plan foo | sh python ugp.py --store foo
When the view foo is saved, it is saved in a file named foo.gp. So to see some of the output you'd use
python ugp.py --plan foo | sh ; head foo.gp
Or as a shortcut you can say
python ugp.py --cat foo | head
Views are saved on disk, which leads to some constraints on what they can be. A row must be something that can be stored and retrieved by the RowSerializer, which uses 'repr' and 'str', and it cannot contain newlines in its serialized form.
You can specify certain options to the system using the command-line option --opts
, whose
value is a comma-separated list of colon-separated option,value pairs. If you invoke the planner main
without any command-line arguments, it will list these options for you. The current options are:
- echo [0]: if non-zero, echo shell commands.
- viewdir [gpig_views]: directory where stored views will be placed.
- parallel[5]: number of parallel tasks in Hadoop.
- streamJar[environment var $GP_STREAMJAR or /usr/lib/hadoop/contrib/streaming/hadoop-streaming-1.2.0.1.3.0.0-107.jar]: location of Hadoop streaming Jar file.
- target[shell]: the other valid option is
hadoop
You can specify user parameters using the command-line option --params
which has values in the
same format. You can access these using the function GPig.getArgvParams()
, which returns a Python
dict
mapping strings to strings.
Known Bugs and Gotchas
- Storing null python values (i.e., "
None
") or rows containing them is unpredictable, in particular when they are used as keys for joining or grouping.
View Types
In almost all of the views below, the first argument is a view, which is omitted when on the right-hand-side of a pipe operator.
Augment
Augment: loads one or more views from disk, and produces an object that encapsulates the information in those views. This object is then paired with the rows of the primary view.
a = Augment(x, sideviews=[y,z], loadedBy=lambda v1,v2:...) a = Augment(x, sideview=y, loadedBy=lambda v:...)
The loadedBy
argument is passed the sideview objects, and is called before the view is executed.
It returns some Python object s
. The result of the view is a stream of pairs (r,s)
where
r
is from the view that is the first argument of Augment
(x in the example).
Distinct
Distinct: removes duplicates from a view.
d = Distinct(x)
Filter
Filter...by: like ReplaceEach, but extracts a subset of the rows of a
relation, instead of transforming them. The following example retains only the
rows r
where the first element is a string that ends with ".txt".
filtered = Filter(t, by=lambda(row):row[0].endswith(".txt") )
Flatten aka FlatMap
Flatten...by: Like ReplaceEach, but allows you to use a Python function that yields multiple values. This is pretty much the same as a map-reduce Mapper.
def idwordGen(row): for w in row['words']: yield (row['id'],w) x = Flatten(y, by=idwordGen(row))
FlatMap is a synonym for Flatten.
Format
Format...by: a special form of ReplaceEach where the 'by' function produces a string, and this string is NOT serialized normally when stored, but just printed.
f = Format(z, by=lambda(a,b):'a is %s, b is %s' % (a,b))
You can use this to get the final output. Don't try and use Format output as input to another view.
Group
Group...by: produces a set of tuples (key, [row1,...,rowN]) where the first element is the 'key' extracted using the 'by' function, and [row1,...,rowN] is a list of rows from t that share this key.
u = Group(t, by=lambda row:row[1], ... )
Group...by...retaining: produces a set of tuples (key, [row1',...,rowN']) where
the first element is the 'key' extracted using the 'by' function,
and [row1',...,rowN'] is a list of rows from t that share this key,
after being transformed by the retaining
function.
u = Group(t, by=lambda row:row[1], retaining=lambda row:row[2] )
Group...by..reducingTo: reducingTo's value is is a GuineaPig 'ReduceTo'
object. A ReduceTo
is created with two arguments:
- a
basetype
that provides the first value of the accumulator, when called withiut arguments (e.g., the basetype might beint
, asint()
returns a 0. - a
"by"
function which takes two arguments, an accumulator and the next value from the list, and produces the next value of the accumulator.
There are also some standard cases: ReduceToSum()
, ReduceToCount()
, and ReduceToList()
.
The output is the result of reducing the list for rows using that object - so here the result would be words plus document-frequencies, rather than words plus inverted indices. This is much like a map-reduce reducer.
u = Group(t, by=lambda row:row[1], reducingTo=ReduceToCount())
Group...by..retaining..reducingTo: these options can all be combined, in which case the "retaining" function is applied before the "reducingTo" process.
Group...by..combiningTo: the value is is another GuineaPig 'ReduceTo' object, which is used as a combiner in Hadoop mode.
Join
Join: joins are multiway, and the argument is a list of "join
input" objects, denoted Jin. A join input is a view v plus a
field-extracting function f. The result of a join is a tuple of
the form (row1,...,rowk)
where rowi is a row in the i-th input of
the join.
j = Join(Jin(view1,by=getId), Jin(view2,by=getId))
For two-way joins only, you can specify outer joins (left outer, right outer, and full outer) by using the "outer" parameter of the Jin
constructor. The tuple for a left outer join for an unmatched row row1
will have the form
(row1,None)
.
j = Join(Jin(view1,by=getId), Jin(view2,by=getId,outer=True)) #right outer join j = Join(Jin(view1,by=getId,outer=True), Jin(view2,by=getId)) #left outer join j = Join(Jin(view1,by=getId,outer=True), Jin(view2,by=getId,outer=True)) #full outer join
JoinTo...by: like Join, but designed to be used in a pipeline. It has a single join-input as its first argument, and uses a "by" argument to specify the field of the primary (pipeline) input to be joined in.
j = Filter(t, by=lambda(row):getId(row)>0 ) | JoinTo( Jin(view2,by=getId), by=getId)
ReadCSV
ReadCSV: The rows of this view are just lists, each one of which is produced by parsing a line from the file as a comma-separated value string. Any keyword arguments will be passed in to the initializer of a csv.reader object that's used to convert the file.
r = ReadCSV('mytest.csv',delimiter='|')
ReadLines
ReadLines: The rows of this view are just strings from the file, in the first case, and python objects produced by running 'eval' on the strings in the second.
r = ReadLines('mytest.txt')
ReplaceEach aka Map
ReplaceEach...by: Replaces each row with the result of applying some
function to that row (i.e., similar to a map
.) In this example parseLine is a function, and each row of
this view is produced by calling parseLine(x)
for some row x
of view r
.
parsed = ReplaceEach(r, by=parseLine)
Map is a synonym for ReplaceEach.
ReplaceEachPartition aka MapPartitions
ReplaceEachPartition...by: Replaces each "partition" of rows with the result of applying some function to that partition. The input and output of the function are iterables, which iterate over rows and transformed rows respectively. Usually this is function is a closure, which maintains some internal state.
def parseLines(): def statefulMapper(lines): parser = loadParser() for line in lines: yield parser.parse(line) return statefulMapper
parsed = ReplaceEachPartition(r, by=parseLines())
MapPartitions is a synonym for ReplaceEachPartition.
Union
Union: Combines the rows in one or more views, removing any duplicates. Since duplicates are removed, Union(x) has the same effect as Distinct(x).
all = Union(v, w, x)
UnionTo is a variant of Union that can be used on the right hand side of a pipe.
all = v | UnionTo(w, x)
View Options
You can attach options to a view like this
parsed = ReplaceEach(r, by=parseLine).opts(stored=True)
Ie, the method "v.opts()" returns a copy of v with some options set. There are only a few options.
Eager storing: Eager storing is turned on with the option stored=True.. Storing works like this: if a view v is marked as 'stored=True', then the first time a plan needs it, it will be stored explicitly. If the same plan needs v again, the values will be read from the stored file. By default a view is eager-stored if it is used more than once. (Also, a view may be marked for eager storage to suppress certain classes of optimizations, as well: in particular, the inner view of an Augment view will be marked for eager storage, and also views that are part of a chain of map-reduce operations, like consecutive joins.) You can discourage eager storing for a view with opts(stored=False) as well as force it with opts(stored=True).
External storing: Setting the option storedAt=path/to/file
will turn on eager storing, and also specify that the file will be stored at the given location. This is usually used to export output from GuineaPig.
Parallelism: Setting parallel=K
will modify the number of reducers used to create the view, in Hadoop mode.
Stored views aren't deleted after the plan completes, so you can also inspect them for debugging purposes if you like. They are also NOT reused from planning episode to planning episode, unless you ask them to be with the --reuse option, below.
Hadoop options: Again in Hadoop mode, setting the options hopts=foo
or hdefs=foo
will pass options in to the Hadoop commands used to build this view, as described elsewhere in this document.
Under the Hood
Guinea pig tries to optimize its plan by stitching together view definitions when possible. Generally, operations like Flatten and ReplaceEach, which just transform a sequence of rows, will be combined together, but operations which require a sorting step like Join and Group will not.
When you use Hadoop - and even if you don't! - the context in which your "by" functions work is pretty limited. In particular, they are executed by many different unknown processes, in general. So if you try anything fancy in terms of communication (e.g., using a ReplaceEach view to collect information in a process-local variable X and then using X in some subsequent view) it probably won't work.
In terms of implementation, every view has an associated checkpoint, which is the latest file that is need to create that view, and a rowGenerator, which is an iterator that produces the rows, one by one, assuming that's it run in a context where the checkpoint is available on stdin.
The checkpoint plan for a view produces the checkpoint. The storage plan stores the view.
Abstract view types are are Reader views, Transformation views (like ReplaceEach), or MapReduce views. The checkpoint for a reader is the file it reads. The checkpoint for a Transformation is the checkpoint for the view it is transforming. The checkpoint for a MapReduce is the reducer input.