No more DSLs: Implement and deploy a distributed system with a single program
1 Introduction
If you want to write a distributed system, then instead of writing a thousand different programs and configuration files in many different DSLs, you can use an approach I call "single-program systems", and write just one program. In this article, I describe how we do this at $DAYJOB, and I show some example programs. These examples are in Python, but this approach works in any language.
Besides single-program systems, distributed languages also try to allow the user to implement a distributed system as a single program. But these languages assume the presence of lots of distributed systems infrastructure which is maintained outside the language; so a program written in such a language is only one component in a larger distributed system. The single-program system approach, instead, allows you to write a program which manages every aspect of the distributed system above the hardware level, such that running the program deploys the distributed system, and the program and the distributed system can really be said to be the same thing.
At $DAYJOB, each team maintains Python libraries for their components; these libraries are used to write the single-program system. All the component libraries are maintained in a single repository to reduce compatibility issues. Some components are in Python, but others are written in other languages (mostly C/C++/Java) and run in a separate process. In this way, performance-sensitive components of a distributed system are written in faster languages than Python, in the same way that Fortran libraries are used to speed up math in Python programs. A component might communicate with other components, replicate and persist state, or any of the usual distributed system stuff.
The component libraries use normal programming language features, including Python 3's static type system, to handle distributed systems issues. For example, regular function arguments are used to express service discovery and dependencies between services; for each service, there's a service-starting function which takes that service's dependencies as arguments, uses those arguments to run the service, and returns a value that can be passed as an argument (a dependency) to other service-starting functions. In general, services are "configured" by passing arguments, in normal code, and multiple instances can be created just by calling functions multiple times with different arguments.
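To make the shape of this pattern concrete, here is a purely illustrative sketch; none of these names come from a real component library, and real service-starting functions take more arguments (a nursery, a thread, and so on), as we'll see below.

# Purely illustrative sketch of the service-starting pattern; these classes and
# functions are hypothetical, not from any real component library.
class Database: ...
class Cache: ...
class Webapp: ...

async def start_database() -> Database:
    # A real implementation would spawn and monitor a database process here.
    return Database()

async def start_cache(database: Database) -> Cache:
    # The cache's dependency on the database is just an ordinary argument.
    return Cache()

async def start_webapp(database: Database, cache: Cache, port: int) -> Webapp:
    return Webapp()

async def start_everything() -> list[Webapp]:
    database = await start_database()
    cache = await start_cache(database)
    # Two instances are just two calls, "configured" with different arguments.
    return [await start_webapp(database, cache, port=8080),
            await start_webapp(database, cache, port=8081)]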
When a service-starting function starts a C/C++/Java process, it starts the process directly as a subprocess, on some host, which the program will continue to monitor directly rather than delegating to a process supervisor service. Functionality for distributed process spawning and monitoring is shared through libraries rather than by delegating to external orchestration systems, making a single-program system completely self-contained. This allows a single-program system to be used for deployment in any environment, whether that's bare-metal, virtual machines, containers, or a developer machine; though different environments and use cases usually imply different programs with different configurations.
You can run the resulting program to run the full system for production, or for testing the system against its users. This program is not a DSL; it doesn't get processed by some tool and turned into a list of actions to perform. Rather, it's run by a normal Python interpreter, and when the program calls start_foo, the program starts up the foo process, right then, directly. When everything is started, the program continues running, monitoring each process and providing interfaces (the Python REPL, UIs, etc) to interact with the running system.
Upgrading the distributed system is a matter of upgrading the single program. Doing this doesn't require downtime. One simple approach is to restart the single program without killing its child processes, and then the new version takes on the responsibility of gradually upgrading the services. Alternatively, more advanced techniques such as dynamic software updating could be used to upgrade the single program without restarting it.
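As a rough illustration of the first approach, the gradual-upgrade logic that the new program version takes on might look something like the following sketch; start_service, is_healthy, and the shutdown method are hypothetical stand-ins, not APIs from any real component library.

import trio

# Hypothetical sketch: the newly-restarted single program replaces each running
# service instance one at a time, waiting for the replacement to become healthy
# before shutting down the old instance. All names here are illustrative.
async def rolling_upgrade(nursery, thread, old_instances, start_service, is_healthy):
    upgraded = []
    for old in old_instances:
        new = await start_service(nursery, thread)
        while not await is_healthy(new):
            await trio.sleep(1)      # poll until the new instance is serving
        await old.shutdown()         # hypothetical graceful-shutdown method
        upgraded.append(new)
    return upgraded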
This approach works for almost all distributed systems. Most distributed systems, such as those operated by a single company, have centralized authority; by this I mean that, ultimately, the owners and employees of the company have the authority to do anything they want with the distributed system (though doing anything actually useful might be hard). A single-program system extends this to logically centralized control; that centralized authority is delegated to a single program which controls the system. Systems with centralized authority are easier to operate in many ways, but centralization isn't always possible or desirable. Decentralized systems such as the Internet couldn't be run as a single-program system because they contain many interacting "ultimate authorities" running their own subsystems; each of those subsystems, however, could be run as a single-program system.
2 Writing a single-program system
We'll see this in action by looking at a working example program.
2.1 orderd: an order entry daemon
- Accepts or rejects orders sent over a TCP listening socket
- Updates the positiond service with the positions
- Stores order data in a SQLite database
orderd is a real daemon ("d" is short for "daemon"), with a few details removed. We're looking at orderd specifically because it has only the three dependencies we've already mentioned. Note that orderd itself may or may not be written in Python; this is abstracted away from us (although in practice, it is in fact a separate subprocess).
For our example, we'll write a test of our distributed system. We'll start up orderd and its dependencies (just positiond) for the test, using functions from the component libraries to run each service.
First, some testing boilerplate:
import unittest

from orderd import start_orderd

class TestOrderd(unittest.TestCase):
    def setUp(self) -> None:
        # TODO start up orderd and its dependencies
        self.orderd = start_orderd(...)

    def test(self) -> None:
        self.assertTrue("Do test stuff")
To write setUp, we'll proceed by looking at the signature of the start_orderd function, provided by the orderd component library. Note the type annotations for static type checking, introduced in Python 3.
# in the "orderd" module async def start_orderd( nursery: trio.Nursery, thread: rsyscall.Thread, positiond: positiond.Positiond, listening_sock: rsyscall.FileDescriptor, database: orderd.Database, ) -> Orderd:
We'll look at the start_orderd signature line by line, creating each argument individually, and at the end we'll call start_orderd and have a running instance of orderd.

The first three lines of the function signature (up to and including thread: rsyscall.Thread,) are essentially common to all service-starting functions. The last four lines (starting with positiond: Positiond,) are specific to orderd.
2.2 async def start_orderd(
async def start_orderd(
start_orderd is an async function. In Python, this simply means that it can run concurrently with other functions, which allows us to start services up in parallel, using Python-specific techniques which we won't show in this example. Other than that, it's a completely normal function, which is called with await start_orderd(...) from any other async function, and which blocks execution until it returns.
Since start_orderd is async, we need to run it from an async runner. We'll use the open source library trio for that, which means we'll need to tweak our boilerplate slightly to use TrioTestCase.
from trio_unittest import TrioTestCase

class TestOrderd(TrioTestCase):
    async def asyncSetUp(self) -> None:
        self.orderd = await start_orderd(...)
Other than this change in boilerplate, Python async functions work like any others; you can safely ignore the "async" and "await" annotations. We won't use any async features in this TestCase example; the only use of async features will be later, with start_exampled, when we look at how a component library is implemented.
2.3 nursery: trio.Nursery,
nursery: trio.Nursery,
trio.Nursery is defined by the open source trio library, and it provides the ability to start up functions in the background. We pass it in to start_orderd so that start_orderd can start a function in the background to monitor the running orderd process.

If the orderd process exits, the background function monitoring that process will throw, and the resulting exception will be propagated to the trio.Nursery, which will deal with it in some way specific to how the trio.Nursery was produced.

Upon seeing an exception in a background function, the logic for a trio.Nursery might call start_orderd again immediately, it might kill the other background functions and start them all up again with start_ functions, or it might ultimately prompt for operator intervention through various means. An operator might then work at a UI or a REPL to fix the issue, by calling start_orderd with different arguments.
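For illustration, one such restart-on-failure policy might be wired up roughly like the following sketch; this is only an example using plain trio, not code from a real component library, and start_service stands in for start_orderd and friends.

import trio

# Hypothetical restart-on-failure policy: run a service under its own nursery,
# and whenever a background monitoring task fails, tear everything down and
# start the service again.
async def supervise(start_service) -> None:
    while True:
        try:
            async with trio.open_nursery() as nursery:
                await start_service(nursery)
                await trio.sleep_forever()  # keep the nursery open while tasks run
        except Exception as exc:
            print(f"service failed with {exc!r}; restarting")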
In this case, we'll use self.nursery as provided by TrioTestCase, which turns any failure in a background task into a failure of the whole test.
async def asyncSetUp(self) -> None:
    # self.nursery provided by TrioTestCase
    self.orderd = await start_orderd(
        self.nursery,
        ...,
    )
2.4 thread: rsyscall.Thread,
thread: rsyscall.Thread,
rsyscall.Thread is defined by the open source rsyscall library, and it provides the ability to run system calls, including running subprocesses. We pass it in to start_orderd so that start_orderd can start the orderd subprocess, as well as perform other operations to prepare the environment for orderd.

An rsyscall.Thread may operate on a local or remote host, or inside a container or VM, or on other kinds of nodes, depending on how the rsyscall.Thread was produced, but it provides a completely common interface regardless of where it runs. Component library code itself never runs distributed across multiple nodes; there's a single Python interpreter on a single host. All distributed operations are performed by method calls on rsyscall.Thread objects.

In this case, we'll use local_thread imported from rsyscall and assigned to self.thread. local_thread runs on the same thread as the Python interpreter - that is, on localhost.
from rsyscall import local_thread

async def asyncSetUp(self) -> None:
    self.thread = local_thread
    self.orderd = await start_orderd(
        ...,
        self.thread,
        ...,
    )
2.5 positiond: Positiond,
positiond: Positiond,
This is the first orderd-specific argument. positiond is a service which orderd updates with information about its position. All the information required to connect to and use positiond is contained in the Positiond class. Since positiond is its own service, we need to use start_positiond to start it.
async def start_positiond(
    nursery: trio.Nursery,
    thread: rsyscall.Thread,
    workdir: rsyscall.Path,
) -> Positiond:
    ...
The first two arguments are shared with orderd. The third argument, workdir, is unique to positiond.

workdir is a path in the filesystem that positiond will use; in this case, positiond will use it to store shared memory communication mechanisms and persistent data. We'll pass a path in a temporary directory in this example.
# Make a temporary directory
self.tmpdir = await self.thread.mkdtemp()
self.orderd = await start_orderd(
    ...,
    await start_positiond(self.nursery, self.thread, self.tmpdir/"positiond"),
    ...,
)
2.6 database: orderd.Database,
database: orderd.Database,
This is a completely conventional SQLite database, initialized with the orderd schema. Here, for a test, we're calling orderd.Database.make to make a fresh database, every time. If we wanted to persist state between runs of orderd, we'd pass in an orderd.Database instance from a previous run, recovered from some known path in the filesystem with orderd.Database.recover(path).
self.orderd = await start_orderd(
    ...,
    await orderd.Database.make(self.thread, self.tmpdir/"db"),
    ...,
)
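For comparison, a hypothetical persistent setup might look roughly like this; known_db_path is a stand-in for wherever a previous run stored its database, and the recover call is the one named above.

# Hypothetical: recover the database a previous run left at some known path,
# instead of making a fresh one; `known_db_path' is purely illustrative.
db = await orderd.Database.recover(known_db_path)
self.orderd = await start_orderd(
    ...,
    db,
    ...,
)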
2.7 listening_sock: FileDescriptor,
listening_sock: FileDescriptor,
This is a listening socket, passed down to the orderd subprocess through file descriptor inheritance, and used to listen for TCP connections. This is standard Unix socket programming, so we won't go into this in depth; although note that we create this with self.thread, so that it's on the same host as orderd.
async def asyncSetUp(self) -> None:
    # Make a TCP socket...
    sock = await self.thread.socket(AF.INET, SOCK.STREAM)
    # ...bind to a random port on localhost...
    await sock.bind(await self.thread.ptr(SockaddrIn(0, "127.0.0.1")))
    # ...and start listening.
    await sock.listen(1024)
    self.orderd = await start_orderd(
        ...,
        sock,
        ...,
    )
2.8 ) -> Orderd:
) -> Orderd:
Like all good component libraries, start_orderd returns an Orderd instance which contains all the information required to connect to orderd, such as an address and port, a shared memory segment, or a path in the filesystem. start_orderd, again like all good component libraries, will only return when the orderd communication mechanisms have been fully created, and therefore the Orderd instance can be immediately used to connect to orderd.
2.9 Full example
Here's the full, working example:
class TestOrderd(TrioTestCase):
    async def asyncSetUp(self) -> None:
        # self.nursery provided by TrioTestCase
        self.thread = local_thread
        self.tmpdir = await self.thread.mkdtemp()
        sock = await self.thread.socket(AF.INET, SOCK.STREAM)
        await sock.bind(await self.thread.ptr(SockaddrIn(0, "127.0.0.1")))
        await sock.listen(1024)
        self.orderd = await start_orderd(
            self.nursery,
            self.thread,
            await start_positiond(self.nursery, self.thread, self.tmpdir/"positiond"),
            await orderd.Database.make(self.thread, self.tmpdir/"db"),
            sock,
        )
Then we can proceed to test by running user code.
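As a rough sketch of what such user code might look like, here is a hypothetical test method we could add to TestOrderd; it assumes the Orderd handle exposes the address and port it listens on (as described above), and the attribute names and the connect call are illustrative assumptions, not documented API.

    async def test_accepts_connection(self) -> None:
        # Hypothetical user code: open a TCP connection to the running orderd.
        # Assumes the Orderd handle exposes its address and port; these
        # attribute names are illustrative, not documented API.
        client = await self.thread.socket(AF.INET, SOCK.STREAM)
        await client.connect(await self.thread.ptr(
            SockaddrIn(self.orderd.port, self.orderd.address)))
        # From here, real test code would send orders and assert on the replies.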
3 Implementation of component libraries
Now we'll step through a working example of how a component library is implemented.
This one shells out to a separate process, exampled. This daemon is packaged and deployed with Nix; at $DAYJOB we use a proprietary package manager with similar APIs. Below is the full code for the exampled component library, with comments inline to explain it.
import nix_rsyscall
import rsyscall
import trio
# a Nix-specific generated module, containing the information required
# to deploy the exampled package; generated by setup.py.
import exampled._nixdep

class Exampled:
    def __init__(self, workdir: rsyscall.Path) -> None:
        self.workdir = workdir

async def start_exampled(
    nursery: trio.Nursery,
    thread: rsyscall.Thread,
    workdir: rsyscall.Path,
) -> Exampled:
    # deploy the exampled package and its dependencies; this doesn't deploy the
    # package for this Python library, but rather the exampled daemon
    package = await nix_rsyscall.deploy(thread, exampled._nixdep.closure)
    # build the command to actually run
    command = package.bin('exampled').args("--verbose", "--do-stuff-fast")
    # make the thread that we'll run that exampled command in;
    # this child_thread is a process under our control, see http://rsyscall.org
    child_thread = await thread.clone()
    # change the CWD of the child thread; CWD is inherited over exec,
    # so it will be used by exampled
    await child_thread.mkdir(workdir)
    await child_thread.chdir(workdir)
    # exec the command in the child thread; this exec helper method returns
    # a monitorable child process object
    child_process = await child_thread.exec(command)
    # monitor the child process in the background; see https://trio.readthedocs.io/
    # we'll get an exception if it exits uncleanly; this is our one use of async features.
    nursery.start_soon(child_process.check)
    # return a class containing exampled's communication mechanisms;
    # it communicates with the world only by creating files under `workdir'
    return Exampled(workdir)
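For comparison with section 2, here is how this component library might be used from a test, mirroring the orderd example above; this usage sketch assumes start_exampled is importable from the exampled module.

from rsyscall import local_thread
from trio_unittest import TrioTestCase
from exampled import start_exampled

class TestExampled(TrioTestCase):
    async def asyncSetUp(self) -> None:
        # self.nursery provided by TrioTestCase, as before
        self.thread = local_thread
        self.tmpdir = await self.thread.mkdtemp()
        self.exampled = await start_exampled(
            self.nursery,
            self.thread,
            self.tmpdir/"exampled",
        )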
4 Conclusion
A single-program system implements an entire distributed system as a single program, delegating to libraries to share functionality and to subprocesses to improve performance.
The alternative is writing many programs and many configuration files in many different DSLs: Kubernetes, Helm, Terraform, Ansible, systemd, cron jobs, shell scripts, configuration files in JSON, TOML, YAML, CSV, INI, etc, etc, etc. I think the advantages of the single-program approach are self-explanatory.
The techniques I use for single-program systems are explained in greater detail in other articles linked in the introduction. With those techniques, and with the open source libraries rsyscall and trio, anyone can write a single-program system.
If you're interested in working on such systems at $DAYJOB, we're hiring; here's a job description.