Simple, Fast, Easy Parallelism in Shell Pipelines

The typical shell1 pipeline looks something like this:

src | worker | sink

Usually src will output lines of data, and worker acts as a filter, processing each line and sending some transformed lines to sink. So there is a possibility of parallel processing on the basis of lines: We could have multiple worker tasks2 which each process different lines of data from src and output transformed lines to sink.3

Of course, there already exists xargs and GNU parallel4, which have the ability to run multiple tasks simultaneously. What's the difference?

Overall, xargs is mainly useful for executing a command repeatedly with a varying set of arguments, not processing a lot of data. A technique that allows a pool of worker tasks2, executing in parallel, to process incoming lines as they arrive on stdin, would be strictly more general. You could even run xargs within such a technique, or nest such a technique within itself; you can't run xargs from xargs.

Writing a parallel pipeline in any shell looks like this:3

src | { worker & 
        worker & 
        worker & } | sink

This will start up three workers in the background, which will all read data from src in parallel, and write output to sink. The output of src is not copied to all the workers; each byte of output becomes input to exactly one worker.5

Since all the worker tasks are reading input at the same time, we might run into a common concurrency issue: one worker task might get the first part of a line, while the rest of the line goes to another worker task. This is called "interleaving", and if we allowed it to happen it would cause the input to the workers to be completely corrupt.

Here's an example of using this parallel processing technique without protecting against interleaving. Note that in bash, we need to place <&0 in front of the first background command. This just means "this command should read input from file descriptor 0; that is, stdin", which happens by default in most shells, but won't happen in bash due to a bug that will be fixed in the next release.

yes ooo | head -n 8 | pv --quiet --rate-limit 4 | { <&0 tr o z & 
                                                        tr o x & }
x
x
zzzzzz
zzz
zzz
zzz
x
zzz
xxx

Pretty severe interleaving! We used pv to throttle the input going to the workers to increase the amount of interleaving that would happen.

To deal with the issue of interleaving, we need to introduce two new commands. pad will pad incoming lines of data to a fixed size, and unpad removes that padding. Then we make the following small modification:

src | pad | { unpad | worker &
              unpad | worker & 
              unpad | worker & } | sink

Communicating with fixed-size blocks means that pad and unpad will never interleave, due to pipe atomicity. This fixed size must be determined in advance, and should be at least as large as the largest possible line that src can emit. There is an upper limit6 on this fixed size; on Linux, it's 4096 bytes; and if a line is longer than that, it will be truncated to 4096 bytes, discarding all extra data. If your lines are longer than 4096 bytes, you will lose data! But most people don't work with lines that are anywhere close to 4096 bytes long, so they have little to worry about if they are careful. If you do work with such long lines then you can work around the limit in any number of ways.7

pad and unpad can be defined as follows:8, 6

# pad to the maximize size we can do and still be atomic on this system
pipe_buf=$(getconf PIPE_BUF /)
function pad() {
    # redirect stderr (file descriptor 2) to /dev/null to get rid of noise
    dd conv=block cbs=$pipe_buf obs=$pipe_buf 2>/dev/null
}
function unpad() {
    dd conv=unblock cbs=$pipe_buf ibs=$pipe_buf 2>/dev/null
}

For convenience, you could insert this snippet into your .bashrc.

Let's look at a fixed version:

yes ooo | head -n 8 | pv --quiet --rate-limit 4 | pad | { <&0 unpad | tr o z & 
                                                              unpad | tr o x & }
zzz
zzz
zzz
xxx
xxx
xxx
xxx
xxx

Great.

Now let's use these capabilities productively. To factor numbers!

# define some giant constants
seqs=5000000000000000000
seqe=18000000000000000000
shufn=100000
# generate lines containing $shufn numbers randomly selected from $seqs to $seqe
shuf --input-range $seqs-$seqe -n $shufn | pad | {
    # pad and unpad them, and factor each number
    <&0 unpad | factor | pad &
    unpad | factor | pad &
    unpad | factor | pad &
    unpad | factor | pad &
} | unpad

Since factor is CPU-bound, we want to run it on multiple CPU cores at once. Since these worker tasks will be able to run simultaneously on different cores, this will be substantially faster than the single-worker case.9 We pad and unpad the output as well to avoid it being interleaved, since factor could perform partial writes or writes over the maximum atomic size. Placing pad and unpad on the output is safer unless you know for sure that the output is atomic.10

Try comparing different numbers of workers to the single-worker case:

# define some giant constants
seqs=5000000000000000000
seqe=18000000000000000000
shufn=100000
shuf --input-range $seqs-$seqe -n $shufn | factor

Now let's look at a more sophisticated example. I want to send a bunch of HTTPS requests to some server, perhaps for scraping or testing. Python, the usual tool for something like this, is just too slow for my purposes; so I want to use something fast, like a shell script.11 I can just do the following:

# specify the server we're connecting to
host="api.example.com"
# We will produce our HTTP requests with printf; we will perform a
# printf "$format" somenumber
# for each input number, which outputs a complete HTTP request to send off
format="GET /api/heartbeat/%d HTTP/1.1
Host: $host

