Interested in working with us? We are hiring!

See open positions
Back

Data Science Event Processing

Steven Wright Written by Steven Wright, December 08, 2015

AdRoll uses large-scale machine learning to bid intelligently in internet advertising auctions. In this post, we explore some of the engineering behind the data pipelines that feed our learning algorithms, in particular our real-time system that constructs features from event streams.


#Overview

If you have been served a banner ad on the internet in the past few years, there is a good chance that ad was served through a process known as real-time bidding (RTB). As its name suggests, in RTB-based ad serving, there is an auction held for the ad space on a web page in real-time every time a page is loaded. AdRoll bids in these auctions on behalf of our clients; ad exchanges send us a bid request whenever a page is loading, and we respond to these requests with a bid price. There are many engaging engineering and data science problems that arise in the context of participating in RTB auctions. On AdRoll’s data science team, we mainly concern ourselves with the problem of determining the value of impressions intelligently in real-time.

At a high level, the solution to this problem is essentially a large-scale machine learning product, BidIQ. We learn the probability that particular impressions will click and convert based on historical features available at bid time, use our models to forecast this probability for incoming bid requests, and then bid as some function of these probabilities and other variables. If you’re interested in our modeling and learning algorithms, check out Matt Wilson’s post on Factorization Machines.

There are many features that we feed into our learning algorithm. One major source is those derived from the bid request. These include features such as geo-location, ad network, inventory source, the customer that we would serve this impression to, time, among many others. Another source of very valuable information is features related to the cookie that this impression would be served to, such as past behavior of that user. Enter our event-processing system, internally dubbed chocolate chip (cookies!), which computes such features and makes them available at bid time.


#Chocolate Chip

One of the challenges of using data on individual cookies (profiles) in real-time is that, unless the profile is stored in the cookie, it is not available in a bid request. This necessitates the use of a key-value store mapping cookies to profiles of features. There are costs and benefits to both approaches. Storing data in the cookie guarantees truly real-time data, whereas key-value stores allows greater flexiblity with offline testing and data munging. We ultimately decided on the latter approach. For our backend key-value store, we use AWS DynamoDB, which we have used with great success for similar use cases in the past.

At its core, chocolate chip is a system that takes events, processes them, and then writes profiles to DynamoDB. Although originally designed and built by the data science team, it has found other uses throughout AdRoll. Our use case on the data science team is to build predictive features from events in data streams and write these profiles of features in as close to real-time as possible. These profiles are then fetched by our bidders and fed into our machine learning model to produce more accurate click and conversion predictions. In order to learn on these data, the profile at the time of the bid is logged. Chocolate chip was built from the ground up in D, including our own high-performance DynamoDB client.

The high-level concept of building a profile is simple: read the current value of the profile in DynamoDB, update this profile with information on the current event, and write this profile back. This, however, is complicated due to the fact that in order to achieve the scale and performance needed at AdRoll, the system is highly parallelised and supports several methods by which to distribute the computation. Additionally, we run computations in mini-batches to reduce the number of HTTP requests and increase throughput.


#Architecture

Chocolate chip ingests multiple streams of data, each stream comprised of a distinct event type. Each event is in the form of an AdRoll log line, a unified log line format over all our event types. The cookie is then handed off to one of several worker threads based on the cookie hash and event type. One of the main considerations when dealing with concurrent read and writes is always going to be avoiding data races. In particular, we never want more than one concurrent read-build-write cycle per cookie to DynamoDB, otherwise the events processed by one of the cycles will be overwritten by the other. By having each worker thread only deal with a partition of the cookie space, we eliminate this race condition. This race is exacerbated for us because cookies tend to be active at certain times, and therefore have events close together in the event stream. If we did not break up by cookies, it is likely multiple workers could deal with events from the same cookie concurrently. Although sharding by cookie does scale over several nodes, it is not our primary form of sharding as each of the data streams themselves are not broken up by cookie, and hence would have to be replicated.

