P2 tutorial

Last updated: Jan 16, 2003

This document teaches how the p2 program works through a number of examples. If you recall from class, p2 is a program which takes a command line, runs it on a number of machines, and collects the resulting output back on the machine initiating the p2 command:
] p2 command -p machine-list
The command argument is a bash command pipeline; the machine-list argument is a list of machine names given in /net/machine-name form.

You have access to only four machines, ia00100-ia00103. For convenience, you may want to put the following in your .bashrc:

  MACHINES="/net/ia00100 /net/ia00101 /net/ia00102 /net/ia00103"
  export MACHINES
The basic "hello-world" for p2 is the following:
] p2 uptime -p $MACHINES
P begun : Thu Jan 16 22:20:02 2003
 10:25pm  up 93 days,  5:59,  0 users,  load average: 0.05, 0.11, 0.06
 10:25pm  up 54 days, 4 min,  0 users,  load average: 0.00, 0.01, 0.00
 10:25pm  up 107 days,  7:56,  0 users,  load average: 0.00, 0.01, 0.00
 10:25pm  up 58 days, 15:51,  0 users,  load average: 0.02, 0.03, 0.00
P ended : Thu Jan 16 22:20:03 2003
That took  1 wallclock secs ( 0.00 usr  0.02 sys +  0.66 cusr  0.07 csys =  0.75 CPU)

Basics

Let's try to count the number of ARC files on each of these machines:
] p2 'ls /[0-3]/*.arc.gz | wc -l' -p $MACHINES
P begun : ...
bash: /bin/ls: Argument list too long
bash: /bin/ls: Argument list too long
bash: /bin/ls: Argument list too long
bash: /bin/ls: Argument list too long
P ended : ...
That took  ... wallclock secs ( ... )
Oops, this didn't work. The problem is that the operating system limits command lines to around 64K (bash merely reports the error; the limit itself is the kernel's); the expansion of /[0-3]/*.arc.gz generates a command line that's too long.
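
You can check the limit on a given machine with getconf (assuming the machines provide it, which any reasonably recent system should):

] p2 'getconf ARG_MAX' -p $MACHINES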

It turns out that echo is a bash builtin command that does _not_ suffer from this limit: no new program is executed, so the kernel's argument-list limit never comes into play. Thus, echo can be combined with xargs to overcome the limitation:

] p2 'echo /[0-3]/*.arc.gz | xargs ls | wc -l' -p $MACHINES
P begun : ...
7137
7640
7651
7601
P ended : ...
That took  ... wallclock secs ( ... )

This is a little unsatisfying; we probably want a per-machine count. For example, you could try:

] p2 'echo `hostname` `echo /[0-3]/*.arc.gz | xargs ls | wc -l`' \
  -p $MACHINES
P begun : ...
ia00103.archive.org 7640
ia00101.archive.org 7137
ia00102.archive.org 7601
ia00100.archive.org 7651
P ended : ...
That took  ... wallclock secs ( ... )

An alternative is to use the I environment variable, which is assigned by p2 to be the index of the machine on which the program is running:

] p2 'echo $I `echo /[0-3]/*.arc.gz | xargs ls | wc -l`' -p $MACHINES
P begun : ...
3 7640
0 7651
2 7601
1 7137
P ended : ...
That took  ... wallclock secs ( ... )
Perhaps we are interested in the total number of files across all machines. In this case, we might do the following:
] p2 'echo /[0-3]/*.arc.gz | xargs ls | wc -l' -p $MACHINES \
  | awk '{s += $1;} END {print s;}'
P begun : ...
P ended : ...
That took  ... wallclock secs ( ... )
30029
Notice what's happening here: the status information printed by p2 (e.g., "P begun/ended" and "That took") goes to stderr, while the output of the various machines is sent to stdout. Thus, when we pipe the output of p2 into another program, only the output of the subcommands goes down the pipeline.
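
Because the status lines go to stderr, ordinary shell redirection can separate or discard them. For example, to see only the subcommand output:

] p2 uptime -p $MACHINES 2>/dev/null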

Combining

When using p2, it is often desirable to combine the results from individual machines. In fact, the last example of the previous section illustrated one way to do this: pipe the output of p2 into more commands.

In some circumstances it can be inefficient to use this piping technique to combine results. For these situations, p2 has a built-in mechanism for doing the combination that can be more efficient.

To understand this built-in mechanism, you need to understand a little better how p2 works. When p2 starts, it creates a named pipe for each machine on which it intends to distribute its computation. For each machine, p2 attaches the remote computation to the producer end of the pipe corresponding to that machine; that is, whatever the remote computation on machine X sends to stdout goes into the named pipe for machine X. On the machine running p2 itself, p2 starts a "combiner" process with these named pipes as arguments. The default combiner is av_cat; thus, implicitly, the output from all the examples in the previous section was produced by a command line that looks like the following:

  av_cat pipe-for-ia00100 pipe-for-ia00101 ... pipe-for-ia00103
The av_cat program uses asynchronous input to read from these named pipes and interleaves the results to its stdout.
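
To make the mechanism concrete, here is a rough hand-built equivalent of what p2 sets up for two machines. This is only a sketch of the idea, not p2's actual implementation; in particular, using ssh as the transport is an assumption:

  mkfifo /tmp/pipe-ia00100 /tmp/pipe-ia00101    # one named pipe per machine
  ssh ia00100 uptime > /tmp/pipe-ia00100 &      # each remote stdout feeds its pipe
  ssh ia00101 uptime > /tmp/pipe-ia00101 &
  av_cat /tmp/pipe-ia00100 /tmp/pipe-ia00101    # the combiner reads all the pipes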

The -c option to p2 allows you to replace the av_cat command with another command of your choice. Unfortunately, the utility of this option is somewhat limited, because few Unix commands do anything useful with a variable number of input files: most filters read a single input stream, and those that do accept several files (e.g., awk) simply process them one after another rather than merging them.
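
One command that does happily take any number of files is plain cat. Used as the combiner, it concatenates each machine's output whole, in machine order, rather than interleaving lines the way av_cat does (a sketch, assuming only that -c substitutes the given command for av_cat as described above):

] p2 uptime -c cat -p $MACHINES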

After av_cat (the default), the most popular way of combining p2 results is av_sort -m. The -m tells sort to merge streams that have already been sorted. Typically, when sorting large results using p2, the command argument given to p2 is structured to output sorted results, and then -c 'av_sort -m' is used to merge these sorted per-machine results into a single, sorted stream.

Let's say that we wanted to produce a list of the ARC files on our $MACHINES sorted by ascending size. We could do so using the following p2 command line:

] p2 'echo /[0-3]/*.arc.gz | xargs wc -c | grep -v total \
  | sort +0n -1' -c 'av_sort -m +0n -1' -p $MACHINES
P begun : ...
....lots of output....
P ended : ...
That took  ... wallclock secs ( ... )
(The grep -v total is needed because wc prints a line labeled "total", which we want to omit; since xargs may invoke wc more than once, several such lines can appear. The +0n -1 arguments tell sort to sort lines based on the first column, and to treat that column as a number rather than as text.)
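
The +0n -1 key syntax is the historical sort interface; on implementations that have dropped it (later POSIX revisions removed it), the equivalent -k form is:

  sort -k 1,1n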

Temporary files

It is sometimes convenient to generate results using multiple calls to p2 in which earlier p2 calls leave temporary files to be used by later calls. Let's look at an example.

Let's say we wanted to find missing DAT files. Here's how we might do it. First, we make a list of all the ARC files, stripping off the .arc.gz suffix:

] p2 'echo /[0-3]/*.arc.gz | xargs ls | sed "s/\.arc\.gz$//" \
  | sort | av_prepend `hostname` > /0/tmp/raymie1' -p $MACHINES
P begun : ...
P ended : ...
That took  ... wallclock secs ( ... )
Notice the use of av_prepend. This sticks some fixed text -- in this case, the name of the host -- in front of each line. If the two lists end up differing, the prepended host names will tell us which machines the differences came from.
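
If av_prepend is not at hand, it is easy to approximate with standard tools. Assuming it does no more than prefix each line with its argument, separated by a space, this awk stage is an equivalent stand-in:

  awk -v h="`hostname`" '{print h, $0}'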

Now, we do the same thing for DAT files:

] p2 'echo /[0-3]/*.dat.gz | xargs ls | sed "s/\.dat\.gz$//" \
  | sort | av_prepend `hostname` > /0/tmp/raymie2' -p $MACHINES
P begun : ...
P ended : ...
That took  ... wallclock secs ( ... )
Finally, we compare the results:
] p2 'diff /0/tmp/raymie[12]' -p $MACHINES
P begun : ...
P ended : ...
That took  ... wallclock secs ( ... )

The diff produces no output, which shows that the two lists are identical: there are no missing DAT files.

The following is important. You must clean up your temporary files!!!

] p2 'rm /0/tmp/raymie[12]' -p $MACHINES
P begun : ...
P ended : ...
That took  ... wallclock secs ( ... )

More realistic example

The above examples deliberately avoided reading the actual data. Simply uncompressing all the data in the DAT files to /dev/null takes over two hours. Thus, you should be really, really, really sure that your p2 command line does what you want before you submit it.

To build confidence in your programs and scripts before committing to a full run, it is wise to follow this process:

  1. Without using p2, run your command against one or a few ARC/DAT files and check that the output is reasonable.

  2. After that, use p2 to run your command against a small number of ARC/DAT files on each machine.

  3. If everything is working, do the full run.
Let's follow this procedure to solve the following problem: count the number of outlinks pointing to each top-level domain (e.g., .com, .org, etc.).

We can solve this problem by looking at DAT files, because they contain link information. In particular, for each outlink, a DAT file contains a line of the form:

  l host[:port]/path

So the basic idea is to grep for lines starting with "l ", extract the top-level domain from those lines, and then count the occurrences of each domain. Let's start by picking a handful of DAT files to play with. Log into any cluster machine and build a file containing five random DAT files:

] ssh ia00102
] cd /0
] ls *.dat.gz | av_randomize | head -5 > ~/files.txt
Now, let's build a command line that will do the counting we want on just those files. Extracting the lines of interest is straightforward. From there, Perl would be a great language in which to do the counting. If you don't know Perl, you can actually manage to do it with a pipeline of command-line tools:
] cat ~/files.txt | xargs -n 2 zcat | egrep "^l " \
  | sed "s/^l \([^:/]*\).*/\1/" \
  | av_reverse | cut -d . -f 1 | av_reverse \
  | sort | uniq -c | sort +0n -1
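
The av_reverse | cut | av_reverse sandwich extracts the last dot-separated field, i.e., the top-level domain: cut counts fields only from the left, so each line is reversed, the first field is taken, and that field is reversed back (this assumes av_reverse, like the standard rev, reverses the characters of each line). If you prefer, awk can grab the last field directly; this single stage is a drop-in replacement for all three:

  awk -F. '{print $NF}'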

Note the -n 2 given to xargs: it tells xargs to pass its command at most two arguments at a time, which forces multiple executions of zcat. Forcing such a break-up is a good idea during testing: multiple invocations often expose pipeline problems that occur during large runs but will not occur during small runs unless you force the issue.
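
To see exactly what -n 2 does, try it on a toy argument list; each output line below comes from a separate invocation of echo:

] echo a b c d e | xargs -n 2 echo
a b
c d
e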

After checking the output a bit, we gain confidence and move to the next level, which is to test it on a few files on each machine in the cluster:

] p2 'echo /[0-3]/*.dat.gz | xargs ls | av_randomize | head -5 > ~/f$I' \
  -p $MACHINES
] p2 'cat ~/f$I | xargs -n 2 zcat | egrep "^l " \
      | sed "s/^l \([^:/]*\).*/\1/" \
      | av_reverse | cut -d . -f 1 | av_reverse | sort ' \
     -c 'av_sort -m' \
     -p $MACHINES \
  | uniq -c | sort +0n -1
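
When the cluster test also looks right, the full run (step 3) is just the same pair of commands with the av_randomize | head -5 stage dropped, so that each machine's file list names every DAT file. For example, the first command becomes the following (a sketch; it commits you to reading every DAT file, so do not run it casually):

] p2 'echo /[0-3]/*.dat.gz | xargs ls > ~/f$I' -p $MACHINES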

Again, a little Perl scripting would make for a much more efficient computation. However, the above approach is good for illustrating the use of various p2 features discussed above, as well as the power of the Unix text-processing tools.

During the lab, please do not attempt to process significant amounts of data!