Simple, Fast, Easy Parallelism in Shell Pipelines
The typical shell[1] pipeline looks something like this:
src | worker | sink
Usually src will output lines of data, and worker acts as a filter, processing each line and sending some transformed lines to sink. So there is a possibility of parallel processing on the basis of lines: we could have multiple worker tasks[2] which each process different lines of data from src and output transformed lines to sink.[3]
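For instance, here is a concrete (and entirely hypothetical) pipeline of this shape, where the worker is a plain line-by-line filter:

# hypothetical example: cat input.txt plays the role of src, sed is the worker
# transforming each line, and sort > output.txt is the sink
cat input.txt | sed 's/error/ERROR/' | sort > output.txt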
Of course, there already exist xargs and GNU parallel[4], which have the ability to run multiple tasks simultaneously. What's the difference?
- When xargs runs a command, it provides a line of input as an argument to that command. But the typical data-processing Unix command reads lines of input from stdin, not from its arguments on the command line, so many commands simply don't make sense to use with xargs. xargs starts a new task[2] for each line of input. That is impractical for programs with a slow initialization. For example, this is painful for programs that open a network connection.
- Because a new task is started for every line, workers can't keep state about the data they have seen. For example, uniq cannot be done in parallel with xargs. This ability to keep state as input is processed is useful for many, many data-processing tasks.

Overall, xargs is mainly useful for executing a command repeatedly with a varying set of arguments, not processing a lot of data.
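To make the difference concrete, here is a small hypothetical comparison (echo, sort, and uniq stand in for real workers):

# xargs turns each input line into command-line arguments,
# starting a fresh process for every invocation:
printf 'foo\nbar\n' | xargs -n 1 echo got argument:
# a typical data-processing filter instead reads all of its lines from stdin
# in one long-lived process, so it can keep state across lines (here uniq -c counts duplicates):
printf 'foo\nbar\nfoo\n' | sort | uniq -c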
A technique that allows a pool of worker tasks[2], executing in parallel, to process incoming lines as they arrive on stdin would be strictly more general. You could even run xargs within such a technique, or nest such a technique within itself; you can't run xargs from xargs.
Writing a parallel pipeline in any shell looks like this:[3]
src | { worker & worker & worker & } | sink
This will start up three workers in the background, which will all read data from src in parallel, and write output to sink.
The output of src is not copied to all the workers; each byte of output becomes input to exactly one worker.[5]
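As an aside on the earlier point about nesting: one of these workers can itself be an xargs invocation, which is something xargs alone can't express. A rough sketch, with somecmd as a placeholder (the interleaving protection introduced below applies to these workers just like any others):

src | { xargs -n 1 somecmd & xargs -n 1 somecmd & } | sink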
Since all the worker tasks are reading input at the same time, we might run into a common concurrency issue: one worker task might get the first part of a line, while the rest of the line goes to another worker task. This is called "interleaving", and if we allowed it to happen it would cause the input to the workers to be completely corrupt.
Here's an example of using this parallel processing technique without protecting against interleaving. Note that in bash, we need to place <&0 in front of the first background command. This just means "this command should read input from file descriptor 0; that is, stdin", which happens by default in most shells, but won't happen in bash due to a bug that will be fixed in the next release.
yes ooo | head -n 8 | pv --quiet --rate-limit 4 | { <&0 tr o z & tr o x & }
x |
x |
zzzzzz |
zzz |
zzz |
zzz |
x |
zzz |
xxx |
Pretty severe interleaving! We used pv to throttle the input going to the workers to increase the amount of interleaving that would happen.
To deal with the issue of interleaving, we need to introduce two new commands. pad will pad incoming lines of data to a fixed size, and unpad removes that padding. Then we make the following small modification:
src | pad | { unpad | worker & unpad | worker & unpad | worker & } | sink
Communicating with fixed-size blocks means that pad and unpad will never interleave, due to pipe atomicity. This fixed size must be determined in advance, and should be at least as large as the largest possible line that src can emit.

There is an upper limit[6] on this fixed size; on Linux, it's 4096 bytes. If a line is longer than that, it will be truncated to 4096 bytes, discarding all extra data. If your lines are longer than 4096 bytes, you will lose data! But most people don't work with lines that are anywhere close to 4096 bytes long, so they have little to worry about if they are careful. If you do work with such long lines then you can work around the limit in any number of ways.[7]
pad and unpad can be defined as follows:[8][6]
# pad to the maximum size we can do and still be atomic on this system
pipe_buf=$(getconf PIPE_BUF /)
function pad() {
    # redirect stderr (file descriptor 2) to /dev/null to get rid of noise
    dd conv=block cbs=$pipe_buf obs=$pipe_buf 2>/dev/null
}
function unpad() {
    dd conv=unblock cbs=$pipe_buf ibs=$pipe_buf 2>/dev/null
}
For convenience, you could insert this snippet into your .bashrc.
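If you're curious what the padded stream actually looks like, here's a quick, purely illustrative check (assuming the functions above are defined in your current shell; od -c just dumps the bytes):

printf 'hello\nworld\n' | pad | od -c | head
# each input line becomes a block of exactly $pipe_buf bytes, padded out with spaces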
Let's look at a fixed version:
yes ooo | head -n 8 | pv --quiet --rate-limit 4 | pad | { <&0 unpad | tr o z & unpad | tr o x & }
zzz |
zzz |
zzz |
xxx |
xxx |
xxx |
xxx |
xxx |
Great.
Now let's use these capabilities productively. To factor numbers!
# define some giant constants
seqs=5000000000000000000
seqe=18000000000000000000
shufn=100000
# generate lines containing $shufn numbers randomly selected from $seqs to $seqe
shuf --input-range $seqs-$seqe -n $shufn | pad | {
    # pad and unpad them, and factor each number
    <&0 unpad | factor | pad &
    unpad | factor | pad &
    unpad | factor | pad &
    unpad | factor | pad &
} | unpad
Since factor is CPU-bound, we want to run it on multiple CPU cores at once. Since these worker tasks will be able to run simultaneously on different cores, this will be substantially faster than the single-worker case.[9]
We pad and unpad the output as well to avoid it being interleaved, since factor could perform partial writes or writes over the maximum atomic size. Placing pad and unpad on the output is the safer choice unless you know for sure that the output is atomic.[10]
Try comparing different numbers of workers to the single-worker case:
# define some giant constants
seqs=5000000000000000000
seqe=18000000000000000000
shufn=100000
shuf --input-range $seqs-$seqe -n $shufn | factor
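If you'd rather not copy-paste workers to vary the count, here is one possible sketch of my own (not from the article): spawn the workers in a loop, defaulting to one per CPU core via nproc; putting <&0 on every background pipeline is just a cautious version of the bash workaround described earlier.

# define some giant constants
seqs=5000000000000000000
seqe=18000000000000000000
shufn=100000
nworkers=$(nproc)    # one worker per CPU core; change by hand to compare
shuf --input-range $seqs-$seqe -n $shufn | pad | {
    for _ in $(seq "$nworkers"); do
        <&0 unpad | factor | pad &
    done
} | unpad

Wrapping each variant in time (and sending the output to /dev/null) makes the comparison easy.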
Now let's look at a more sophisticated example. I want to send a bunch of HTTPS requests to some server, perhaps for scraping or testing. Python, the usual tool for something like this, is just too slow for my purposes; so I want to use something fast, like a shell script.[11] I can just do the following:
# specify the server we're connecting to
host="api.example.com"
# We will produce our HTTP requests with printf; we will perform a
#   printf "$format" somenumber
# for each input number, which outputs a complete HTTP request to send off
format="GET /api/heartbeat/%d HTTP/1.1
Host: $host

"
function worker() {
    # unpad the input, pass each line to xargs for printf-formatting,
    # and pass the resulting request to s_client
    # use pv to throttle our requests to 8 per second per worker.
    unpad | pv --quiet --line-mode --rate-limit 8 | \
        xargs -n 1 printf "$format" | openssl s_client -connect "$host":443 | pad
}
# generate an endless stream of increasing integers and pad them to a fixed size
seq inf | pad | { <&0 worker & worker & worker & worker & } | unpad
And that's all there is to it! openssl s_client establishes a TLS connection to the provided host, then sends stdin to the host and copies the host's replies to stdout. So this will endlessly send heartbeat GET requests to api.example.com, in parallel over 4 TCP connections, and we'll get the results on stdout.
One small caveat, which is unimportant for most usage. Note that a pipe doesn't directly connect one process to another. The kernel maintains a buffer for the pipe, called the pipe buffer. Writes go into the pipe buffer and reads come out of the pipe buffer. And since each process reads and writes data as quickly as possible, and unpad can read and write very quickly indeed, unpad might outpace the later parts of the worker pipeline. In that case, some lines of input would sit idle in the pipe buffer between unpad and the rest of the pipeline.

This won't normally be a problem, but if you've exhausted all the input, then you might have one or several workers with full pipe buffers, while other workers don't have any more input to process. Thus at the end of the input, there might be less parallel processing going on than is possible. Again, only a small issue, but I thought it was best to mention it.
A quick hack around this is to throttle (with pv) right after unpad in the worker pipeline, which limits the number of pipe buffers that could be filled to just one; alternatively, throttle before pad when generating the input. Ideally, dd would have a throttling option built in, which would allow wholly eliminating the problem… I'm working on a patch.
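For concreteness, here is roughly what those two throttle placements look like, sketched with the article's placeholder names (src, worker, sink) and an arbitrary rate of 8 lines per second:

# throttle inside each worker, right after unpad:
src | pad | {
    <&0 unpad | pv --quiet --line-mode --rate-limit 8 | worker | pad &
    unpad | pv --quiet --line-mode --rate-limit 8 | worker | pad &
} | unpad | sink

# or throttle once, before pad, when generating the input:
src | pv --quiet --line-mode --rate-limit 16 | pad | {
    <&0 unpad | worker | pad &
    unpad | worker | pad &
} | unpad | sink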
Again, one last reminder: if your lines are over 4096 bytes long, you will lose data if you don't use a workaround.[7] And pad and unpad can only be omitted from the worker output if you know the worker output is atomic.[10] Now, with these caveats, go forth and use this knowledge to construct high-performance concurrent systems out of shell scripts!
Footnotes:
1. Everything in this article applies to every normal, Bourne-shell-inspired shell, like bash or zsh.
2. For the sake of clarity, in this article, I'm using the word "task" instead of "process" to refer to an operating-system process.
3. If you're worried about interleaving, just read on.
4. GNU parallel partially, but not fully, does what I describe in this article. It supports sending lines of input to commands over stdin with its --pipe argument, and sending multiple lines to the same task with its --round-robin argument. And it doesn't have a 4096-byte-long line limit, and you don't need to worry about wrapping your workers with pad and unpad. But parallel can only send fixed-size blocks to workers; so if the pipeline is only getting small amounts of work at a time, parallel won't send that work out until it builds up to a larger block. That's an unacceptable limitation for at least my use cases.
5. If you want all output from src to be copied to all the workers, perhaps because you want to run several different commands on the input, you can use tee and "process substitution":

src | tee >(worker1) >(worker2) >(worker3) >/dev/null | sink

The output of each worker will go to sink. We send the output of tee to /dev/null, since tee will just output the unprocessed data from src. You probably want to send the output of each worker to a different place, rather than all to the same sink. (If you do want to send them all to the same sink, you need to pad the output of the workers and unpad before the sink, as described in this article; see the sketch at the end of this footnote.) That kind of advanced redirection of process output is better described elsewhere, such as the Bash manual (available on your system through info bash) or the Bash hackers wiki. You may also want to look at this StackOverflow question as an alternative way to combine the output of these parallel workers.
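Here is that padded merge spelled out as a sketch (worker1, worker2, worker3, src, and sink are placeholders, and pad/unpad are the shell functions defined in the body of the article):

src | tee >(worker1 | pad) >(worker2 | pad) >(worker3 | pad) >/dev/null | unpad | sink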
6. From the glibc manual: "Reading or writing pipe data is atomic if the size of data written is not greater than PIPE_BUF." So our upper limit for line length is PIPE_BUF. Keep in mind that PIPE_BUF is completely different from the size of the kernel pipe buffer; the kernel pipe buffer is (presumably) much larger than PIPE_BUF. We can get a kernel-minimum value of PIPE_BUF with getconf PIPE_BUF /. We need to pass a path to getconf because this value can vary on a per-pipe basis, and getconf (or more specifically the underlying fpathconf(3p) function) gives you the ability to specify the pipe that you are checking PIPE_BUF for.
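For example, on a typical Linux system this reports the 4096-byte figure mentioned in the body:

getconf PIPE_BUF /
4096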
7. For example, you could put the data in a temporary file and pass file paths on each line. Heck, you could even pass the path of a named pipe on each line, and communicate the data that way; that would be a wild hack.
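A rough sketch of the temporary-file variant, with bigsrc and process_file as placeholders (the paths are short, so they always fit within PIPE_BUF even when the lines themselves don't):

bigsrc |
while IFS= read -r line; do
    f=$(mktemp)                    # one temp file per oversized line
    printf '%s\n' "$line" >"$f"
    printf '%s\n' "$f"             # send only the short path downstream
done | pad | {
    <&0 unpad | while IFS= read -r f; do process_file "$f"; rm -f "$f"; done | pad &
    unpad | while IFS= read -r f; do process_file "$f"; rm -f "$f"; done | pad &
} | unpad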
8. Note that this implementation of pad and unpad will strip trailing spaces from lines. So if for some reason your input depends on the number of spaces at the end of each line, then you'll need different pad and unpad functions.
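One possible workaround (my sketch, not from the article): append a sentinel character before padding and strip it after unpadding, so that dd's space-stripping never touches the real trailing spaces. This assumes the sentinel (here "|") never appears at the end of a line of your data, and reuses the pipe_buf variable from the definitions above.

function pad_keep_spaces() {
    # mark the true end of each line with a sentinel before block-padding
    sed 's/$/|/' | dd conv=block cbs=$pipe_buf obs=$pipe_buf 2>/dev/null
}
function unpad_keep_spaces() {
    # unblock strips the pad spaces; then drop the sentinel, restoring the line exactly
    dd conv=unblock cbs=$pipe_buf ibs=$pipe_buf 2>/dev/null | sed 's/|$//'
}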
9. If you have multiple cores, anyway. But who doesn't, these days?
10. Using pad and unpad on the output of parallel workers is not always necessary, but it is generally a safer way to do things. We can control interleaving of the output with other techniques too, such as stdbuf -oL, to force output to be written in units of whole lines, which will be atomic as long as those lines are short enough. The foe of atomicity in this context is buffering of the output, so see this BashFAQ question for more information about controlling or disabling buffering.
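For example, a sketch of the stdbuf alternative, with src, worker, and sink as placeholders (this only helps if the worker is a program that uses C stdio buffering, and only if its lines stay under PIPE_BUF):

src | pad | {
    <&0 unpad | stdbuf -oL worker &
    unpad | stdbuf -oL worker &
} | sink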
Again, if you don't know for sure that the output of the worker is line-buffered, or otherwise written in atomic-sized chunks, it is best to use pad and unpad for safety.
11. This is sarcasm, and also mockery of Python. Shell scripts are notoriously slow… yet this shell script is way, way faster than the idiomatic Python solution here. Of course, all of the work here is being done by programs written in C, so it's cheating a bit, but that's what the shell is all about.