When scaling out chocolate chip, the primary mechanism is to shard by event type. This exploits DynamoDB hash-and-range key type, with the primary key being the cookie hash and the range key being the event type. Using this property, we can break out separate nodes to deal with distinct event streams and build profiles for different event types on separate subrange keys. Our input event streams do not guarantee event ordering, so any features we build from events must be commutative. With the hash-and-range key method, we’re also restricted to associative events. This basically means we can only build features which, even if partially constructed from separate events stream, can be reconstructed to their true value. While this adds some complexity at bid time, we have yet to find a feature we’d like to build that is commutative but not associative, so sharding out the building of features by event has not impacted what features we construct.

Once a worker thread reads an event from its queue, it stores this event until it has events accumulated from several distinct cookies. DynamoDB permits batch read and write requests, the latter having a batch size of 25, so this is the size we use. Once 25 distinct cookies have been registered (perhaps with multiple events per cookie), a batch read request is made to the master region, the profiles returned are updated or generated if nonexistent, and then written back and replicated to slave regions when there is available capacity.


#Example feature: Exponential Moving Average

One can imagine that for some event types, recent events are more valuable for learning than past ones. A mechanism we use to capture such information is the exponential moving average. To understand why this type of feature is used, let us first examine the simple moving average. Say we care about the amount of ads we have shown to a particular cookie in the past hour (such a feature could be phrased as a simple moving average). In order to calulate this feature, we would have to store the timestamps of all the impressions shown, and count those in the past hour at bid time. The generation of the feature can lazily eliminate events over an hour old, but the key here is that some information about all the events must be stored.

In the low-latency environment under which this system operates, the memory costs of this feature are prohibitive. At bid time, we must fetch the the profile from DynamoDB over a TCP connection and the latency of fetching the feature is proportional to the number of TCP packets needed to communicate the profile. If we store many features like this, the number of packets needed and, therefore, latency would be unacceptable.

This is where the exponential moving average (EMA) comes in. EMAs can store a notion of number of events that occurred recently in a fixed amount of memory.

The formal defintion of an EMA is:

With a little math, we can show that this is equivalent to following recursive definition:

And there we have it. All we need to calculate the current value of the EMA is that last value, the timestamp, and the current timestamp. An observant reader will also note that we can add in events out of order to the EMA, as we just need to apply an exponential decay factor to the event. We can build secondary features for free by using the EMA feature; we have to store the timestamp of the last event, so we can extract this and build other features out of it.

In our case, applying this to impression count, we can think of each passing second as an observation, which takes the value 1 if an impressions occured in that second, or a 0 if not. Then we can say that:

where

refers to the timestamp of the ith event.

The EMA described is essentially just a counter that decays over time. While we cannot guarantee the count of events in the past hour, we get a feature which takes a high value when there are a large number of recent events, and a low value when there are not.


#Performance considerations in D

As mentioned earlier, this system was built from the ground up in the D programming language. Rather than plugging a storm bolt into our existing data structure, this allows code sharing with our existing machine learning infrastructure, and our already-developed in-house D expertise. An added benefit is our ability to optimize all the areas of our stack, rather than just the feature generation layer.

With systems such as these, memory management is also going to be a major factor in performance. We found when we profiled our code that the primary source of contention on a single machine was the global lock that occurs around everything in D’s GC allocator. By restructuring our code to mostly use preallocated buffers for computation, and by using a specialized JSON serializer and parser, we are able to achieve single machine throughput of around 20,000 events per second on a 32 hyperthread instance. Another optimization we found was that D’s GC seems to have poor heuristics for when to enter a GC cycle for this application. By disabling automatic GC collections and running GC.collect() and GC.minimize() according to our own heuristics, we are able to achieve 2-3x higher throughput. The runtime in D has been getting much love in recent releases, and we hope that such measures will not be necessary in the future.

Ultimately, we didn’t really have to do anything fancy or micro-optimize to get solid performance out of the system. A significant advantage of this solid performance is that we have not had to build chocolate chip as a truly distributed system, and as such, we’ve had very good uptime and low maintenance costs. As a side note, for performance reasons, we also use unique memory management techniques in our prediction server for real-time ad pricing. That is a truly real-time system with millisecond SLAs and cannot afford any GC cycles.


Chocolate chip has added tremendous predictive power and accuracy to our learning algorithms and has drastically improved how intelligently we bid on exchanges. In the recent release of an EMA feature as outlined above, we found cost-per-click to drop by 2% with no effect on reach.

See open positions AdRoll on Github