MRS Guinea Pig

What it is

mrs_gp stands for "Map-Reduce Streaming for Guinea Pig" and is designed to be an alternative backend for Guinea Pig. It has a similar interface to Hadoop streaming, which I assume in this document you know all about. It is implemented in a single Python source file, mrs_gp.py, which is distributed with Guinea Pig.

I usually pronounce it as "missus gee pee".

How to use it without a server

I'm going to assume below that the Unix command "mrs_gp" invokes "python mrs_gp.py". One way to make this true would be to type

% alias mrs_gp='python my/path/to/GuineaPig/mrs_gp.py'

for the appropriate path to your copy of mrs_gp.py. If the directory containing mrs_gp.py is on your Python path, you can also use the somewhat longer command python -m mrs_gp.
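
For instance (a sketch: my/path/to/GuineaPig is the same placeholder path used in the alias above), you could put that directory on your Python path and invoke the module directly:

% export PYTHONPATH="$PYTHONPATH:my/path/to/GuineaPig"
% python -m mrs_gp --input DIR1 --output DIR2 --mapper [SHELL_COMMAND1]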

To run a streaming map-reduce command, type something like

% mrs_gp --input DIR1 --output DIR2 --mapper [SHELL_COMMAND1] --reducer [SHELL_COMMAND2] --numReduceTasks [K]

Or you can use it as a backend to Guinea Pig, as explained elsewhere.

The arguments to --input and --output are directories on your local filesystem. The input directory DIR1 should contain some number of text files and nothing else: every file in DIR1 will be fed as standard input to a mapper process defined by the string SHELL_COMMAND1. The standard outputs of the mappers will be hashed into K buckets according to their "keys", and each bucket will then be sorted and sent to the standard input of the reducers. The standard output of the reducers will be stored in files in the output directory, DIR2. These files will usually be given arbitrary names, like part0007.
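
As a concrete example, here is a minimal streaming word count. This is only a sketch: the script names wc-map.py and wc-reduce.py are made up, and it assumes that, as in Hadoop streaming, the "key" is the part of each output line before the first tab. The mapper emits one "word<TAB>1" line per word:

# wc-map.py (hypothetical): emit "word<TAB>1" for every whitespace-separated
# word read from standard input
import sys
for line in sys.stdin:
    for word in line.split():
        sys.stdout.write(word + '\t1\n')

Because each reducer's input bucket arrives sorted, the reducer can total the counts for each run of identical keys:

# wc-reduce.py (hypothetical): sum the counts over each run of identical keys,
# relying on the fact that the shuffled input is sorted by key
import sys
current, total = None, 0
for line in sys.stdin:
    word, count = line.rstrip('\n').split('\t')
    if word != current:
        if current is not None:
            sys.stdout.write('%s\t%d\n' % (current, total))
        current, total = word, 0
    total += int(count)
if current is not None:
    sys.stdout.write('%s\t%d\n' % (current, total))

You would then run something like

% mrs_gp --input docs --output counts --mapper 'python wc-map.py' --reducer 'python wc-reduce.py' --numReduceTasks 3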

The previous contents of DIR2, if they exist, will be deleted.

The parallelism that mrs_gp uses is mostly process-level, not thread-level. There are threads involved in the shuffle step, and typically there will also be multiple subprocesses:

  • each of the N input files in DIR1 will have its own mapper process
  • each of the K output files in DIR2 will have its own reducer process, and its own sort process.

The default number of reduce tasks is 1, and the default mapper command is cat. If there is no reducer, then a "mapper-only" task is performed: each file in DIR1 will be mapped to a file with a corresponding name in DIR2.
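
For example, a mapper-only task that filters every input file (a sketch; the grep pattern is arbitrary) just omits the --reducer argument:

% mrs_gp --input DIR1 --output DIR2 --mapper 'grep -i guinea'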

Why use it without a server

This looks a lot like Hadoop streaming but it isn't useful for the same purposes. Hadoop is mostly used for I/O bound problems, and for these, mrs_gp will not be any faster than running sequentially. For instance, since the files in DIR1 are all on the same disk, if that disk has only one disk head, it's not any faster to read 10 files with 100Mb each than to read one file with 1000Mb. So for I/O bound tasks, mrs_gp's parallelism is useless.

It can be useful if your machine has multiple cores, and your job is CPU-bound, not I/O bound.

Hint: One way to make a process less I/O bound is to do the reading from a RAM disk, rather than a regular disk. Once you've done that, the multiprocessing done by mrs_gp will be more helpful.
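
For instance, on Linux you could stage the input on a tmpfs RAM disk (a sketch; the mount point /mnt/ramdisk and the 2g size are made up):

% sudo mount -t tmpfs -o size=2g tmpfs /mnt/ramdisk
% cp -r DIR1 /mnt/ramdisk/input
% mrs_gp --input /mnt/ramdisk/input --output /mnt/ramdisk/output --mapper [SHELL_COMMAND1] --reducer [SHELL_COMMAND2] --numReduceTasks [K]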

How to use it with a server

As an alternative to a RAM disk, you can also set up a server process that will store inputs and outputs for mrs_gp. The mrs_gp.py file contains code to run both the server and client sides. To start a server, use

% mrs_gp --serve &

You want to put this in the background, since it doesn't return until the server halts. This will log data to the local file server.log.

To check if the server is running, you can use

% mrs_gp --probe

To shut it down, use

% mrs_gp --shutdown

Finally, to submit a map-reduce task to the server, you can use the same arguments as before, but include the extra argument --task.

% mrs_gp --task --input DIR1 --output DIR2 --mapper [SHELL_COMMAND1] --reducer [SHELL_COMMAND2] --numReduceTasks [K]

There is one difference in server-executed tasks: in addition to using local filesystem directories for DIR1 and DIR2, you can use "directories" which are stored in memory on the server. To do this, just prefix a directory name with "gpfs:". The gpfs "filesystem" is very simple: there are no hierarchical names, only directories and files. When the server shuts down, everything in the file system is lost - nothing is saved on disk, unless you save it. You can save it with a mrs_gp task that uses a local filesystem location as its destination DIR2, or with the mrs_gp filesystem commands below.
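
For example (a hypothetical session, with made-up directory names and the word-count scripts from above), you can stage local data into memory with a mapper-only copy - recall the default mapper is cat - run the real job entirely in memory, and then write the result back to the local filesystem:

% mrs_gp --task --input mydata --output gpfs:mydata
% mrs_gp --task --input gpfs:mydata --output gpfs:counts --mapper 'python wc-map.py' --reducer 'python wc-reduce.py' --numReduceTasks 3
% mrs_gp --task --input gpfs:counts --output counts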

The gpfs: mechanism will give you better parallel performance, but can use a lot of memory. It also gives you somewhat better tools for monitoring progress (see below).

Note: the server is doing something dangerous - it's getting a shell command from the internet (two of them, actually) and running them on your machine with your permissions. To make this somewhat safer, map-reduce tasks are only accepted from one client IP address - the address of the machine that is running the server. So currently you cannot run a server on one machine and submit tasks from another. I've also made it impossible to access files not reachable by a relative path from the directory that the server is running in.

Another note: The program actually uses both thread-level and process-level parallelism: threads are used for shuffling in parallel. I've found much better performance on some tasks using pypy than cpython.
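
If you have pypy installed, pointing the alias from the first section at it is all that's needed (a sketch, reusing the same placeholder path):

% alias mrs_gp='pypy my/path/to/GuineaPig/mrs_gp.py'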

Accessing the gpfs: filesystem

You can inspect things in the server's in-memory filesystem in two ways. First, you can point a browser at the location http://your.machine.name:1969/ls?html=1. (Note - 1969 is the default port that the server runs on.) Second, you can use these commands:

% mrs_gp --fs ls
% mrs_gp --fs ls DIR
% mrs_gp --fs head DIR FILE
% mrs_gp --fs tail DIR FILE [N]
% mrs_gp --fs cat DIR FILE [N]

Briefly:

  • "ls" with no arguments lists the directories in gpfs:
  • "ls" DIR lists the files in a directory
  • "head", "tail", and "cat" show the first N characters in a file (note the space between DIR and FILE), the last N characters, and the entire file. N defaults to 2048.

All of these commands write to standard output.
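
For instance, using the hypothetical directory and file names from the session above - note that the directory and the file are separate arguments:

% mrs_gp --fs ls counts
% mrs_gp --fs head counts part00000 500
% mrs_gp --fs tail counts part00000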

Two additional --fs subcommands are "write", which appends a line to a file (used mostly for debugging), and "getmerge", which appends together and retrieves all the files in a directory. The latter's syntax is:

% mrs_gp --fs getmerge DIR
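
Assuming getmerge also writes to standard output like the commands above, you can capture a whole gpfs: directory in a single local file (hypothetical names again):

% mrs_gp --fs getmerge counts > counts.txt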


Inspecting Running And Completed Tasks

You can get a status report on the currently running (or last completed) task with the browser, or with the command

% mrs_gp --report

This will return something like this:

Options:
 --input                                  : gpfs:sharded
 --mapper                                 : python streamNaiveBayesLearner.py --streamTrain 100
 --numReduceTasks                         : 3
 --output                                 : gpfs:events
 --reducer                                : python sum-events.py
Statistics:
 mapper  summary: 3/3 finished/started progress [###]
 reducer summary: 3/3 finished/started progress [###]
 shuffle summary: 3/3 finished/started progress [###]
 __top level task                        : finished in 3.264 sec
 _init mappers and shufflers             : finished in 0.078 sec
 _init reduce buffer queues              : finished in 0.001 sec
 _init reducers                          : finished in 0.114 sec
 _join mappers                           : finished in 1.843 sec
 _join reducer queues                    : finished in 0.000 sec
 _join reducers                          : finished in 1.226 sec
 mapper-from-gpfs:sharded/part00000      : finished in 1.605 sec chars: input 1990944(1.9M) output 6784645(6.5M) log 206
 mapper-from-gpfs:sharded/part00001      : finished in 1.424 sec chars: input 1927812(1.8M) output 6587275(6.3M) log 206
 mapper-from-gpfs:sharded/part00002      : finished in 1.586 sec chars: input 1935917(1.8M) output 6714692(6.4M) log 206
 reducer-to-gpfs:events/part00000        : finished in 1.160 sec chars: input 6651662(6.3M) output 1299450(1.2M) log 0
 reducer-to-gpfs:events/part00001        : finished in 1.263 sec chars: input 6927496(6.6M) output 1316031(1.3M) log 0
 reducer-to-gpfs:events/part00002        : finished in 1.151 sec chars: input 6507454(6.2M) output 1311985(1.3M) log 0
 shuffle from gpfs:sharded/part00000     : finished in 0.273 sec
 shuffle from gpfs:sharded/part00001     : finished in 0.146 sec
 shuffle from gpfs:sharded/part00002     : finished in 0.160 sec
Subprocess Logs:
 chars: 206(0.0M)  _logs/mapper-from-gpfs:sharded/part00000
 chars: 206(0.0M)  _logs/mapper-from-gpfs:sharded/part00001
 chars: 206(0.0M)  _logs/mapper-from-gpfs:sharded/part00002
 chars: 0(0.0M)  _logs/reducer-to-gpfs:events/part00000
 chars: 0(0.0M)  _logs/reducer-to-gpfs:events/part00001
 chars: 0(0.0M)  _logs/reducer-to-gpfs:events/part00002

Alternatively, you can point your browser at the URL http://your.machine.name:1969/report?html=1 to get similar output.

This report has three parts:

  • The options heading shows the original arguments of the task.
  • The statistics heading shows first a summary of how many mappers, reducers, and shufflers have completed, as counts, and also with an ascii progress bar (so "[###..]" would mean that 3 of 5 processes had completed). Then it shows more detail on all the running or completed subtasks.
  • Finally, the subprocess logs heading lists the locations on the gpfs: filesystem where the standard error output of each subprocess is stored. (It might look like these names are hierarchical, but they are not: the directory is always _logs, and the filename is the rest - it will usually contain slashes as part of its name.) You can look at these logs using the --fs subcommands if you need to debug. For example, the command mrs_gp --fs cat _logs mapper-from-gpfs:sharded/part00000 will show the standard error output for the first mapper.

In addition to saving subprocess "stderr" output in the directory _logs, mrs_gp also maintains a directory called _history which has one file for every past task. This contains a shorter version of the report.

The _logs directory is re-initialized before every task -- i.e., you can only look at the logs for the current or last-finished task. The _history directory accumulates more and more entries until you shut down the server.

Using mrs_gp with GuineaPig

MRS was designed as a potential backend for Guinea Pig. If you want to use it this way, there are a few steps to take, which are discussed on the Guinea Pig wiki page.