MRS Guinea Pig
What it is
mrs_gp stands for "Map-Reduce Streaming for Guinea Pig" and is designed to be an alternative backend for Guinea Pig. It has an interface similar to Hadoop streaming, which I assume in this document you know all about. It is implemented in a single Python source file, mrs_gp.py, which is distributed with Guinea Pig.
I usually pronounce it as "missus gee pee".
How to use it without a server
I'm going to assume below that the Unix command "mrs_gp" invokes "python mrs_gp.py". One way to make this true would be to type
% alias mrs_gp='python my/path/to/GuineaPig/mrs_gp.py'
for the appropriate path to your copy of mrs_gp.py. If you have the directory containing mrs_gp.py on your PYTHONPATH, you can also use the somewhat longer command python -m mrs_gp.
To run a streaming map-reduce command, type something like
% mrs_gp --input DIR1 --output DIR2 --mapper [SHELL_COMMAND1] --reducer [SHELL_COMMAND2] --numReduceTasks [K]
Or you can use it as a backend to Guinea Pig, as explained elsewhere.
The arguments to --input and --output are directories on your local filesystem. The input directory DIR1 should contain some number of text files and nothing else: every file in DIR1 will be fed as standard input to a mapper process defined by the string SHELL_COMMAND1. The standard outputs of the mappers will be hashed into K buckets according to their "keys", and each bucket will then be sorted and sent to the standard input of a reducer. The standard output of the reducers will be stored in files in the output directory, DIR2. These files will usually be given arbitrary names, like part0007.
The previous contents of DIR2, if they exist, will be deleted.
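The bucketing described above can be pictured with a short Python sketch. This is not the code in mrs_gp.py: the tab-separated key convention (borrowed from Hadoop streaming), the choice of zlib.crc32 as the hash, and the names key_of and shuffle_into_buckets are all assumptions made for illustration.

```python
import zlib
from typing import Iterable, List


def key_of(line: str) -> str:
    """Return the key of a line: the text before the first tab (assumed convention)."""
    return line.split("\t", 1)[0]


def shuffle_into_buckets(lines: Iterable[str], k: int) -> List[List[str]]:
    """Hash each line into one of k buckets by its key, then sort every bucket."""
    buckets: List[List[str]] = [[] for _ in range(k)]
    for line in lines:
        # crc32 is stable across runs, unlike Python's built-in hash() for strings.
        index = zlib.crc32(key_of(line).encode("utf-8")) % k
        buckets[index].append(line)
    # Sorting by key puts all lines with the same key next to each other,
    # which is what a streaming reducer expects on its standard input.
    return [sorted(bucket, key=key_of) for bucket in buckets]
```

Every line with a given key lands in the same bucket, so a single reducer sees all of the values for that key.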
The parallelism that mrs_gp uses is mostly process-level, not thread-level. There are threads involved in the shuffle step, and typically there will also be multiple subprocesses:
- each of the N input files in DIR1 will have its own mapper process
- each of the K output files in DIR2 will have its own reducer process, and its own sort process.
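To make the "one process per input file" idea concrete, here is a minimal sketch using Python's subprocess module. It only illustrates the mapper side, it is not how mrs_gp.py is actually written, and the function name run_mapper_per_file is made up for this example.

```python
import os
import subprocess
from typing import Dict


def run_mapper_per_file(input_dir: str, mapper_command: str) -> Dict[str, str]:
    """Start one mapper subprocess per input file and collect each one's stdout."""
    procs = {}
    handles = []
    for name in sorted(os.listdir(input_dir)):
        handle = open(os.path.join(input_dir, name), "rb")
        handles.append(handle)
        # Every process is started before any of them is waited on,
        # so the mappers run concurrently on separate cores if available.
        procs[name] = subprocess.Popen(
            mapper_command, shell=True, stdin=handle, stdout=subprocess.PIPE
        )
    outputs = {}
    for name, proc in procs.items():
        out, _ = proc.communicate()  # read this mapper's output, then wait for it
        outputs[name] = out.decode("utf-8")
    for handle in handles:
        handle.close()
    return outputs
```

With the default mapper command, cat, each entry of the result would simply be a copy of the corresponding input file.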
The default number of reduce tasks is 1, and the default mapper command is cat. If there is no reducer, then a "mapper-only" task is performed: each file in DIR1 will be mapped to a file with a corresponding name in DIR2.
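As an illustration of what a mapper and a reducer might compute, here is a word-count sketch in Python. It assumes the Hadoop-streaming convention of one "key<TAB>value" pair per line; the function names are hypothetical and nothing here is specific to mrs_gp.

```python
from itertools import groupby
from typing import Iterable, Iterator


def map_lines(lines: Iterable[str]) -> Iterator[str]:
    """Emit one "word<TAB>1" line for every whitespace-separated word."""
    for line in lines:
        for word in line.split():
            yield f"{word}\t1"


def reduce_lines(sorted_lines: Iterable[str]) -> Iterator[str]:
    """Sum the counts of consecutive lines that share a key (input must be sorted)."""
    pairs = (line.rstrip("\n").split("\t", 1) for line in sorted_lines if line.strip())
    for word, group in groupby(pairs, key=lambda pair: pair[0]):
        yield f"{word}\t{sum(int(count) for _, count in group)}"
```

Wrapped in small scripts that read standard input and print each result line, these two functions could serve as SHELL_COMMAND1 and SHELL_COMMAND2.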
Why to use it without a server
This looks a lot like Hadoop streaming, but it isn't useful for the same purposes. Hadoop is mostly used for I/O-bound problems, and for these, mrs_gp will not be any faster than running sequentially. For instance, since the files in DIR1 are all on the same disk, if that disk has only one disk head, it's not any faster to read 10 files of 100MB each than to read one file of 1000MB. So for I/O-bound tasks, mrs_gp's parallelism is useless.
It can be useful if your machine has multiple cores, and your job is CPU-bound, not I/O bound.
Hint: One way to make a process less I/O bound is to do the reading from a RAM disk, rather than a regular disk. Once you've done that, the multiprocessing done by mrs_gp will be more helpful.
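One way to follow this hint is to copy the input directory onto a RAM-backed mount before running the job. The sketch below assumes that /dev/shm is such a mount, which is common on Linux but not guaranteed on your system; the helper name stage_on_ram_disk is hypothetical.

```python
import os
import shutil


def stage_on_ram_disk(input_dir: str, ram_disk_root: str = "/dev/shm") -> str:
    """Copy input_dir under ram_disk_root and return the path of the copy."""
    name = os.path.basename(os.path.normpath(input_dir))
    target = os.path.join(ram_disk_root, name)
    if os.path.exists(target):
        shutil.rmtree(target)  # replace a stale copy left by an earlier run
    shutil.copytree(input_dir, target)
    return target
```

The returned path can then be passed to --input in place of the original directory. Remember that the copy uses memory until you delete it.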
How to use it with a server
As an alternative to a RAM disk, you can also set up a server process that will store inputs and outputs for mrs_gp. The mrs_gp.py file contains code to run both the server and client sides. To start a server, use
% mrs_gp --serve &
You want to put this in the background, since it doesn't return until the server halts. The server will log data to the local file server.log.
To check if the server is running, you can use
% mrs_gp --probe
To shut it down, use
% mrs_gp --shutdown
Finally, to submit a map-reduce task to the server, you can use the same arguments as before, but include the extra argument --task:
% mrs_gp --task --input DIR1 --output DIR2 --mapper [SHELL_COMMAND1] --reducer [SHELL_COMMAND2] --numReduceTasks [K]
There is one difference in server-executed tasks: in addition to using local filesystem directories for DIR1 and DIR2, you can use "directories" which are stored in memory on the server. To do this, just prefix a directory name with "gpfs:". The gpfs "filesystem" is very simple: there are no hierarchical names, only directories and files. When the server shuts down, everything in the filesystem is lost: nothing is saved on disk unless you save it. You can save it with an mrs_gp task that uses a local filesystem location as its destination DIR2, or with the mrs_gp filesystem commands below.
The gpfs: mechanism will give you better parallel performance, but can use a lot of memory. It also gives you somewhat better tools for monitoring progress (see below).
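If you drive mrs_gp from a script, a small helper can assemble the argument list and catch the most likely mistake, which is using a gpfs: directory without --task. This is only a sketch: the helper name is hypothetical, it only builds the list without running anything, and it assumes the mapper and reducer commands are each passed as a single argument.

```python
from typing import List, Optional


def mrs_gp_task_command(input_dir: str, output_dir: str, mapper: str = "cat",
                        reducer: Optional[str] = None, num_reduce_tasks: int = 1,
                        on_server: bool = False) -> List[str]:
    """Build the argument list for one map-reduce run (it is not executed here)."""
    uses_gpfs = input_dir.startswith("gpfs:") or output_dir.startswith("gpfs:")
    if uses_gpfs and not on_server:
        # In-memory directories exist only inside a running server.
        raise ValueError("gpfs: directories require a server-executed task (--task)")
    command = ["python", "mrs_gp.py"]
    if on_server:
        command.append("--task")
    command += ["--input", input_dir, "--output", output_dir, "--mapper", mapper]
    if reducer is not None:
        command += ["--reducer", reducer, "--numReduceTasks", str(num_reduce_tasks)]
    return command
```

The resulting list could be handed to subprocess.run, adjusting the first two elements to wherever your copy of mrs_gp.py lives.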
Note: the server is doing something dangerous - it's getting a shell command from the internet (two of them, actually) and running them on your machine with your permissions. To make this somewhat safer, map-reduce tasks are only accepted from one client IP address - the address of the machine that is running the server. So currently you cannot run a server on one machine and submit tasks from another. I've also made it impossible to access files not reachable from a relative path from the directory that the server is running in.
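For readers wondering what a path restriction of this kind could look like, here is one plausible reading sketched in Python. It is an assumption about the intent, not the check that mrs_gp.py actually performs, and the function name is made up: a path is accepted only if it is relative and resolves to a location inside the server's directory.

```python
import os


def is_reachable_by_relative_path(path: str, server_dir: str) -> bool:
    """Return True if path is relative and resolves to a location inside server_dir."""
    if os.path.isabs(path):
        return False
    root = os.path.realpath(server_dir)
    resolved = os.path.realpath(os.path.join(root, path))
    # commonpath equals root only when resolved is root itself or lies below it,
    # so "../" escapes and symlinks pointing outside are both rejected.
    return os.path.commonpath([root, resolved]) == root
```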
Another note: The program actually uses both thread-level and process-level parallelism: threads are used for shuffling in parallel. I've found much better performance on some tasks using pypy than cpython.
Accessing the gpfs: filesystem
You can inspect things in the server's in-memory filesystem in two ways. First, you can point a browser at the location http://your.machine.name:1969/ls?html=1. (Note: 1969 is the default port that the server runs on.) Second, you can use these commands:
% mrs_gp --fs ls
% mrs_gp --fs ls DIR
% mrs_gp --fs head DIR FILE
% mrs_gp --fs tail DIR FILE [N]
% mrs_gp --fs cat DIR FILE [N]
Briefly:
- "ls" with no arguments lists the directories in gpfs:
- "ls" DIR lists the files in a directory
- "head", "tail", and "cat" show the first N characters in a file (note the space between DIR and FILE), the last N characters, and the entire file. N defaults to 2048.
All of these commands write to standard output.
Two additional --fs subcommands are "write", which appends a line to a file (used mostly for debugging), and "getmerge", which appends together and retrieves all the files in a directory. The latter's syntax is:
% mrs_gp --fs getmerge DIR
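The two-level structure behind these commands is easy to model. The class below is a toy stand-in written for this page, not the server's real data structure: the class and method names are hypothetical, and the order in which getmerge concatenates files is an assumption.

```python
from typing import Dict, List, Optional


class FlatMemoryFS:
    """A two-level in-memory store: directory name -> file name -> contents."""

    def __init__(self) -> None:
        self._dirs: Dict[str, Dict[str, str]] = {}

    def write(self, directory: str, filename: str, line: str) -> None:
        """Append one line to a file, creating the directory and file if needed."""
        files = self._dirs.setdefault(directory, {})
        files[filename] = files.get(filename, "") + line + "\n"

    def ls(self, directory: Optional[str] = None) -> List[str]:
        """List directories, or the files of one directory."""
        if directory is None:
            return sorted(self._dirs)
        return sorted(self._dirs.get(directory, {}))

    def cat(self, directory: str, filename: str) -> str:
        return self._dirs[directory][filename]

    def head(self, directory: str, filename: str, n: int = 2048) -> str:
        return self.cat(directory, filename)[:n]

    def tail(self, directory: str, filename: str, n: int = 2048) -> str:
        contents = self.cat(directory, filename)
        return contents[max(0, len(contents) - n):]

    def getmerge(self, directory: str) -> str:
        """Concatenate every file in a directory (sorted by name, by assumption)."""
        files = self._dirs.get(directory, {})
        return "".join(files[name] for name in sorted(files))
```

Because file names are plain dictionary keys, a name such as mapper-from-gpfs:sharded/part00000 is a single file even though it contains a slash.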
Inspecting Running And Completed Tasks
You can get a status report on the currently running (or last completed) task with the browser, or with the command
% mrs_gp --report
This will return something like this:
Options:
  --input : gpfs:sharded
  --mapper : python streamNaiveBayesLearner.py --streamTrain 100
  --numReduceTasks : 3
  --output : gpfs:events
  --reducer : python sum-events.py
Statistics:
  mapper summary: 3/3 finished/started progress [###]
  reducer summary: 3/3 finished/started progress [###]
  shuffle summary: 3/3 finished/started progress [###]
  __top level task : finished in 3.264 sec
  _init mappers and shufflers : finished in 0.078 sec
  _init reduce buffer queues : finished in 0.001 sec
  _init reducers : finished in 0.114 sec
  _join mappers : finished in 1.843 sec
  _join reducer queues : finished in 0.000 sec
  _join reducers : finished in 1.226 sec
  mapper-from-gpfs:sharded/part00000 : finished in 1.605 sec chars: input 1990944(1.9M) output 6784645(6.5M) log 206
  mapper-from-gpfs:sharded/part00001 : finished in 1.424 sec chars: input 1927812(1.8M) output 6587275(6.3M) log 206
  mapper-from-gpfs:sharded/part00002 : finished in 1.586 sec chars: input 1935917(1.8M) output 6714692(6.4M) log 206
  reducer-to-gpfs:events/part00000 : finished in 1.160 sec chars: input 6651662(6.3M) output 1299450(1.2M) log 0
  reducer-to-gpfs:events/part00001 : finished in 1.263 sec chars: input 6927496(6.6M) output 1316031(1.3M) log 0
  reducer-to-gpfs:events/part00002 : finished in 1.151 sec chars: input 6507454(6.2M) output 1311985(1.3M) log 0
  shuffle from gpfs:sharded/part00000 : finished in 0.273 sec
  shuffle from gpfs:sharded/part00001 : finished in 0.146 sec
  shuffle from gpfs:sharded/part00002 : finished in 0.160 sec
Subprocess Logs:
  chars: 206(0.0M) _logs/mapper-from-gpfs:sharded/part00000
  chars: 206(0.0M) _logs/mapper-from-gpfs:sharded/part00001
  chars: 206(0.0M) _logs/mapper-from-gpfs:sharded/part00002
  chars: 0(0.0M) _logs/reducer-to-gpfs:events/part00000
  chars: 0(0.0M) _logs/reducer-to-gpfs:events/part00001
  chars: 0(0.0M) _logs/reducer-to-gpfs:events/part00002
Alternatively, you can use the browser on the URL http://your.machine.name:1969/report?html=1 to get a similar output.
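The same pages can be fetched from a script instead of a browser. The sketch below uses only the two URLs mentioned on this page (ls and report, on the default port 1969); the helper names are hypothetical, and fetch will only work while a server is actually running on the host you name.

```python
from urllib.request import urlopen


def server_url(host: str, page: str, port: int = 1969) -> str:
    """Build the URL of a server page such as "ls" or "report"."""
    return f"http://{host}:{port}/{page}?html=1"


def fetch(url: str, timeout: float = 5.0) -> str:
    """Download a page from a running server and return it as text."""
    with urlopen(url, timeout=timeout) as response:
        return response.read().decode("utf-8", errors="replace")
```

For example, fetch(server_url("your.machine.name", "report")) would return the HTML version of the report.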
This report has three parts:
- The options heading shows the original arguments of the task.
- The statistics heading shows first a summary of how many mappers, reducers, and shufflers have completed, as counts, and also with an ASCII progress bar. (So "[###..]" would mean that 3 of 5 processes had completed.) Then it shows more detail on all the running or completed subtasks.
- Finally, the subprocess logs heading lists the locations on the gpfs: filesystem where the standard error output of each subprocess is stored. (It might look like these names are hierarchical, but they are not: the directory is always _logs, and the filename is the rest, which will usually contain slashes as part of its name.) You can look at these logs using the --fs subcommands if you need to when debugging. For example, the command mrs_gp --fs cat _logs mapper-from-gpfs:sharded/part00000 will show the standard error output for the first mapper.
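Two details of the report are easy to get wrong when reading it by eye, so here they are as a small Python sketch: how a log name splits into the DIR and FILE arguments that --fs expects, and how the progress bar is drawn. The function names are hypothetical and the code is my own illustration, not taken from mrs_gp.py.

```python
from typing import List, Tuple


def split_gpfs_name(name: str) -> Tuple[str, str]:
    """Split "DIR/FILE" at the first slash only; later slashes belong to FILE."""
    directory, _, filename = name.partition("/")
    return directory, filename


def fs_cat_command(name: str) -> List[str]:
    """Build the arguments of the --fs cat command for a name shown in the report."""
    directory, filename = split_gpfs_name(name)
    return ["mrs_gp", "--fs", "cat", directory, filename]


def progress_bar(finished: int, total: int) -> str:
    """Draw one '#' per finished process and one '.' per unfinished one."""
    return "[" + "#" * finished + "." * (total - finished) + "]"
```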
In addition to saving subprocess "stderr" output in the directory _logs, mrs_gp also maintains a directory called _history, which has one file for every past task. Each file contains a shorter version of the report.
The _logs directory is re-initialized before every task, i.e., you can only look at the logs for the current or last-finished task. The _history directory accumulates more and more entries until you shut down the server.
Using mrs_gp with GuineaPig
MRS was designed as a potential backend for Guinea Pig. If you want to use it that way, there are a few steps to take, which are discussed on the Guinea Pig wiki page.