"
function worker() {
    # unpad the input, pass each line to xargs for printf-formatting, 
    # and pass the resulting request to s_client
    # use pv to throttle our requests to 8 per second per worker.
    unpad | pv --quiet --line-mode --rate-limit 8 |  \
    xargs -n 1 printf "$format" | openssl s_client $host | pad
}
# generate an endless stream of increasing integers and pad them to a fixed size
seq inf | pad | {
<&0 worker &
    worker &
    worker &
    worker &
} | unpad

And that's all there is to it! openssl s_client establishes a TLS connection to the provided host, then sends stdin to the host and copies the host's replies to stdout. So this will endlessly send heartbeat GET requests to api.example.com, in parallel over 4 TCP connections, and we'll get the results on stdout.

One small caveat, which is unimportant for most usage. Note that a pipe doesn't directly connect one process to another. The kernel maintains a buffer for the pipe, called the pipe buffer. Writes go into the pipe buffer and reads come out of the pipe buffer. And since each process reads and writes data as quickly as possible, and unpad can read and write very quickly indeed, unpad might outpace the later parts of the worker pipeline. In that case, some lines of input would sit idle in the pipe buffer between unpad and the rest of the pipeline.

This won't normally be a problem, but if you've exhausted all the input, then you might have one or several workers with full pipe buffers, while other workers don't have any more input to process. Thus at the end of the input, there might be less parallel processing going on than is possible. Again, only a small issue, but I thought it was best to mention it. A quick hack around this is to throttle (with pv) right after unpad in the worker pipeline, which limits the amount of pipe buffers that could be filled to just one; alternatively, throttle before pad when generating the input. Ideally, dd would have a throttling option built-in, which would allow wholly eliminating the problem… I'm working on a patch.

Again one last reminder: If your lines are over 4096 bytes long, you will lose data if you don't use a workaround.7 And pad and unpad can only be omitted from the worker output if you know the worker output is atomic.10 Now, with this caveats, go forth and use this knowledge to construct high-performance concurrent systems out of shell scripts!

But wait, what about GNU parallel? Actually, it turns out that GNU parallel does give us this capability. It supports sending lines of input to commands over stdin with its --pipe argument and sending multiple lines to the same task with its --round-robin argument. And it doesn't have a 4096-byte-long line limit, and you don't need to worry about wrapping your workers with pad and unpad. So, you should probably just use GNU parallel. :)

Footnotes:

1

Everything in this article applies to every normal, Bourne-shell-inspired shell, like bash or zsh.

2

For the sake of clarity, in this article, I'm using the word "task" instead of "process" to refer to an operating-system process.

3

If you're worried about interleaving, just read on.

4

Spoilers: GNU parallel does what I describe in this article, but better - skip to the end if you want to read about it.

5

If you want all output from src to be copied to all the workers, perhaps because you want to run several different commands on the input, you can use tee and "process substitution".

src | tee >(worker1) >(worker2) >(worker3) >/dev/null | sink

The output of each worker will go to sink. We send the output of tee to /dev/null, since tee will just ouptut the unprocessed data from src.

You probably want to send the output of each worker to a different place, rather than all to the same sink. (If you do want to send them all to the same sink, you need to pad the output of the worker and unpad before the sink, as described in this article.) That kind of advanced redirection of process output is better described elsewhere, such as the Bash manual (available on your system through info bash) or the Bash hackers wiki. You may also want to look at this StackOverflow question as an alternative way to combine the output of these parallel workers.

6

From the glibc manual:

Reading or writing pipe data is atomic if the size of data written is not greater than PIPE_BUF.

So our upper limit for line length is PIPE_BUF. Keep in mind that PIPE_BUF is completely different from the size of kernel pipe buffer; the kernel pipe buffer is (presumably) much larger than PIPE_BUF.

We can get a kernel-minimum value of PIPE_BUF with getconf PIPE_BUF /. We need to pass a path to getconf because this value can vary on a per-pipe basis, and getconf (or more specifically the underlying fpathconf(3p) system call) gives you the ability to specify the pipe that you are checking PIPE_BUF for.

7

For example, you could put the data in a temporary file and pass file paths on each line. Heck, you could even pass the path of a named pipe on each line, and communicate the data that way; that would be a wild hack.

8

Note that this implementation of pad and unpad will strip trailing spaces from lines. So if for some reason your input depends on the number of spaces at the end of each line, then you'll need different pad and unpad functions.

9

If you have multiple cores, anyway. But who doesn't, these days?

10

Using pad and unpad on the output of parallel workers is not always necessary, but it is generally a safer way to do things. We can control interleaving of the output with other techniques too, such as stdbuf -oL, to force output to be written in units of whole lines, which will be atomic as long as those lines are short enough. The foe of atomicity in this context is buffering of the output, so see this BashFAQ question for more information about controlling or disabling buffering. Again, if you don't know for sure that the output of the worker is line-buffered, or otherwise written in atomic-sized chunks, it is best to use pad and unpad for safety.

11

This is sarcasm, and also mockery of Python. Shell scripts are notoriously slow… yet this shell script is way, way faster than the idiomatic Python solution here. Of course, all of the work here is being done by programs written in C, so it's cheating a bit, but that's what the shell is all about.

Created: 2016-06-14 Tue 17:30

Emacs 24.5.1 (Org mode 8.2.10)

Validate