Difference between revisions of "MRS Guinea Pig"
Line 66: | Line 66: | ||
</pre> | </pre> | ||
− | There is one difference in server-executed tasks: in addition to using local filesystem directories for DIR1 and DIR2, you can use "directories" which are stored in memory on the server. To do this, just prefix a directory name with "gpfs:". | + | There is one difference in server-executed tasks: in addition to using local filesystem directories for DIR1 and DIR2, you can use "directories" which are stored in memory on the server. To do this, just prefix a directory name with "gpfs:". The gpfs "filesystem" is very simple: there are no hierarchical names, only directories and files. When the server shuts down, everything in the file system is lost - nothing is saved on disk, unless you save it. You can save it with a <code>mrs_gp</code> task that uses a local filesystem location as its destination DIR2, or with the <code>mrs_gp</code> filesystem commands below. |
− | + | == Accessing the gpfs: filesystem == | |
+ | |||
+ | You can inspect things in the server's in-memory filesystem in two ways. First, you can point a browser at the location <code>http://your.machine.name:1969/ls?html=1</code>. (Note - 1969 is the default port that the server runs on.) Second, you can use these commands: | ||
+ | |||
+ | <pre> | ||
+ | % mrs --fs ls | ||
+ | % mrs --fs ls DIR | ||
+ | % mrs --fs head DIR FILE | ||
+ | % mrs --fs tail DIR FILE [N] | ||
+ | % mrs --fs cat DIR FILE [N] | ||
+ | </pre> | ||
+ | |||
+ | Briefly: | ||
+ | * "ls" with no arguments lists the directories in gpfs: | ||
+ | * "ls" DIR lists the files in a directory | ||
+ | * "head", "tail", and "cat" show the first N characters in a file (note the space between DIR and FILE), the last N characters, and the entire file. N defaults to 2048. | ||
+ | |||
+ | All of these commands write to standard output. | ||
+ | |||
+ | Two additional <code>--fs</code> subcommands are "write", will append a line to a file -- which is used mostly for debugging and to append together and retrieve all the files in a directory. The latter's syntax is: | ||
+ | |||
+ | <pre> | ||
+ | % mrs --fs getmerge DIR | ||
+ | </pre> | ||
+ | |||
+ | == Monitoring Running Tasks == | ||
+ | |||
+ | You can get a status report on the currently running (or last completed) task with the browser, or with the command | ||
+ | |||
+ | <pre> | ||
+ | % mrs --report | ||
+ | </pre> | ||
+ | |||
+ | This will return something like this: | ||
+ | <pre> | ||
+ | Options: | ||
+ | --input : gpfs:sharded | ||
+ | --mapper : python streamNaiveBayesLearner.py --streamTrain 100 | ||
+ | --numReduceTasks : 3 | ||
+ | --output : gpfs:events | ||
+ | --reducer : python sum-events.py | ||
+ | Statistics: | ||
+ | mapper summary: 3/3 finished/started progress [###] | ||
+ | reducer summary: 3/3 finished/started progress [###] | ||
+ | shuffle summary: 3/3 finished/started progress [###] | ||
+ | __top level task : finished in 3.264 sec | ||
+ | _init mappers and shufflers : finished in 0.078 sec | ||
+ | _init reduce buffer queues : finished in 0.001 sec | ||
+ | _init reducers : finished in 0.114 sec | ||
+ | _join mappers : finished in 1.843 sec | ||
+ | _join reducer queues : finished in 0.000 sec | ||
+ | _join reducers : finished in 1.226 sec | ||
+ | mapper-from-gpfs:sharded/part00000 : finished in 1.605 sec chars: input 1990944(1.9M) output 6784645(6.5M) log 206 | ||
+ | mapper-from-gpfs:sharded/part00001 : finished in 1.424 sec chars: input 1927812(1.8M) output 6587275(6.3M) log 206 | ||
+ | mapper-from-gpfs:sharded/part00002 : finished in 1.586 sec chars: input 1935917(1.8M) output 6714692(6.4M) log 206 | ||
+ | reducer-to-gpfs:events/part00000 : finished in 1.160 sec chars: input 6651662(6.3M) output 1299450(1.2M) log 0 | ||
+ | reducer-to-gpfs:events/part00001 : finished in 1.263 sec chars: input 6927496(6.6M) output 1316031(1.3M) log 0 | ||
+ | reducer-to-gpfs:events/part00002 : finished in 1.151 sec chars: input 6507454(6.2M) output 1311985(1.3M) log 0 | ||
+ | shuffle from gpfs:sharded/part00000 : finished in 0.273 sec | ||
+ | shuffle from gpfs:sharded/part00001 : finished in 0.146 sec | ||
+ | shuffle from gpfs:sharded/part00002 : finished in 0.160 sec | ||
+ | Subprocess Logs: | ||
+ | chars: 206(0.0M) _logs/mapper-from-gpfs:sharded/part00000 | ||
+ | chars: 206(0.0M) _logs/mapper-from-gpfs:sharded/part00001 | ||
+ | chars: 206(0.0M) _logs/mapper-from-gpfs:sharded/part00002 | ||
+ | chars: 0(0.0M) _logs/reducer-to-gpfs:events/part00000 | ||
+ | chars: 0(0.0M) _logs/reducer-to-gpfs:events/part00001 | ||
+ | chars: 0(0.0M) _logs/reducer-to-gpfs:events/part00002 | ||
+ | </pre> | ||
+ | |||
+ | This report has three parts: | ||
+ | * The '''options''' heading shows the original arguments of the task. | ||
+ | * The '''statistics''' heading shows first a summary of how many mappers, reducers, and shufflers have completed, as counts, and also with an ascii progress bar. (So "[###..]" would mean that 3 of 5 processes had completed. Then it shows more detail on all the running or completed subtasks. | ||
+ | * Finally, the '''subprocess logs''' heading lists the locations on the gpfs: filesystem where the standard error output of each subprocess is stored. (It might look like these names are hierarchical, but they are not: the directory is always <code>_logs</code>, and the filename is the rest - but it might usually will contain slashes as part of its name.) You can look at these logs in the using the <code>--fs</code> subcommands if you need to in debugging. For example, the command <code>mrs --fs cat _logs mapper-from-gpfs:sharded/part00000</code> will show the standard error output for the first mapper. |
Revision as of 14:30, 11 November 2015
Contents
What it is
mrs_gp
stands for "Map-Reduce Streaming for Guinea Pig" and is designed to be an alternative backend for Guinea Pig. It has a similar interface to Hadoop streaming, which I assume in this document you know all about. It is implemented in single python source file, mrs_gp.py
, which is distributed with Guinea Pig. (Currently, it's only in the git branch mrs_gp_serve
).
I usually pronounce is as "missus gee pee".
How to use it without a server
I'm going to assume below that the command unix command "mrs" invokes "python mrs_gp.py". One way to make this true would be to type
% alias mrs='python my/path/to/GuineaPig/mrs_gp.py'
for the appropriate path to your copy of mrs_gp.py.
. To run a streaming map-reduce command, type something like
% mrs --input DIR1 --output DIR2 --mapper [SHELL_COMMAND1] --reducer [SHELL_COMMAND2] --numReduceTasks [K]
The arguments to --input
and --output
are directories on your local filesystem. The input directory DIR1 should contain some number of text files and nothing else: every file in DIR1 will be fed as standard input to a mapper process defined by the string SHELL_COMMAND1. The standard outputs of the mappers will be hashed into K buckets accord to their "keys", and each bucket will then be sorted and sent to the standard input of the reducers. The standard output of the reducers will be stored in files in the output directory, DIR2. These files will usually be given arbitrary names, like part0007.
The previous contents of DIR2, if they exist, will be deleted.
The parallelism that mrs_gp
uses is mostly process-level, not thread-level. There are threads involved in the shuffle step typically there will be multiple subprocesses
- each of the N input files in DIR1 will have its own mapper process
- each of the N output files in DIR2 will have its own reducer process, and its own sort process.
The default number of reduce tasks is 1, and the default mapper and reducer commands are cat
.
Why to use it without a server
This looks a lot like Hadoop streaming but really isn't at all useful for the same purposes. Hadoop is mostly used for I/O bound problems, and for these, mrs_gp
will not be any faster than running sequentially. For instance, since the files in DIR1 are all on the same disk, if that disk has only one disk head, it's not any faster to read 10 files with 100Mb each than to to read one file with 1000Mb. So for I/O bound tasks, mrs_gp
's parallelism is useless.
It can be useful if your machine has multiple cores, and your job is CPU-bound, not I/O bound.
Hint: One way to make a process less I/O bound is to do the reading from a RAM disk, rather than a regular disk. Once you've done that then the multiprocessing done by mrs_gp
will be more helpful.
How to use it with a server
As an alternative to a RAM disk, you can also set up a server process that will store inputs and outputs for mrs_gp
. The mrs_gp.py
file contains code to run both the server and client sides. To start a server, use
% mrs --serve &
You want to put this in the background, since it doesn't return until the server halts. This will log data to the local file server.log
.
To check if the server is running, you can use
% mrs --probe
To shut it down, use
% mrs --shutdown
Finally, to submit a map-reduce task to the server, you can use the same arguments as before, but include the extra argument --task
.
% mrs --task --input DIR1 --output DIR2 --mapper [SHELL_COMMAND1] --reducer [SHELL_COMMAND2] --numReduceTasks [K]
There is one difference in server-executed tasks: in addition to using local filesystem directories for DIR1 and DIR2, you can use "directories" which are stored in memory on the server. To do this, just prefix a directory name with "gpfs:". The gpfs "filesystem" is very simple: there are no hierarchical names, only directories and files. When the server shuts down, everything in the file system is lost - nothing is saved on disk, unless you save it. You can save it with a mrs_gp
task that uses a local filesystem location as its destination DIR2, or with the mrs_gp
filesystem commands below.
Accessing the gpfs: filesystem
You can inspect things in the server's in-memory filesystem in two ways. First, you can point a browser at the location http://your.machine.name:1969/ls?html=1
. (Note - 1969 is the default port that the server runs on.) Second, you can use these commands:
% mrs --fs ls % mrs --fs ls DIR % mrs --fs head DIR FILE % mrs --fs tail DIR FILE [N] % mrs --fs cat DIR FILE [N]
Briefly:
- "ls" with no arguments lists the directories in gpfs:
- "ls" DIR lists the files in a directory
- "head", "tail", and "cat" show the first N characters in a file (note the space between DIR and FILE), the last N characters, and the entire file. N defaults to 2048.
All of these commands write to standard output.
Two additional --fs
subcommands are "write", will append a line to a file -- which is used mostly for debugging and to append together and retrieve all the files in a directory. The latter's syntax is:
% mrs --fs getmerge DIR
Monitoring Running Tasks
You can get a status report on the currently running (or last completed) task with the browser, or with the command
% mrs --report
This will return something like this:
Options: --input : gpfs:sharded --mapper : python streamNaiveBayesLearner.py --streamTrain 100 --numReduceTasks : 3 --output : gpfs:events --reducer : python sum-events.py Statistics: mapper summary: 3/3 finished/started progress [###] reducer summary: 3/3 finished/started progress [###] shuffle summary: 3/3 finished/started progress [###] __top level task : finished in 3.264 sec _init mappers and shufflers : finished in 0.078 sec _init reduce buffer queues : finished in 0.001 sec _init reducers : finished in 0.114 sec _join mappers : finished in 1.843 sec _join reducer queues : finished in 0.000 sec _join reducers : finished in 1.226 sec mapper-from-gpfs:sharded/part00000 : finished in 1.605 sec chars: input 1990944(1.9M) output 6784645(6.5M) log 206 mapper-from-gpfs:sharded/part00001 : finished in 1.424 sec chars: input 1927812(1.8M) output 6587275(6.3M) log 206 mapper-from-gpfs:sharded/part00002 : finished in 1.586 sec chars: input 1935917(1.8M) output 6714692(6.4M) log 206 reducer-to-gpfs:events/part00000 : finished in 1.160 sec chars: input 6651662(6.3M) output 1299450(1.2M) log 0 reducer-to-gpfs:events/part00001 : finished in 1.263 sec chars: input 6927496(6.6M) output 1316031(1.3M) log 0 reducer-to-gpfs:events/part00002 : finished in 1.151 sec chars: input 6507454(6.2M) output 1311985(1.3M) log 0 shuffle from gpfs:sharded/part00000 : finished in 0.273 sec shuffle from gpfs:sharded/part00001 : finished in 0.146 sec shuffle from gpfs:sharded/part00002 : finished in 0.160 sec Subprocess Logs: chars: 206(0.0M) _logs/mapper-from-gpfs:sharded/part00000 chars: 206(0.0M) _logs/mapper-from-gpfs:sharded/part00001 chars: 206(0.0M) _logs/mapper-from-gpfs:sharded/part00002 chars: 0(0.0M) _logs/reducer-to-gpfs:events/part00000 chars: 0(0.0M) _logs/reducer-to-gpfs:events/part00001 chars: 0(0.0M) _logs/reducer-to-gpfs:events/part00002
This report has three parts:
- The options heading shows the original arguments of the task.
- The statistics heading shows first a summary of how many mappers, reducers, and shufflers have completed, as counts, and also with an ascii progress bar. (So "[###..]" would mean that 3 of 5 processes had completed. Then it shows more detail on all the running or completed subtasks.
- Finally, the subprocess logs heading lists the locations on the gpfs: filesystem where the standard error output of each subprocess is stored. (It might look like these names are hierarchical, but they are not: the directory is always
_logs
, and the filename is the rest - but it might usually will contain slashes as part of its name.) You can look at these logs in the using the--fs
subcommands if you need to in debugging. For example, the commandmrs --fs cat _logs mapper-from-gpfs:sharded/part00000
will show the standard error output for the first mapper.