Category Archives: Concurrent Programming

Time in Distributed Systems: Lamport Timestamps

What I do in my day job is working on infrastructure systems at dropbox. That means that I’m neck-deep in distributed systems programming – that’s my bread and butter. One of the problems that comes up over and over again in distributed systems is time.

In distributed systems, time is a problem. Each computer has a clock built in, but those clocks are independent. The clocks on different machines can vary quite a bit. If a human being is setting them, then they’re probably at best accurate to one second. Even using a protocol like NTP, which synchronizes clocks between different computers, you can only get the clocks accurate to within about a millisecond of each other.

That sounds pretty good. In human timescales, a millisecond is a nearly imperceptible time interval. But to a modern computer, which can execute billions of instructions per second, it’s a long time: long enough to execute a million instructions! To get a sense of how much time that is to a computer, I just measured the time it took to take all of the source code for my main project, compile it from scratch, and execute all of its tests: it took 26 milliseconds.

That’s a lot of work. On the scale of a machine running billions of instructions per second, a millisecond is a long time.

Why does that matter?

For a lot of things that we want to do with a collection of computers, we need to know what event happened first. This comes up in lots of different contexts. The simplest one to explain is a shared resource locked by a mutex.

A mutex is a mutual exclusion lock. It’s basically a control that only allows one process to access some shared resource at a time. For example, you could think of a database that a bunch of processes all talk to. To make an update, a process P needs to send a request asking for access. If no one is using it when the server receives the request, it will give a lock to P, and and then block anyone else from accessing it until P is done. Anyone else who asks for access to the the database will have to wait. When P is done, it releases the lock on the mutex, and then if there’s any processes waiting, the database will choose one, and give it the lock.

Here’s where time comes into things. How do you decide who to give the lock to? You could give it to whoever you received the request from first, using the time on the database host. But that doesn’t always work well. It could easily end up with hosts with a lower-bandwidth connection to the server getting far worse service than a a closer host.

You get better fairness by using “send time” – that is, the time that the request was sent to the server by the client. But that’s where the clock issue comes up. Different machines don’t agree perfectly on the current time. If you use their clocktime to determine gets the lock first, then a machine with a slow clock will always get access before one with a fast clock. What you need is some fair way of determining ordering by some kind of timestamp that’s fair.

There are a couple of algorithms for creating some notion of a clock or timestamp that’s fair and consistent. The simplest one, which we’ll look at in this post, is called Lamport timestamps. It’s impressively simple, but it works really well. I’ve seen it used in real-world implementations of Paxos at places like Google. So it’s simple, but it’s serious.

The idea of Lamport timestamps is to come up with a mechanism that defines a partial order over events in a distributed system. What it defines is a causal ordering: that is, for any two events, A and B, if there’s any way that A could have influenced B, then the timestamp of A will be less than the timestamp of B. It’s also possible to have two events where we can’t say which came first; when that happens, it means that they couldn’t possible have affected each other. If A and B can’t have any affect on each other, then it doesn’t matter which one “comes first”.

The way that you make this work is remarkably simple and elegant. It’s based on the simplest model of distributed system, where a distributed system is a collection of processes. The processes only communicate by explicitly sending messages to each other.

  1. Every individual process p in the distributed system maintains an integer timestamp counter, \tau_p.
  2. Every time a process p performs an action, it increments \tau_p. Actions that trigger increments of \tau_p include message sends.
  3. Every time a process p sends a message to another process, it includes the current value of \tau_p in the message.
  4. When a process p receives a message from a process q, that message includes the value of \tau_q when the message was sent. So it updates its \tau_q to the \max(\tau_p, \tau_q)+1 (one more than the maximum of its current timestamp and the incoming message timestamp).

For any two events A and B in the system, if A \rightarrow B (that is, if A causally occurred before B – meaning that A could have done something that affected B), then we know that the timestamp of A will be smaller than the timestamp of B.

The order of that statement is important. It’s possible for timestamp(A) to be smaller than timestamp(B), but for B to have occurred before A by some wallclock. Lamport timestamps provide a causal ordering: A cannot have influenced or caused B unless A \rightarrow B; but A and B can be independent.

