Interested in working with us? We are hiring!

See open positions

Multi-Region DynamoDB

Walter King Written by Walter King, October 02, 2013

In rolling out our real time bidding infrastructure, we were faced with the task of syncing data for every user we could possibly target across four regions. We have on the order of hundreds of millions of users, and tens of thousands of writes per second. Not only do we have to deal with the daunting task of writing this data out in real time, the bidding system has a hard cap of 100ms for every bid request, so we need strong guarantees on read performance.


With DynamoDB we are able to have consistent, fast performance without having to worry about scaling out our own database infrastructure. However, DynamoDB does not support any concept of replication so we needed to build a system to write data to multiple data centers. Most replication solutions generate a log of every write, and then sends that log to all slaves. In our case, our application servers generate a list of writes that act as our source of truth, and then we can treat each region’s DynamoDB table as a slave reading from those logs.

The data we are storing is a mapping of user ids to segments, along with a timestamp. We can then look up which ad to serve from the list of valid segments.

User Lists

The first replication solution was implemented using map reduce. Periodically, a job would gather all the (userID, segmentID, timestamp) tuples from logfiles, with an output format that wrote to every region that needed this data. As we scaled up to more and more users, this job would take many hours to run. Also, due to the nature of batch jobs, there would be long idle periods, followed by large spikes in writes to DynamoDB. Since billing was based on throughput, we were interested in a much more consistent write load. We began searching for solutions to speed this system up.

We have been moving away from batch jobs towards more realtime processing systems using Storm. The process for converting the batch job to Storm was very straight-forward. We already had a system for ingesting our data into Storm; we just had to translate the data into partial updates to update the time for a particular segment.

This scheme ran into some performance issues. In the map reduce job, we were able to combine all the updates per user over a couple hour time period into a single update. However, with Storm we were issuing them one at a time, and suffering from the large overhead of each http request. Since most of the time we were idle waiting for requests, the first performance enhancement was to switch to the asynchronous Java API. We were then able to issue more requests in parallel, but it still was not able to get the throughput we expected.

We really wanted to use batching, but the API does not support batch updates, only batch puts. This required a slight change to our schema.

User Profiles

By using range keys each row is independent of the others. Updates can be done using the batch API, which lets us send 25 updates at a time, effectively multiplying the throughput by that amount. Reads then just have to query by user id with no range key, which will fetch all the same data as the old schema.

One important thing to keep in mind is that the batch API can be partially successful, and it returns the subset of write requests that failed. And since each region has its own request, a given write may succeed in one region and not the other. Our writes can safely be repeated, so we err on the side of replaying a given write, which fits well with Storm’s promise of at-least-once processing. We maintain a list of all writes that are in flight, along with the number of regions it has successfully been written to. When a request returns, that count is incremented for all successful values. Once that count equals the number of expected regions written to, it is acknowledged in Storm as successful. Any failed values get retried a couple of times within the same process. After the retries are finished, if there are any failed writes left, we mark it as failed in Storm, which will cause it to be put in the back of the write queue to be retried later. As most failures are temporal, generally due to throughput limit throttling, this process can repeat until all writes complete successfully.

While DynamoDB does not inherently support multiple regions, given a stream of writes it’s relatively easy to send those data across the world. Combining batch, asynchronous writes with the speed of DynamoDB yields us an average of over ten thousand writes per second on a single Storm worker, with plenty of room to handle spikes. Add to that linear scaling of throughput with the number of workers, and we have a system that will handle any load we can throw at it.

See open positions AdRoll on Github