No more DSLs: Implement and deploy a distributed system with a single program

Table of Contents

  1 Introduction
  2 Writing a single-program system
  3 Implementation of component libraries
  4 Conclusion

1 Introduction

If you want to write a distributed system, then instead of writing a thousand different programs and configuration files in many different DSLs, you can use an approach I call "single-program systems", and write just one program. In this article, I describe how we do this at $DAYJOB, and I show some example programs. These examples are in Python, but this approach works in any language.

Single-program systems are not the only attempt at this; distributed programming languages also try to let the user implement a distributed system as a single program. But these languages assume the presence of a lot of distributed systems infrastructure which is maintained outside the language, so a program written in such a language is only one component in a larger distributed system. The single-program system approach, instead, lets you write a program which manages every aspect of the distributed system above the hardware level, such that running the program deploys the distributed system, and the program and the distributed system can really be said to be the same thing.

At $DAYJOB, each team maintains Python libraries for their components; these libraries are used to write the single-program system. All the component libraries are maintained in a single repository to reduce compatibility issues. Some components are in Python, but others are written in other languages (mostly C/C++/Java) and run in a separate process. In this way, performance-sensitive components of a distributed system are written in faster languages than Python, in the same way that Fortran libraries are used to speed up math in Python programs. A component might communicate with other components, replicate and persist state, or any of the usual distributed system stuff.

The component libraries use normal programming language features, including Python 3's type annotations for static type checking, to handle distributed systems issues. For example, regular function arguments are used to express service discovery and dependencies between services: for each service, there's a service-starting function which takes that service's dependencies as arguments, uses those arguments to run the service, and returns a value that can be passed as an argument (a dependency) to other service-starting functions. In general, services are "configured" by passing arguments, in normal code, and multiple instances can be created just by calling functions multiple times with different arguments.
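As a rough sketch of that pattern, here is what two such service-starting functions might look like. Every name in this sketch (MarketData, start_market_data, Cache, start_cache) is invented for illustration and isn't from any real component library:

import trio
import rsyscall

class MarketData:
    """Handle returned by start_market_data; holds whatever's needed to connect to it."""

class Cache:
    """Handle returned by start_cache."""

async def start_market_data(nursery: trio.Nursery, thread: rsyscall.Thread) -> MarketData:
    ...  # a real library would start and monitor the process here, as in section 3
    return MarketData()

async def start_cache(nursery: trio.Nursery, thread: rsyscall.Thread,
                      upstream: MarketData, port: int) -> Cache:
    ...  # start a cache process pointed at `upstream`, listening on `port`
    return Cache()

async def start_all(nursery: trio.Nursery, thread: rsyscall.Thread) -> None:
    # a dependency is just an ordinary function argument...
    market_data = await start_market_data(nursery, thread)
    # ...and multiple instances are just multiple calls with different arguments
    cache_a = await start_cache(nursery, thread, market_data, port=8001)
    cache_b = await start_cache(nursery, thread, market_data, port=8002)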

When a service-starting function starts a C/C++/Java process, it starts the process directly as a subprocess, on some host, which the program will continue to monitor directly rather than delegating to a process supervisor service. Functionality for distributed process spawning and monitoring is shared through libraries rather than by delegating to external orchestration systems, making a single-program system completely self-contained. This allows a single-program system to be used for deployment in any environment, whether that's bare-metal, virtual machines, containers, or a developer machine; though different environments and use cases usually imply different programs with different configurations.

You can run the resulting program to run the full system for production, or for testing the system against its users. This program is not a DSL; it doesn't get processed by some tool and turned into a list of actions to perform. Rather, it's run by a normal Python interpreter, and when the program calls start_foo, the program starts up the foo process, right then, directly. When everything is started, the program continues running, monitoring each process and providing interfaces (the Python REPL, UIs, etc) to interact with the running system.

Upgrading the distributed system is a matter of upgrading the single program. Doing this doesn't require downtime. One simple approach is to restart the single program without killing its child processes, and then the new version takes on the responsibility of gradually upgrading the services. Alternatively, more advanced techniques such as dynamic software updating could be used to upgrade the single program without restarting it.

This approach works for almost all distributed systems. Most distributed systems, such as those operated by a single company, have centralized authority; by this I mean that, ultimately, the owners and employees of the company have the authority to do anything they want with the distributed system (though doing anything actually useful might be hard). A single-program system extends this to logically centralized control; that centralized authority is delegated to a single program which controls the system. Systems with centralized authority are easier to operate in many ways, but centralization isn't always possible or desirable. Decentralized systems such as the Internet couldn't be run as single-program systems because they contain many interacting "ultimate authorities" running their own subsystems; each of those subsystems, however, could be run as a single-program system.

2 Writing a single-program system

We'll see this in action by looking at a working example program.

2.1 orderd: an order entry daemon

  • Accepts or rejects orders sent over a TCP listening socket
  • Updates the positiond service with the positions
  • Stores order data in a SQLite database

orderd is a real daemon ("d" is short for "daemon"), with a few details removed. We're looking at orderd specifically because it has only the three dependencies we've already mentioned. Note that orderd itself may or may not be written in Python; this is abstracted away from us (although in practice, it is in fact a separate subprocess).

For our example, we'll write a test of our distributed system. We'll start up orderd and its dependencies (just positiond) for the test, using functions from the component libraries to run each service. First, some testing boilerplate:

import unittest
from orderd import start_orderd

class TestOrderd(unittest.TestCase):
  def setUp(self) -> None:
    # TODO start up orderd and its dependencies
    self.orderd = start_orderd(...)

  def test(self) -> None:
    self.assertTrue("Do test stuff")

To write setUp, we'll proceed by looking at the signature of the start_orderd function, provided by the orderd component library. Note the type annotations for static type checking, introduced by Python 3.

# in the "orderd" module
async def start_orderd(
  nursery: trio.Nursery,
  thread: rsyscall.Thread,
  positiond: positiond.Positiond,
  database: orderd.Database,
  listening_sock: rsyscall.FileDescriptor,
) -> Orderd:

We'll look at the start_orderd signature line by line, creating each argument individually, and at the end we'll call start_orderd and have a running instance of orderd.

The first three lines of the function signature (up to and including thread: rsyscall.Thread,) are essentially common to all service starting functions. The last four lines (starting with positiond: Positiond,) are specific to orderd.

2.2 async def start_orderd(

async def start_orderd(

start_orderd is an async function. In Python, this simply means that it can run concurrently with other functions, which allows us to start services up in parallel, using Python-specific techniques which we won't show in this example. Other than that, it's a completely normal function, which is called with await start_orderd(...) from any other async function, and which blocks execution until it returns.

Since start_orderd is async, we need to run it from an async runner. We'll use the open source library trio for that, which means we'll need to tweak our boilerplate slightly to use TrioTestCase.

from trio_unittest import TrioTestCase

class TestOrderd(TrioTestCase):
  async def asyncSetUp(self) -> None:
    self.orderd = await start_orderd(...)

Other than this change in boilerplate, Python async functions work like any others; you can safely ignore the "async" and "await" annotations. We won't use any async features in this TestCase example; the only use of async features will be later, with start_exampled, when we look at how a component library is implemented.

2.3 nursery: trio.Nursery,

nursery: trio.Nursery,

trio.Nursery is defined by the open source trio library, and it provides the ability to start up functions in the background. We pass it in to start_orderd so that start_orderd can start a function in the background to monitor the running orderd process. If the orderd process exits, the background function monitoring that process will throw, and the resulting exception will be propagated to the trio.Nursery, which will deal with it in some way specific to how the trio.Nursery was produced. Upon seeing an exception in a background function, the logic for a trio.Nursery might call start_orderd again immediately, it might kill the other background functions and start them all up again with start_ functions, or it might ultimately prompt for operator intervention through various means. An operator might then work at a UI or a REPL to fix the issue, by calling start_orderd with different arguments.
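As a rough illustration of that kind of supervision logic (a sketch in plain trio, not code from any component library shown here), a restart-on-crash policy could be wrapped around any service-starting function:

import trio

async def keep_running(start_fn, *args) -> None:
    # Crude supervision loop: give each attempt its own nursery. When the
    # background monitoring task started by start_fn throws (because the
    # process died), the exception propagates out of the `async with` block;
    # we back off briefly and start the service again.
    while True:
        try:
            async with trio.open_nursery() as nursery:
                await start_fn(nursery, *args)
        except Exception:
            await trio.sleep(1)

A production program might use something like this; a test wants the opposite behavior, as we'll see next.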

In this case, we'll use self.nursery as provided by TrioTestCase, which turns any failure in a background task into a failure of the whole test.

async def asyncSetUp(self) -> None:
  # self.nursery provided by TrioTestCase
  self.orderd = await start_orderd(
    self.nursery,
    ...,
  )

2.4 thread: rsyscall.Thread,

thread: rsyscall.Thread,

rsyscall.Thread is defined by the open source rsyscall library, and it provides the ability to run system calls, including running subprocesses. We pass it in to start_orderd so that start_orderd can start the orderd subprocess, as well as perform other operations to prepare the environment for orderd. An rsyscall.Thread may operate on a local or remote host, or inside a container or VM, or on other kinds of nodes, depending on how the rsyscall.Thread was produced, but it provides a completely common interface regardless of where it runs.

Component library code itself never runs distributed across multiple nodes; there's a single Python interpreter on a single host. All distributed operations are performed by method calls on rsyscall.Thread objects.
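For example, a helper that prepares a working directory for a service is written once and works unchanged wherever the thread lives. This is a trivial sketch; it only uses mkdtemp and path joining, the same operations used later in this article:

import rsyscall

async def make_service_workdir(thread: rsyscall.Thread, name: str):
    # The caller decides *where* this runs purely by choosing which Thread to
    # pass in; the code is identical for local_thread and for a remote thread.
    tmpdir = await thread.mkdtemp()
    return tmpdir/name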

In this case, we'll use local_thread imported from rsyscall and assigned to self.thread. local_thread runs on the same host as the Python interpreter - that is, on localhost.

from rsyscall import local_thread

  async def asyncSetUp(self) -> None:
    self.thread = local_thread
    self.orderd = await start_orderd(
      ..., self.thread, ...,
    )

2.5 positiond: Positiond,

positiond: Positiond,

This is the first orderd-specific argument.

positiond is a service which orderd updates with information about its position. All the information required to connect to and use positiond is contained in the Positiond class.

Since positiond is its own service, we need to use start_positiond to start it.

async def start_positiond(
  nursery: trio.Nursery,
  thread: rsyscall.Thread,
  workdir: rsyscall.Path,
) -> Positiond: ...

The first two arguments are shared with orderd. The third argument, workdir, is unique to positiond. workdir is a path in the filesystem that positiond will use; in this case, positiond will use it to store shared memory communication mechanisms and persistent data.

We'll pass a path in a temporary directory in this example.

# Make a temporary directory
self.tmpdir = await self.thread.mkdtemp()
self.orderd = await start_orderd(
  ...,
  await start_positiond(self.nursery, self.thread, self.tmpdir/"positiond"),
  ...,
)

2.6 database: orderd.Database,

database: orderd.Database,

This is a completely conventional SQLite database, initialized with the orderd schema.

Here, for a test, we're calling orderd.Database.make to make a fresh database every time. If we wanted to persist state between runs of orderd, we'd pass in an orderd.Database instance from a previous run, recovered from some known path in the filesystem with orderd.Database.recover(path).

self.orderd = await start_orderd(
  ...,
  await orderd.Database.make(self.thread, self.tmpdir/"db"),
  ...,
)
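If we instead wanted to persist state across runs, only the database argument would change. A sketch, where persistent_path stands in for some durable location chosen by the operator:

self.orderd = await start_orderd(
  ...,
  await orderd.Database.recover(persistent_path),
  ...,
)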

2.7 listening_sock: FileDescriptor,

listening_sock: FileDescriptor,

This is a listening socket, passed down to the orderd subprocess through file descriptor inheritance, and used to listen for TCP connections.

This is standard Unix socket programming, so we won't go into this in depth; although note that we create this with self.thread, so that it's on the same host as orderd.

async def asyncSetUp(self) -> None:
  # Make a TCP socket...
  sock = await self.thread.socket(AF.INET, SOCK.STREAM)
  # ...bind to a random port on localhost...
  await sock.bind(await self.thread.ptr(SockaddrIn(0, "127.0.0.1")))
  # ...and start listening.
  await sock.listen(1024)
  self.orderd = await start_orderd(
    ..., sock, ...,
  )

2.8 ) -> Orderd:

) -> Orderd:

Like all good component libraries, start_orderd returns an instance of the Orderd class, which contains all the information required to connect to orderd, such as an address and port, a shared memory segment, or a path in the filesystem.

start_orderd, again like all good component libraries, will only return once the orderd communication mechanisms have been fully created, so the returned Orderd instance can immediately be used to connect to orderd.
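To make that concrete, the Orderd handle might look roughly like this; these fields are hypothetical, invented for illustration, and the real class is whatever the orderd component library defines:

from dataclasses import dataclass

@dataclass
class Orderd:
    # hypothetical fields, for illustration only
    host: str   # address orderd is listening on for order entry connections
    port: int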

2.9 Full example

Here's the full, working example:

class TestOrderd(TrioTestCase):
  async def asyncSetUp(self) -> None:
    # self.nursery provided by TrioTestCase
    self.thread = local_thread
    self.tmpdir = await self.thread.mkdtemp()
    sock = await self.thread.socket(AF.INET, SOCK.STREAM)
    await sock.bind(await self.thread.ptr(SockaddrIn(0, "127.0.0.1")))
    await sock.listen(1024)
    self.orderd = await start_orderd(
      self.nursery, self.thread,
      await start_positiond(self.nursery, self.thread, self.tmpdir/"positiond"),
      await orderd.Database.make(self.thread, self.tmpdir/"db"),
      sock,
    )

Then we can proceed to test by running user code.
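For example, a test method might act as a user and open an order entry connection. This is a sketch: it assumes the hypothetical host and port fields from the Orderd handle sketched in section 2.8, and elides the actual order message format.

  async def test_can_connect(self) -> None:
    # act as a "user": connect to orderd's listening socket over TCP
    stream = await trio.open_tcp_stream(self.orderd.host, self.orderd.port)
    await stream.send_all(b"...")  # some order entry message; format elided
    await stream.aclose()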

3 Implementation of component libraries

Now we'll step through a working example of how a component library is implemented. This one shells out to a separate process, exampled.

This daemon is packaged and deployed with Nix; at $DAYJOB we use a proprietary package manager with similar APIs.

Below is the full code for the exampled component library, with comments inline to explain it.

import nix_rsyscall
import rsyscall
import trio
# a Nix-specific generated module, containing the information required
# to deploy the exampled package; generated by setup.py.
import exampled._nixdep

class Exampled:
    def __init__(self, workdir: rsyscall.Path) -> None:
        self.workdir = workdir

async def start_exampled(
    nursery: trio.Nursery,
    thread: rsyscall.Thread,
    workdir: rsyscall.Path,
) -> Exampled:
    # deploy the exampled package and its dependencies; this doesn't deploy the
    # package for this Python library, but rather the exampled daemon
    package = await nix_rsyscall.deploy(thread, exampled._nixdep.closure)
    # build the command to actually run
    command = package.bin('exampled').args("--verbose", "--do-stuff-fast")
    # make the thread that we'll run that exampled command in;
    # this child_thread is a process under our control, see http://rsyscall.org
    child_thread = await thread.clone()
    # create the working directory and make it the child thread's CWD;
    # CWD is inherited over exec, so exampled will run inside workdir
    await child_thread.mkdir(workdir)
    await child_thread.chdir(workdir)
    # exec the command in the child thread; this exec helper method returns a monitorable child process object
    child_process = await child_thread.exec(command)
    # monitor the child process in the background; see https://trio.readthedocs.io/
    # we'll get an exception if it exits uncleanly; this is our one use of async features.
    nursery.start_soon(child_process.check)
    # return a class containing exampled's communication mechanisms;
    # it communicates with the world only by creating files under `workdir'
    return Exampled(workdir)
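To tie this together, here's roughly how the library above could be used from a minimal single program. This is a sketch: it assumes start_exampled is importable from the exampled package, and otherwise uses only functions already shown in this article.

import trio
from rsyscall import local_thread
from exampled import start_exampled

async def main() -> None:
    async with trio.open_nursery() as nursery:
        tmpdir = await local_thread.mkdtemp()
        exampled = await start_exampled(nursery, local_thread, tmpdir/"exampled")
        # exampled is now running; this block stays open (and main keeps
        # running) until the monitoring task finishes, i.e. until the
        # exampled process exits.

if __name__ == "__main__":
    trio.run(main)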

4 Conclusion

A single-program system implements an entire distributed system as a single program, sharing functionality through libraries and delegating performance-sensitive work to subprocesses.

The alternative is writing many programs and many configuration files in many different DSLs: Kubernetes, Helm, Terraform, Ansible, systemd, cron jobs, shell scripts, configuration files in JSON, TOML, YAML, CSV, INI, etc, etc, etc. I think the advantages of the single-program approach are self-explanatory.

The techniques I use for single-program systems are explained in greater detail in other articles linked in the introduction. With those techniques, and with the open source libraries rsyscall and trio, anyone can write a single-program system.

If you're interested in working on such systems at $DAYJOB, we're hiring; here's a job description.

Created: 2021-09-20 Mon 13:38