Let’s run through an example of how that happens. I’ll write it out by describing the clock-time sequence of events, and following it by a list of the timestamp counter settings for each host. We start with all timestamps at 0: [A(0), B(0), C(0), D(0).

  • [Event 1] A sends to C; sending trigger a timestamp increment. [A(1), B(0), C(0), D(0)].
  • [Event 2] C receives a message from A, and sets its counter to 2. [A(1), B(0), C(2), D(0).
  • [Event 3] C sends a message to A (C increments to 3, and sends.) [A(1), B(0), C(3), D(0).
  • [Event 4] A recieves the message from C, and sets its clock to 4. [A(4), B(0), C(3), D(0)]
  • [Event 5] B sends a message to D. [A(4), B(1), C(3), D(0)]
  • [Event 6] D receives the message. [A(4), B(1), C(3), D(2)].
  • [Event 7] D sends a message to C. [A(4), B(1), C(3), D(3)].
  • [Event 8] C receives the message, and sets its clock to 4.

According to the Lamport timestamps, in event 5, B sent its message to D at time 1. But by wallclock time, it sent its message after C’s timestamp was already 3, and A’s timestamp was already 4. We know that in our scenario, event 5 happened before event 3 by wallclock time. But in a causal ordering, it didn’t. In causal order, event 8 happened after event 4, and event 7 happened before event 8. In causal comparison, we can’t say whether 7 happened before or after 3 – but it doesn’t matter, because which order they happened in can’t affect anything.

The Lamport timestamp is a partial ordering. It tells us something about the order that things happened in, but far from everything. In effect, if the timestamp of event A is less than the timestamp of event B, it means that either A happened before B or that there’s no causal relation between A and B.

The Lamport timestamp comparisons only become meaningful when there’s an actual causal link between events. In our example, at the time that event 5 occurs, there’s no causal connection at all between the events on host A, and the events on host B. You can choose any arbitrary ordering between causally unrelated events, and as long as you use it consistently, everything will work correctly. But when event 6 happens, now there’s a causal connection. Event 5 could have changed some state on host D, and that could have changed the message that D sent in event 7. Now there’s a causal relationship, timestamp comparisons between messages after 7 has to reflect that. Lamport timestamps are the simplest possible mechanism that captures that essential fact.

When we talk about network time algorithms, we say that what Lamport timestamps do is provide weak clock consistency: If A causally happened before B, then the timestamp of A will be less than the timestamp of B.

For the mutex problem, we’d really prefer to have strong clock consistency, which says that the timestamp of A is smaller than the timestamp of B if and only if A causally occurred before B. But Lamport timestamps don’t give us enough information to do that. (Which is why there’s a more complex mechanism called vector clocks, which I’ll talk about in another post.

Getting back to the issues that this kind of timestamp is meant to solve, we’ve got a partial order of events. But that isn’t quite enough. Sometimes we really need to have a total order – we need to have a single, correct ordering of events by time, with no ties. That total order doesn’t need to be real – by which I mean that it doesn’t need to be the actual ordering in which events occured according to a wallclock. But it needs to be consistent, and no matter which host you ask, they need to always agree on which order things happened in. Pure lamport timestamps don’t do that: they’ll frequently have causally unrelated events with identical timestamps.

The solution to that is to be arbitrary but consistent. Take some extra piece of information that uniquely identifies each host in the distributed system, and use comparisons of those IDs to break ties.

For example, in real systems, every host has a network interface controller (NIC) which has a universally unique identifier called a MAC address. The MAC address is a 48 bit number. No two NICs in the history of the universe will ever have the same MAC address. (There are 281 trillion possible MAC codes, so we really don’t worry about running out.) You could also use hostnames, IP addresses, or just random arbitrarily assigned identifiers. It doesn’t really matter – as long as it’s consistent.

This doesn’t solve all of the problems of clocks in distributed systems. For example, it doesn’t guarantee fairness in Mutex assignment – which is the problem that I used as an example at the beginning of this post. But it’s a necessary first step: algorithms that do guarantee fairness rely on some kind of consistent event ordering.

It’s also just a beautiful example of what good distributed solutions look like. It’s simple: easy to understand, easy to implement correctly. It’s the simplest solution to the problem that works: there is, provably, no simpler mechanism that provides weak clock consistency.

Simpler Consensus with Raft

A few weeks ago, I wrote about Paxos, which is (at least in my experience), the most widely used algorithm for consensus in distributed systems. I’m a huge fan of Paxos – I think that it’s a remarkably elegant system.

But Paxos does have its problem.

  1. Paxos has a lot of roles: client, proposer, learner, acceptor, leader, follower. When you want to implement Paxos, you need to figure out all of those roles, and how you’re going to implement them. In general, you end up merging roles – but there are lots of ways of doing that merge. Each particular way of setting up the roles has its own properties, and thus its own tradeoffs that you need to understand.
  2. Paxos, as we normally talk about it, is really a single-consensus protocol – that is, the basic protocol is designed to get a group of agents to come to consensus just once. If you want to be able to repeatedly seek new consensus values, you’re actually going to be using an extension to the basic paxos protocol. There are a ton of Paxos extensions that work to add repeated consensus. Paxos itself is simple and elegant, with well-defined formal properties that we care about – the moment we start modifying it, we can no longer count on those properties unless we can also prove them in our extension!
  3. Paxos was originally described in a truly awful paper. Leslie Lamport was trying to write a paper that would be less dull than the typical bone-dry technical snoozer – but the way that he wrote it actually makes it much harder to understand.

In short: Paxos has more complexity than it needs, and despite that, it needs to be tweaked to be really useful, and getting those tweaks right is hard. There are, sadly, a lot of incorrect Paxos implementations – and their incorrectness has all-too-often come as a surprise to the people who rely on them.

To avoid those problems, there are other consensus algorithms out there. In this post, we’re going to look at one of the Paxos competitors – a consensus algorithm/protocol called raft.

Raft does away with the role complexity of Paxos. In Raft, you have a collection of cooperating agents. There are no distinct proposers, acceptors, or learners: there are just servers. Communication between the servers in raft is done entirely with synchronous remote procedure calls.

In Raft, the target of consensus is a log containing a sequence of events. The log is the history of the distributed system. The goal of raft is that the log be maintained in a consistent state throughout the raft network. Just like in Paxos, if we have 2n+1 servers, up to n can fail without the network losing its consistency.

Raft is designed in terms of remote procedure calls between the elements of the network. In Raft, we never talk about single messages – every communication between servers is a pair of messages: a request from caller to callee; and a response from callee to caller. When a message gets lost, we’ll just talk about it as a failed remote procedure call.

Within a Raft network, at any given time, each server has a state. It can be a follower, a leader, or a candidate. Within the network, there is at most one leader. When there is a leader, all of the other servers are in the follower state. The followers are almost entirely passive. Followers don’t talk to clients at all – they just wait for RPCs from the leader. The leader is the only participant that’s allowed to talk to clients: any client request must go through the current leader. The leader is also the only server that’s allowed to add new entries to the consensus log.

Raft divides time into a sequence of terms. In each term, the servers in the raft network need to select a leader using a process called an election. Raft is a strong leader protocol – no interactions with a client can take place except through a leader. If there’s no leader, then we can’t process client requests without a leader.

So, to understand Raft, there’s three processes that we need to

  1. Leader election
  2. Transitions between terms
  3. Appending an entry to a log.

In those processes, the servers have a collection of variables
that they use for the Raft protocol:

the current term for the server.
the serverID that this server voted for in the current term, or “none”.
the list of entries in the log.
The index of the highest log entry known to be committed by the server.
The index of the highest log entry that’s been added to the log – but not necessarily committed. (It doesn’t become committed until a majority of servers accept it.)

Leader Election

In each term, the Raft cluster needs to have a leader. The way that a leader is selected called election.

Elections are triggered by a term transition. When a server in the cluster decided that it needs to start a new term, it increments its term number, puts itself into the candidate state, and sends a RequestVote(term, candidateId) RPC to each of the other servers in the cluster. This request asks the other servers in the cluster to select it as the leader. If it receives enough “yes” votes, it will become the leader.

When a server receives a RequestVote RPC, it checks the term. If it’s smaller than the server’s current term, then it replies “No” – meaning that it cannot support the requestor as leader.

If the term in the request is greater that the receiver’s term, then the receiver cannot have voted in the new term. So it updates to the term from the request, and then it replies “Yes”.

If the term in the request equals the receiver’s term, then the receiver has already updated its term. If it’s already voted for someone else as leader, then it can’t support the requestor, so it replies “No”. If it hasn’t voted for a leader in the term, then it votes for the requestor, and replies “Yes”.

If the requester receives “Yes” votes from more than 1/2 of the cluster (counting itself), then it becomes the leader, and starts both processing requests from clients, and sending heartbeats to the other servers in the cluster.

If it doesn’t receive enough votes, then it waits to see if anyone else becomes the leader and starts sending heartbeats. If it doesn’t get a heartbeat in time, then it starts over: it would increment its term again, and try to start a fresh election.

Term Transitions

For a given server, term transitions happen in three ways:

  1. Timeout: the leading server needs to periodically communicate with each of the followers. This process is called heartbeat: even if the leader has no updates for its followers, it sends RPC calls to the followers just to say “I’m still here”. If a client goes too long without receiving a heartbeat, it decides that the leader was lost, and it will increment the term number, and trigger a new election.
  2. Leader resignation: the current leader can, at any time, decide to stop being the leader. (This is typically done by an implementation as part of a system that says that there’s a maximum period between leader elections. For example, in the Aurora scheduler, we had leader elections at least once per day. In a raft consensus, the leader would trigger this by deciding it was time for it to stop being a leader, and triggering an election by starting a new term.)
  3. External term change: every RPC received by a server includes a term number. If any RPC to a server ever includes a term number greater than the current term for that server, the server will update its term to the new number. As a special case of this, when a leader server decides to resign, it does that by sending an RPC to the other servers with an incremented term number.

Appending to the log

We just spent a fair bit of time talking about leaders and elections. That’s almost beside the point. What we really want to do is just maintain a consistent log across the cluster of servers. Everything except creating log entries is just the book-keeping that’s necessary to make the consistent log work. The log itself is maintained using the AppendEntries RPC call.

When a client request does something that alters the state of the cluster, the leader describes that change by adding an entry to the log. It builds a proposed log entry, and sends it to the other members of the cluster using an RPC. If it gets enough “Yes” votes from other cluster members, then the log entry becomes committed, and the leader updates its commitIndex to the index of the new log entry to reflect that.

The RPC request takes a bunch of parameters:

  1. term: the leader’s term.
  2. leaderId: the id of the leader.
  3. prevLogIndex: the index number of the last log entry in the consensus log preceeding this new entry.
  4. prevLogTerm: the number of the term where the last log entry was committed.
  5. entries: a set of new log entries to be appended to the log.
  6. leaderCommit: the index of the commitlog on the leader after this set of entries has been committed.

When an AppendEntries call is received by a follower, what it does is:

  • If the receiver’s term is greater than the request term, then the receiver rejects the request by replying “No”.
  • If the the receivers commit index is larger than the commit index of the request, then it rejects the request by replying “No”.
  • If the receiver’s log doesn’t contain an entry at prevLogIndex, or that entry’s term doesn’t match the request term, then it rejects the request by replying “No”.
  • If there’s an entry in the log with the same index as the new log entries, and the term in the request matches the receiver’s term, then the receiver removes all entries after prevLogIndex from its log.
  • The receiver then appends the new entries from the request to its log.
  • If the leaderCommit is greater than the commitIndex on the receiver, then the receiver updates its commitIndex.
  • Finally, the receiver replies “Yes”.

When a majority of the cluster members have accepted an AppendEntries call, then the log entry gets committed.

The one part of this that’s confusing is how the logs get managed. The leader creates a new log entry, and sends it to the other servers. The complexity comes from dealing with cases where something doesn’t reach consensus.

For example, the leader sends entries 5, 6, and 7 to server S. S adds the entries to its copy of log – it now contains [1, 2, 3, 4, 5, 6, 7]. Meanwhile, the leader also sends those entries to server T, but the RPC to T fails due to a network fault. Another client request happens, and now the leader sends [5, 6, 7, 8] to S. S sees that it’s got entry 5 already: so it discards everything after 5, and then re-appends.

So the trailing segment of the log can change! How do we handle consensus?

The next time that the leader sends an AppendEntries to a follower, it contains the leader’s commitIndex. The follower updates its commit index to that value. Once it’s done that, any request from a leader that tries to modify anything that comes before that commit index will be rejected.

The consensus commit thus doesn’t really occur until the next heartbeat call after a log update.

Raft versus Paxos

That’s the basics of Raft.

In comparison to Paxos, there’s a couple of things to notice:

  1. There’s a lot less confusion around roles. Paxos had a ton of different roles, and rules for interactions between the different roles. Raft doesn’t have any of that: it’s just servers, with one of the servers designated as the leader.
  2. Raft explicitly manages a log, and it adds complexity around log management. In Paxos, you’re just managing a single consensus value; in Raft, you’ve got a sequence of log entries.
  3. Paxos is defined in terms of messages; Raft is designed in terms of remote procedure calls.

So is Raft really simpler than Paxos? I think that’s up for discussion. Personally, I prefer Paxos. There’s a lot of complexity hidden under the covers of the RPC system. It looks simple on the surface, but all of the complexity of message passing, lost messages, message duplication – it’s still there. It’s just been swept under the carpet, as if that really makes it easier.

The way that the logs get maintained is confusing. That’s inevitable: getting distributed knowledge is never easy. Raft at least makes that part of things explicit, whereas it’s a common part of Paxos implementations, but it’s not really specified in the protocol.

Controlling Thousands of Machines. Aka, My Day Job.

I promised my friend Toby that I’d link to one of his blog posts. Toby is one of the SREs that I work with at twitter, and let me tell you, you always want to stay on the good side of the SREs!

And to introduce it, I actually get a chance to talk about what I really do!

Since last July, I’ve been working for twitter. I haven’t yet gotten to talk about what it is that I do at twitter. For obvious reasons, I think it’s absolutely fascinating. And since we just recently released it as open-source, there’s no reason to keep it secret anymore. I work on something called Aurora, which is a framework for Mesos.

Let’s start with Mesos.

When you run a web application like Twitter, you can’t run your application on a single computer. There’s a bunch of reasons for that, but the biggest one is that there’s not a computer in the world that’s big enough to do it; and if there was, depending on one epically massive computer would be incredibly unreliable. Instead, what we do is set up a data center, and fill it up with thousands upon thousands of cheap machines. Then we just use all of those thousands of little computers. These days, everyone does things that way. Twitter, Google, Facebook, Amazon, Microsoft, Apple – we all run on gigantic clusters of cheap machines in a datacenter.

But there’s a problem there. How do you use a thousand computers? Much less 10,000 or a million, or more?

What you do is build a substrate, called a cluster management system. You write a program, and you say that you want to run 1,000 copies of it, and hand it to the cluster manager substrate. The substrate takes care of figuring out where to put those processes, and telling the specific individual machines that it selects what to do to run your program.

One of the hardest problems in a cluster management system is figuring out how to assign programs to machines. Depending on what a particular program needs to do, its requirements can vary enormously. Running a quick map-reduce has one set of requirements. Running a reliable service – a service that can survive if part of a datacenter loses electricity – has a different set of requirements.

One approach – the one used by Google when I worked there – was to define a constraint language that ultimately had hundreds of possible parameters, and then treat it as a classical optimization problem, trying to optimize over all of it. The good side of that is that it meant that every program at Google could express its requirements exactly. The bad side of it was that the constraint system was incredibly complicated, and almost no one could predict what results a given set of constraints would produce. Configuration turned into a sort of game, where you’d make a guess, look at what the borg gave you, and then modify your constraint – repeat until you got something satisfactory.

mesos_logo At Twitter, Mesos is our cluster management system. It takes a completely different approach. Basically, it takes the borg-like approach and turns it upside down. Mesos itself doesn’t try to do constraint satisfaction at all. What it does is talk to components, called frameworks, and it offers them resources. It basically says “Here’s all of the possible ways I can give you a fair share of resources in the clusters. Tell me which ones you want.”.

Each framework, in turn, figures out how to allocate the resources offered to it by Mesos to individual jobs. In some ways, that might seem like it’s just deferring the problem: don’t the frameworks end up needing to do the same allocation nightmare? But in fact, what you do is write a framework for a particular kind of job. A framework needs to work out some constraint satisfaction – but it’s a much simpler problem, because instead of needing to satisfy every possible customer with one way of expressing constraints, each framework can decide, on its own, how to allocate resources to one particular kind of job. Map-reduce jobs get one framework that knows how to do a good job scheduling map-reduces. Services get a framework that knows how to do a good job allocating resources for services. Databases get a framework that knows how to do a good job allocating resources for storage and bandwidth intensive processes. As a result, the basic Mesos cluster manager is dramatically simpler than a one-size-fits-all scheduler, and each of the component frameworks is also much simpler.

You can see a really cute sort-of demonstration of how Mesos works by looking at @MesosMaster and @MesosSlave on twitter.

I don’t work on Mesos.

I work on Aurora. Aurora is a framework that runs under Mesos. It’s specialized for running services. Services are basically little server processes that run a lot of identical copiesfor a long time. Things like web-servers – where you need a ton of them to support millions of users. Or things like common processes that live behind the web-server answering requests for particular kinds of information.

At Twitter, all of the services that make up our system run in our datacenters on top of Mesos. They’re scheduled by Aurora.

With Aurora, running a service is incredibly easy. You write a configuration:

my_server_task = SequentialTask(
  processes = [
        cmdline='hadoopfs-copy /glorp/foo/hello_service .'),
    Process(name='run_service', cmdline='./myservice')
  resources = Resources(cpu=1.0, ram=128*MB, disk=128*MB))

jobs = [
    Service(task = my_server_task, 
        cluster = 'mycluster',
        environment = 'prod',
        name = 'hello',

The configuration says how to install a program and then run it on a machine, once Aurora assigns a machine to it. Aurora will find 300 machines, each of which can dedicate one full CPU, 128MB of memory, and 128MB of disk to the process, and it will run the service on those 300 machines.

Both Aurora and Mesos are open-source, under the Apache license. We’ve got a test environment in the distribution, so that you could be running tests in a virtual Mesos/Aurora cluster one hour from now! And my good friend Toby wrote an excellent blog post on how to set up a working Mesos/Aurora cluster.

I don’t want to toot my own horn too much, but I’m incredibly proud that Twitter open-sourced this stuff. Most companies consider their cluster management systems to be super-proprietary secrets. But at Twitter, we decided that this is a problem that no one should have to solve from scratch. It’s something we all know how to do, and it’s time that people stop being forced to waste time reinventing the same old wheel. So we’re giving it away, to anyone who wants to use it. That’s pretty damned awesome, and I’m proud to be a part of it!

The Basics of Software Transactional Memory

As promised, it’s time for software transactional memory!

A bit of background, first. For most of the history of computers, the way that we’ve built software is very strongly based on the fact that a computer has a processor – a single CPU, which can do one thing at a time, in order. Until recently, that was true. But now it really isn’t anymore. There’s the internet – which effectively means that no computer is ever really operating in isolation – it’s part of a huge computational space shared with billions of other computers. And even ignoring the internet, we’re rapidly approaching the point where tiny devices, like cellphones, will have more than one CPU.

The single processor assumption makes things easy. We humans tend to think very sequentially – that is, the way that we describe how to do things is: do this, then do that. We’re not so good at thinking about how to do lots of things at the same time. Just think about human language. If I want to bake a cake, what I’ll do is: measure out the butter and sugar, put them in the mixer, mix it until they’re creamed. Then add milk. Then in a separate bowl, measure out and sift the flour and the baking powder. Then slowly pour the dry stuff into the wet, and mix it together. Etc.

I don’t need to fully specify that order. If I had multiple bakers, they could do many of steps at the same time. But how, in english, can I clearly say what needs to be done? I can say it, but it’s awkward. It’s harder to say, and harder to understand than the sequential instructions.

What I need to do are identifying families of things that can be done at the same time, and then the points at which those things will merge.

All of the measurements can be done at the same time. In any mixings step, the mixing can’t be done until all of the ingredients are ready. Ingredients being ready could mean two things. It could mean that the ingredients were measured out; or it could mean that one of the ingredients for the mix is the product of one of the previous mixing steps, and that that previous step is complete. In terms of programming, we’d say that the measurement steps are independent; and the points at which we need to wait for several things to get done (like “we can’t mix the dry ingredients in to the wet until the wet ingredients have all been mixed and the dry ingredients have been measured”), we’d call synchroniation points.

It gets really complicated, really quickly. In fact, it gets even more complicated than you might think. You see, if you were to write out the parallel instructions for this, you’d probably leave a bunch of places where you didn’t quite fully specify things – because you’d be relying on stuff like common sense on the part of your bakers. For example, you’d probably say to turn on the over to preheat, but you wouldn’t specifically say to wait until it reached the correct temperature to put stuff into it; you wouldn’t mention things like “open the over door, then put the cake into it, then close it”.

When we’re dealing with multiple processors, we get exactly those kinds of problems. We need to figure out what can be done at the same time; and we need to figure out what the synchronization points are. And we also need to figure out how to do the synchronization. When we’re talking about human bakers “don’t mix until the other stuff is ready” is fine. But in software, we need to consider things like “How do we know when the previous steps are done?”.

And it gets worse than that. When you have a set of processors doing things at the same time, you can get something called a race condition which can totally screw things up!

For example, imagine that we’re counting that all of the ingredients are measured. We could imagine the mixer process as looking at a counter, waiting until all five ingredients have been measured. Each measuring process would do its measurement, and then increment the counter.

  val measureCount = 0
  process Mixer() {
    wait until measureCount == 5

  process Measurer() {
	 measureCount = measureCount + 1

What happens if two measurer finish at almost the same time? The last statement in Measurer actually consists of three steps: retrieve the value of measureCount; add one; store the incremented value. So we could wind up with:

Time Measurer1 Measurer2 measureCount
0 1
1 Fetch measureCount(=1) 1
2 Increment(=2) Fetch measurecount(=1) 1
3 Store updated(=2) Increment(=2) 2
4 Store updated(=2) 2

Now, Mixer will never get run! Because of the way that the two Measurers overlapped, one of the increments effectively got lost, and so the count will never reach 5. And the way it’s written, there’s absolutely no way to tell whether or not that happened. Most of the time, it will probably work – because the two processes have to hit the code that increments the counter at exactly the same time in order for there to be a problem. But every once in a while, for no obvious reason, the program will fail – the mixing will never get done. It’s the worst kind of error to try to debug – one which is completely unpredictable. And if you try to run it in a debugger, because the debugger slows things down, you probably won’t be able to reproduce it!

This kind of issue always comes down to coordination or synchronization of some kind – that is, the main issue is how do the different processes interact without stomping on each other?

The simplest approach is to use something called a lock. A lock is an object which signals ownership of some resource, and which has one really important property: in concept, it points at at most one process, and updating it is atomic meaning that when you want to look at it and update it, nothing can intervene between the read and write. So if you want to use the resource managed by the lock, you can look at the lock, and see if anyone is using it; and if not, set it to point to you. That process is called acquiring the lock.

In general, we wrap locks up in libraries to make them easier to use. If “l” was a lock, you’d take a lock by using a function something like “lock(l)”, which really expanded to something like:

def take(L: Lock) {
  while (L != me)
     atomically do if L==no one then L=me

So the earlier code becomes:

val measureCount = 0
val l = new Lock()

process Mixer() {
  wait until measureCount == 5

process Measurer() {
  measureCount = measureCount + 1

In a simple example like that, locks are really easy. Unfortunately, real examples get messy. For instance, there’s a situation called deadlock. A classic demonstration is something called the dining philosophers. You’ve got four philosophers sitting at a table dinner table. Between each pair, there’s a chopstick. In order to eat, a philosopher needs two chopsticks. When they get two chopsticks, they’ll use them to take a single bite of food, and then they’ll put down the chopsticks. If Each philosopher starts by grabbing the chopstick to their right, then no one gets to each. Each has one chopstick, and there’s no way for them to get a second one.

That’s exactly what happens in a real system. You lock each resource that you want to share. For some operations, you’re going to use more than one shared resource, and so you need two locks. If you have multiple tasks that need multiple resources, it’s easy to wind up with a situation where each task has some subset of the locks that they need.

Things like deadlock mean that simple locks get hairy really quickly. Not that any of the more complex coordination strategies make deadlocks impossible; you can always find a way of creating a deadlock in any system – but it’s a lot easier to create accidental deadlocks using simple locks than, say, actors.

So there’s a ton of methods that try to make it easier to do coordination between multiple tasks. Under the covers, these ultimately rely on primitives like locks (or semaphores, another similar primitive coordination tool). But they provide a more structured way of using them. Just like structured control flow makes code cleaner, clearer, and more maintanable, structured coordination mechanism makes concurrency cleaner, clearer, and more maintainable.

Software transactional memory is one approach to this problem, which is currently very trendy. It’s still not entirely clear to me whether or not STM is really quite up to the real world – current implementations remain promising, but inefficient. But before getting in to any of that, we need to talk about just what it is.

As I see it, STM is based on two fundamental concepts:

  1. Optimism. In software terms, by optimism, we mean that we’re going to plow ahead and assume that there aren’t any errors; when we’re done, we’ll check if there was a problem, and clean up if necessary. A good example of this from my own background is source code control systems. In the older systems like RCS, you’d lock a source file before you edited it; then you’d make your changes, and check them in, and release the lock. That way, you know that you’re never going to have two people making changes to the same file at the same time. But the downside is that you end up with lots of people sitting around doing nothing, waiting to get the lock on the file they want to change. Odds are, they weren’t going to change the same part of the file as the guy who has the lock. But in order to make sure that they can’t, the locks also block a lot of real work. Eventually, the optimistic systems came along, and what they did was say: “go ahead and edit the file all you want. But before I let you check in (save) the edited file to the shared repository, I’m going to make sure that no one changed it in a way that will mess things up. If I find out that someone did, then you’re going to have to fix it.”
  2. Transactions. A transaction is a concept from (among other places) databases. In a database, you often make a collection of changes that are, conceptually, all part of the same update. By making them a transaction, they become one atomic block – and either the entire collection all succeedd, or the entire collection all fail. Transactions guarantee that you’ll never end up in a situation where half of the changes in an update got written to the database, and the other half didn’t.

What happens in STM is that you have some collection of special memory locations or variables. You’re only allowed to edit those variables in a transaction block. Whenever a program enters a transaction block, it gets a copy of the transaction variables, and just plows ahead, and does whatever it wants with its copy of the transaction variables. When it gets to the end of the block, it tries to commit the transaction – that is, it tries to update the master variables with the values of its copies. But it checks them first, to make sure that the current value of the master copies haven’t changed since the time that it made its copy. If they did, it goes back and starts over, re-running the transaction block. So if anyone else updated any of the transaction variables, the transaction would fail, and then get re-executed.

In terms of our baking example, both of the measurers would enter the transaction block at the same time; and then whichever finished first would commit its transaction, which would update the master count variable. Then when the second transaction finished, it would check the count variable, see that it changed, and go back and start over – fetching the new value of the master count variable, incrementing it, and then committing the result. In terms of code, you’d just do something like:

transactional val measureCount = 0

process Mixer() {
  wait until measureCount == 5

process Measurer() {
  atomically {
    measureCount = measureCount + 1

It’s really that simple. You just mark all the shared resources as transactional, and then wrap the code that modifies them in a transaction block. And it just works. It’s a very elegant solution.

Of course there’s a lot more to it, but that’s the basic idea. In the code, you identify the transactional variables, and only allow them to be updated inside of a transaction block. At runtime, when you encounter a transaction block, charge ahead, and do whatever you want. Then when you finish, make sure that there isn’t an error. If there was, try again.

So what’s it look like in a non-contrived programming language? These days, I’m doing most of my coding in Scala. There’s a decent STM implementation for Scala as a part of a package called Akka.

In Akka, the way that you define a transactional variable is by using a Ref type. A Ref is a basically a cell that wraps a value. (It’s almost like a pointer value in C.) So, for example, in our Baker example:

var count :Ref[Int] = Ref(0)

Then in code, to use it, you literally just wrap the code that modifies the Refs in “atomic”. Alas, you don’t quite get to treat the refs like normal variables – to access the value of a ref, you need to call Ref.get; to change the value, you need to use a method alter, which takes a function that computes the new value in terms of the old.

class Measurer {
  def doMeasure() {
    // do the measuring stuff
    atomic {
	  ref.alter(_ + 1)

The “(_ + 1)” probably needs a quick explanation. In Scala, you can define a single expression function using “_” to mark the slot where the parameter should go. So “(_ + 1)” is equivalent to the lambda expression { x => x + 1}.

You can see, just from this tiny example, why STM is such an attractive approach. It’s so simple! It’s very easy to read and write. It’s a very simple natural model. It’s brilliant in its simplicity. Of course, there’s more to it that what I’ve written about here – error handling, voluntary transaction abort, retry management, transaction waits – but this is the basics, and it really is this simple.

What are the problems with this approach?

  1. Impurity. If not all variables are transactional, and you can modify a non-transactional variable inside of a transaction block, then you’ve got a mess onp your hands. Values from transactionals can “leak” out of transaction blocks.
  2. Inefficiency. You’ve got to either copy all of the transactional variables at the entry to the transaction block, or you’ve got to use some kind of copy-on-write strategy. However you do it, you’ve got grief – aggressive copying, copy-on-write, memory protect – they’ve all got non-trivial costs. And re-doing the entire transaction block every time it fails can eat a lot of CPU.
  3. Fairness. This is a fancy term for “what if one guy keeps getting screwed?” You’ve got lots of processes all working with the same shared resources, which are protected behind the transactional variables. It’s possible for timing to work out so that one process keeps getting stuck doing the re-tries. This is something that comes up a lot in coordination strategies for concurrency, and the implementations can get pretty hairy trying to make sure that they don’t dump all of the burden of retries on one process.