Streamdrill compared to other approaches for the Top-K-Problem

Streamdrill solves the top-k problem in real-time which consists in counting activities of different event times over a certain time interval.

So far, so good, but you might wonder why you couldn’t just do this by yourself. After all, it’s justcounting, right? Actually, the problem that streamdrill solves is simple, as long as:

  • the data volume is small
  • the time windows are small compared to the data volume
  • the number different kinds of events is small and bounded
  • you don’t need the results in real-time

But as soon as you have millions of events per day, wish to aggregate over days, weeks, or even months, and potentially have several million of different types of events, the problem gets quite complicated.

You may end up in such a situation faster than you think:

  • Event rates may rise up beyond what you originally envisioned (in particular if you’re successful. 😉)
  • The number of different event types may explode. This might either be because the underlying sets are already large (say, IP addresses, or users in a social network), or because you are tracking combinations such that the sizes multiply.

Let’s dig deeper into this problem to better understand why this problem quickly gets hard.

One thing you need to keep in mind is that the other solutions we will discuss still involve some amount of coding. So if you compare streamdrill against Hadoop, you would need to do a non-trivial amount of coding for Hadoop, because Hadoop is a general purpose framework taking care of the scaling, but doesn’t solve the top-k problem out of the box.

streamdrill Standard SQL Counters Stream Processing
solves top-k out of the box
real-time (✓ $$$)
focusses computation on “hot set”
memory based for high throughput
persistent (✓ ⌚) (✓)
scales to cluster (✓ ⌚)
exact results (✓ ⌚)

✓ = yes, ✕ = no, (✓) = possible, (✓ $$$) = possible, but expensive, (✓ ⌚) = possible, but not yet.

Approach 1: Store and crunch later

Let’s start by using a traditional approach based on some SQL database. To do that, you would create a table with a timestamp column and columns to reflect the fields in your data. Then you would just pipe in each event into the database. To get the counts you would run something like SELECT count(*) FROM events WHERE timestamp > '2012-12-01 00:00' AND timestamp < '2012-12-31 23:59' potentially also grouping to focus on certain types of events, and adding an ORDER BY count(*) clause to get the most active elements.

There a number of problems with this approach:

  • If you use a normal disk based database, you will be able to add only a few hundred, at best thousands, events per second,
  • As you add more data, your database will become slower over time (and you will be adding a lot of data!). Leo has a number of nice benchmarks on his blog for MongoDB and Cassandra insertion performance.
  • Just adding the data is not enough, you also need to crunch the whole data to compute the activities. But the longer the time window, the longer will the query take to run.
  • While the query runs, there will be considerable load on your server, making the addition of events even slower.
  • Eventually, you will get so slow that your results will already be a few minutes or even hours old once you get them. Hardly real-time.

What’s more, you’re probably only interested in the top 100 active elements, so most of the computation is spent on data you’re not interested in.

In other words, putting the data into some big database and crunching it later won’t really scale. If you’ve got a lot of money on the side, you can employ some form of clustering using map reduce or a similar approach to bring down the times, but the bottom line is the same: You crunch a lot of data which you don’t really need, and the problem will only become harder if you get more and more data. And “harder” also means a lot of coding and operations work (which you don’t have if you just use streamdrill 😉).

Approach 2: Just keeping the counters

So just storing the data away and crunching it later won’t work. So how about doing the counting on the spot? That way, you wouldn’t have to store all those duplicate events. Note that just keeping the counters isn’t sufficient, you also need to maintain the global index such that you can quickly identify the top-k entries.

Using a complex event processing framework like Esper would also be a way to reduce the coding load, as Esper comes with a nice query language which let’s you formulate averages over time windows in compact way.

Let’s assume your data doesn’t fit into memory anymore (otherwise it won’t be Big Data, right?). One option is to again store the counters in a database. However, just as in the previous example this restricts the number of updates you can handle. Also, you will generate a lot of changes on the database and not all databases handle that amount of write throughput gracefully. For example, Cassandra only marks old entries for deletion and cleans up during the compaction phases. In our experience, such compactions will eventually take hours and put significant load on your system, cutting the throughput in half.

And again, most of the time is spent on elements which will never make the top-k entries.

Approach 3: Stream processing frameworks

Instead of keeping counters in a database, you could also try and scale out using a stream processing framework like Twitter’s Storm, or Yahoo’s S4. Such systems let you define the computation tasks in the form of small worker threads which are then distributed over a cluster automatically by the framework, also keeping the counters in memory.

While this looks appealing (and in fact, allows you to scale to several hundred thousand events per second), note that this only solves the counting part, but not the global index of all activities. Computing that in a way which scales is non-trivial. You can collect the counter updates at a worker thread which then maintains the index, but what if it doesn’t fit into memory? You could partition the index, but then you’d have to aggregate the data to compute queries, and you’d have to do this yourself, so again, a lot of complexity. The above stream processing frameworks also don’t come with easy support for query, so you’d need to build some infrastructure to collect the results yourself.

And again, you also have a lot of computation for elements which will never show up in your top 100.

In summary

While conceptually simple (in the end, you just count, right), the top-k problem which streamdrill addresses becomes hard if there are more things to count than fit into memory, and the event rate is higher than what you can write to disk.

Finally, let’s discuss some of the differences:

  • As streamdrill is memory based, all the data is lost when streamdrill crashes or is restarted. However, we already have functionality in the engine to write snapshots of all the data, but those aren’t available yet via the API streamdrill.
  • Right now, streamdrill does not support clustering. We just haven’t found it necessary so far, but it’s something that is possible and will be included soon.
  • Finally, as I’m going to explain in more depth in the next post, streamdrill is based on approximate algorithms which trade exactness versus performance. Again, if exactness is really an issue, you can get it by combining with one of the other technologies. This is possible, but not our top priority for now.




Hazelcast Overview

Hazelcast is an open source In-Memory Data Grid (IMDG). As such it provides elastically scalable distributed In-Memory computing, widely recognized as the fastest and most scalable approach to application performance, and Hazelcast does so in open source. More importantly it makes distributed computing simple by offering distributed implementations of developer friendly interfaces from Java such as Map, Queue, ExecutorService, Lock, JCache and many more. For example, the Map interface provides an In-Memory Key Value store which confers many of the advantages of NoSQL in terms of developer friendliness and developer productivity.

In addition to distributing data In-Memory, Hazelcast provides a convenient set of APIs to access the CPUs in your cluster for maximum processing speed. Hazelcast is designed to be lightweight and easy to use. Since Hazelcast is delivered as a compact library (JAR) and has no external dependencies other than Java, it is easily pluggable into your software solution to provide distributed data structures and distributed computing utilities.

Hazelcast is highly scalable and available. Distributed applications can use Hazelcast for distributed caching, synchronization, clustering, processing, pub/sub messaging, etc. Hazelcast is implemented in Java and has clients for Java, C/C++, .NET as well as REST. Hazelcast can also speak memcache protocol. It also plugs in to Hibernate and can easily be used with any existing database system.

If you are looking for In-Memory speed, elastic scalability and the developer friendliness of NoSQL, Hazelcast is a great choice for you.

Hazelcast is simple

Hazelcast is written in Java with no other dependencies. It exposes the same API from the familiar Java util package. Just add hazelcast.jar to your classpath, enjoy JVMs clustering in less than a minute and start building scalable applications.

Hazelcast is Peer-to-Peer

Unlike many NoSQL solutions, Hazelcast is peer-to-peer. There is no master and slave; there is no single point of failure. All nodes store equal amount of data and do equal amount of processing. Hazelcast can be embedded to your existing application or used in client and server mode where your application is client to the Hazelcast nodes.

Hazelcast is scalable

Hazelcast is designed to scale up to hundreds and thousands of nodes. Simply add new nodes and they will automatically discover the cluster and will linearly increase both memory and processing capacity. The nodes maintain a TCP connection between each other and all communication is performed through this layer.

Hazelcast is fast

Hazelcast stores everything in-memory. It is designed to perform very fast reads and updates.

Hazelcast is redundant

Hazelcast keeps the backup of each data entry on multiple nodes. On a node failure, the data is restored from the backup and cluster will continue to operate without a downtime.

Sharding in Hazelcast

Hazelcast shards are called Partitions. By default, Hazelcast has 271 partitions. Given a key; we serialize, hash and mode it with the number of partitions to find the partition it belongs to. The partitions themselves are distributed equally among the members of the cluster. Hazelcast also creates the backups of partitions and also distributes them among nodes for redundancy.

Partitions in a 1 node Hazelcast cluster.

Partitions in a 2 node cluster.

The blacks are primary partitions and reds are backups. In the above illustration, first node has 135 primary partitions (black) and each of these partitions are backed up in the second node (red). At the same time, first node has the backup partitions of second node’s primary partitions.

As you add more nodes, Hazelcast will move one by one some of the primary and backup partitions to new nodes to make all nodes equal and redundant. Only minimum amount of partitions will be moved to scale out Hazelcast.

Hazelcast Topology

If you have an application whose main focal point is asynchronous or high performance computing and lots of task executions, then embedded deployment is the most useful. In this type, nodes include both the application and data, see the below illustration.

You can have a cluster of server nodes that can be independently created and scaled. Your clients communicate with these server nodes to reach to the data on them. Hazelcast provides native clients (Java, .NET and C++), Memcache clients and REST clients. See the below illustration.


Hazelcast MultiMap is a specialized map where you can store multiple values under a single key. Just like any other distributed data structure implementation in Hazelcast, MultiMap is distributed and thread-safe.

Hazelcast MultiMap is not an implementation of java.util.Map due to the difference in method signatures. It supports most features of Hazelcast Map except for indexing, predicates and MapLoader/MapStore. Yet, like Hazelcast Map, entries are almost evenly distributed onto all cluster members. When a new member joins the cluster, the same ownership logic used in the distributed map applies.

Sample MultiMap Code

Let’s write code that puts data into a MultiMap.

public class PutMember {
  public static void main( String[] args ) {
    HazelcastInstance hazelcastInstance = Hazelcast.newHazelcastInstance();
    MultiMap <String , String > map = hazelcastInstance.getMultiMap( "map" );

    map.put( "a", "1" );
    map.put( "a", "2" );
    map.put( "b", "3" ); 
    System.out.println( "PutMember:Done" );

Now let’s print the entries in this MultiMap.

public class PrintMember {
  public static void main( String[] args ) { 
    HazelcastInstance hazelcastInstance = Hazelcast.newHazelcastInstance();
    MultiMap <String, String > map = hazelcastInstance.getMultiMap( "map" );
    for ( String key : map.keySet() ){
      Collection <String > values = map.get( key );
      System.out.println( "%s -> %s\n",key, values );

After you run the first code sample, run the PrintMember sample. You will see the key a has two values, as shown below.

b -> [3]

a -> [2, 1]

Configuring MultiMap

When using MultiMap, the collection type of the values can be either Set or List. You configure the collection type with the valueCollectionType parameter. If you choose Set, duplicate and null values are not allowed in your collection and ordering is irrelevant. If you choose List, ordering is relevant and your collection can include duplicate and null values.

You can also enable statistics for your MultiMap with the statisticsEnabled parameter. If you enable statisticsEnabled, statistics can be retrieved with getLocalMultiMapStats() method.


Please refer to the MultiMap Configuration section for a full description of Hazelcast Distributed MultiMap configuration.

Scaling Real-time Apps on Cloud Foundry Using Node.js and Redis

Common applications being built on Node.js like social networking and chat require real-time scaling capabilities across multiple instances. Developers need to deal with sticky sessions, scale-up, scale-down, instance crash/restart, and more. Cloud Foundry PaaS provides a ready platform to achieve this quickly.

The following blog post will walk you through deploying and managing real-time applications on Cloud Foundry using common Node.js examples and Redis key-value store capabilities.

Chat App

The main objective here is to build a simple chat app while tackling the scale requirements. Specifically, we will be building a simple Express, and Redis-based Chat app that meets the following objectives:

  1. Chat server should run with multiple instances.
  2. The user login should be saved in a session.
    • User should be logged back in upon browser refresh
    • should get user information from the session before sending chat messages.
    • should only connect if user is already logged in.
  3. If the server instance that the user is connected to gets restarted, goes down or is scaled down while the user is chatting, the user should be reconnected to an available instance and recover the session.

Chat app’s Login page:

Chat app’s Chat page:

We will also cover:

  1. How to use and Sticky Sessions
  2. How to use Redis as a session store
  3. How to use Redis as a pubsub service
  4. How to use to get session info (like user info) from Express sessions
  5. How to configure client and server to properly reconnect after one or more server instances goes down (i.e. has been restarted, scaled down or crashed) & Sticky Sessions is one of the earliest and most popular Node.js modules to help build real-time apps like chat, social networking etc. (Note: SockJS is another popular library similar to

When you run such a server in a cloud that has a load-balancer, reverse proxy, routers etc., it has to be configured to work properly, especially when you scale the server to use multiple instances.

One of the constraints, SockJS and similar libraries have is that they need to continuously talk to the same instance of the server. They work perfectly well when there is only 1 instance of the server.

When you scale your app in a cloud environment, the load balancer (Nginx in the case of Cloud Foundry) will take over, and the requests will be sent to different instances causing to break.

For such situations, load balancers have a feature called ‘sticky sessions’, also known as ‘session affinity’. The idea is that if this property is set, all the requests following the first load-balanced request will go to the same server instance.

In Cloud Foundry, cookie-based sticky sessions are enabled for apps that set the cookie jsessionid. Note that jsessionid is the cookie name commonly used to track sessions in Java/Spring applications. Cloud Foundry is simply adopting it as the sticky session cookie for all frameworks.

To make work, the apps just need to set a cookie with the name jsessionid.

* Use cookieParser and session middleware together.
* By default Express/Connect app creates a cookie by name 'connect.sid'. But to scale app,
* make sure to use cookie name 'jsessionid' (instead of connect.sid) use Cloud Foundry's 'Sticky Session' feature.
* W/o this, won't work when you have more than 1 instance.
* If you are NOT running on Cloud Foundry, having cookie name 'jsessionid' doesn't hurt - it's just a cookie name.
app.use(express.session({store:sessionStore, key:'jsessionid', secret:'your secret here'}));

In the above diagram, when you open the app,

  1. Express sets a session cookie with name jsessionid.
  2. When connects, it uses that same cookie and hits the load balancer
  3. The load balancer always routes it to the same server that the cookie was set in.

Sending session info to

Let’s imagine that the user is logging in via Twitter or Facebook, or a regular login screen. We are storing this information in a session after the user has logged in.'/login', function (req, res) {
    //store user info in session after login.
    req.session.user = req.body.user;

Once the user has logged in, we connect via to allow chatting. However, doesn’t know who the user is and if he or she is actually logged in before sending chat messages to others.

That’s where the library comes in. It’s a very simple library that’s a wrapper around All it does is grab session information during the handshake and then pass it to’sconnection function.

//With just
io.sockets.on('connection', function (socket) {
    //do pubsub here

//But with, you'll get session info

 Use SessionSockets so that we can exchange (set/get) user data b/w sockets and http sessions
 Pass 'jsessionid' (custom) cookie name that we are using to make use of Sticky sessions.
var SessionSockets = require('');
var sessionSockets = new SessionSockets(io, sessionStore, cookieParser, 'jsessionid');

sessionSockets.on('connection', function (err, socket, session) {

    //get info from session
    var user = session.user;

    //Close socket if user is not logged in
    if (!user)

    //do pubsub
    socket.emit('chat', {user: user, msg: 'logged in'});

Redis as a session store

So far so good, but Express stores these sessions in MemoryStore (by default). MemoryStore is simply a Javascript object – it will be in memory as long as the server is up. If the server goes down, all the session information of all users will be lost!

We need a place to store this outside of our server, but it should also be very fast to retrieve. That’s where Redis as a session store come in.

Let’s configure our app to use Redis as a session store as below.

 Use Redis for Session Store. Redis will keep all Express sessions in it.
var redis = require('redis');
var RedisStore = require('connect-redis')(express);
var rClient = redis.createClient();
var sessionStore = new RedisStore({client:rClient});

  //And pass sessionStore to Express's 'session' middleware's 'store' value.
app.use(express.session({store: sessionStore, key: 'jsessionid', secret: 'your secret here'}));

With the above configuration, sessions will now be stored in Redis. Also, if one of the server instances goes down, the session will still be available for other instances to pick up. as pub-sub server

So far our sessions are taken care of with the above setup, but if we are using’s default pub-sub mechanism, it will work only for 1 sever instance. i.e. if user1 and user2 are on server instance #1, they can both chat with each other. If they are on different server instances, they cannot do so.

sessionSockets.on('connection', function (err, socket, session) {
    socket.on('chat', function (data) {
        socket.emit('chat', data); //send back to browser
        socket.broadcast.emit('chat', data); // send to others

    socket.on('join', function (data) {
        socket.emit('chat', {msg: 'user joined'});
        socket.broadcast.emit('chat', {msg: 'user joined'});

Redis as a PubSub service

In order to send chat messages to users across servers we will update our server to use Redis as a PubSub service (along with session store). Redis natively supports pub-sub operations. All we need to do is to create a publisher, a subscriber, and a channel.

//We will use Redis to do pub-sub

 Create two Redis connections. A 'pub' for publishing and a 'sub' for subscribing.
 Subscribe 'sub' connection to 'chat' channel.
var sub = redis.createClient();
var pub = redis.createClient();

sessionSockets.on('connection', function (err, socket, session) {
    socket.on('chat', function (data) {
        pub.publish('chat', data);

    socket.on('join', function (data) {
        pub.publish('chat', {msg: 'user joined'});

     Use Redis' 'sub' (subscriber) client to listen to any message from Redis to server.
     When a message arrives, send it back to browser using
    sub.on('message', function (channel, message) {
        socket.emit(channel, message);

The app architecture will now look like this:

Handling server scale-down / crashes / restarts

The app will work fine as long as all the server instances are running. What happens if the server is restarted or scaled down or one of the instances crash? How do we handle that?

Let’s first understand what happens in that situation.

The code below simply connects a browser to server and listens to various events.

  Connect to on the server (***BEFORE FIX***).
 var host =':')[0];
 var socket = io.connect('http://' + host);

 socket.on('connect', function () {
 socket.on('connecting', function () {
 socket.on('disconnect', function () {
 socket.on('connect_failed', function () {
 socket.on('error', function (err) {
     console.log('error: ' + err);
 socket.on('reconnect_failed', function () {
 socket.on('reconnect', function () {
     console.log('reconnected ');
 socket.on('reconnecting', function () {

While the user is chatting, if we restart the app on localhost or on a single host, attempts to reconnect multiple times (based on configuration) to see if it can connect. If the server comes up with in that time, it will reconnect. So we see the below logs:

If we restart the server (say using vmc restart redispubsub) and the user is chatting on the same app that’s running on Cloud Foundry AND with multiple instances, we will see the following log:

You can see that in the above logs, after the server comes back up, client (running in the browser) isn’t able to connect to server (running on Node.js in the server).

This is because, once the server is restarted on Cloud Foundry, instances are brought up as if they are brand-new server instances with different IP addresses and different ports and so jsessionid is no-longer valid. That in turn causes the load balancer to load balance’s reconnection requests (i.e. they are sent to different server instances) causing the server not to properly handshake and consequently to throw client not handshaken errors!

OK, let’s fix that reconnection issue

First, we will disable’s default “reconnect” feature, and then implement our own reconnection feature.

In our custom reconnection function, when the server goes down, we’ll make a dummy HTTP-get call to index.html every 4-5 seconds. If the call succeeds, we know that the (Express) server has already setjsessionid in the response. So, then we’ll call’s reconnect function. This time because jsessionid is set,’s handshake will succeed and the user can continue to chat without interruption.

 Connect to on the server (*** FIX ***).
var host =':')[0];

//Disable's default "reconnect" feature
var socket = io.connect('http://' + host, {reconnect: false, 'try multiple transports': false});
var intervalID;
var reconnectCount = 0;
socket.on('disconnect', function () {

    //Retry reconnecting every 4 seconds
    intervalID = setInterval(tryReconnect, 4000);

 Implement our own reconnection feature.
 When the server goes down we make a dummy HTTP-get call to index.html every 4-5 seconds.
 If the call succeeds, we know that (Express) server sets ***jsessionid*** , so only then we try reconnect.
var tryReconnect = function () {
    if (reconnectCount == 5) {
    console.log('Making a dummy http call to set jsessionid (before we do reconnect)');
        .success(function () {
            console.log("http request succeeded");
            //reconnect the socket AFTER we got jsessionid set
        }).error(function (err) {
            console.log("http request failed (probably server not up yet)");

In addition, since the jsessionid is invalidated by the load balancer, we can’t create a session with the same jsessionid or else the sticky session will be ignored by the load balancer. So on the server, when the dummy HTTP request comes in, we will regenerate the session to remove the old session and sessionid and ensure everything is afresh before we serve the response.

//Instead of..
exports.index = function (req, res) {
    res.render('index', { title: 'RedisPubSubApp', user: req.session.user});

//Use this..
exports.index = function (req, res) {
    //Save user from previous session (if it exists)
    var user = req.session.user;

    //Regenerate new session & store user from previous session (if it exists)
    req.session.regenerate(function (err) {
        req.session.user = user;
        res.render('index', { title: 'RedisPubSubApp', user: req.session.user});

Running / Testing it on Cloud Foundry

  • Clone the app to redispubsub folder
  • cd redispubsub
  • npm install and follow the below instructions to push the app to Cloud Foundry
> vmc push redispubsub
Instances> 4       <----- Run 4 instances of the server

1: node
2: other
Framework> node

1: node
2: node06
3: node08
4: other
Runtime> 3  <---- Choose Node.js 0.8v

1: 64M
2: 128M
3: 256M
4: 512M
Memory Limit> 64M

Creating redispubsub... OK

2: none
URL>  <--- URL of the app (choose something unique)

Updating redispubsub... OK

Create services for application?> y

1: blob 0.51
2: mongodb 2.0
3: mysql 5.1
4: postgresql 9.0
5: rabbitmq 2.4
6: redis 2.6
7: redis 2.4
8: redis 2.2
What kind?> 6 <----- Select & Add Redis v2.6 service

Name?> redis-e9771 <-- This is just a random name for Redis service

Creating service redis-e9771... OK
Binding redis-e9771 to redispubsub... OK
Create another service?> n

Bind other services to application?> n

Save configuration?> n

Uploading redispubsub... OK
Starting redispubsub... OK
Checking redispubsub... OK
  • Once the server is up, open up multiple browsers and go to <appname>
  • Start chatting

Test 1

  • Refresh the browser
  • You should automatically be logged in

Test 2

  • Open up JS debugger (in Chrome, do cmd + alt +j)
  • Restart the server by running vmc restart <appname>
  • Once the server restarts, should automatically reconnect
  • You should be able to chat after the reconnection

That’s it for this time. Stay tuned for my next blog where I will discuss how RabbitMQ can be leveraged in scalable apps built on Cloud Foundry. The content of this blog has also been covered in a video. Feel free to get in touch with us for questions on the material.

General Notes

  • Get the code right away – Github location:
  • Deploy right away – if you don’t already have a Cloud Foundry account, sign up for it here.
  • Check out Cloud Foundry getting started here and install the vmc Ruby command line tool to push apps.
  • To install the latest alpha or beta vmc tool run: sudo gem install vmc --pre.


Front end UI:

Scalability Best Practices: Lessons from eBay

At eBay, one of the primary architectural forces we contend with every day is scalability. It colors and drives every architectural and design decision we make. With hundreds of millions of users worldwide, over two billion page views a day, and petabytes of data in our systems, this is not a choice – it is a necessity.

In a scalable architecture, resource usage should increase linearly (or better) with load, where load may be measured in user traffic, data volume, etc. Where performance is about the resource usage associated with a single unit of work, scalability is about how resource usage changes as units of work grow in number or size. Said another way, scalability is the shape of the price-performance curve, as opposed to its value at one point in that curve.

There are many facets to scalability – transactional, operational, development effort. In this article, I will outline several of the key best practices we have learned over time to scale the transactional throughput of a web-based system. Most of these best practices will be familiar to you. Some may not. All come from the collective experience of the people who develop and operate the eBay site.

Best Practice #1: Partition by Function

Whether you call it SOA, functional decomposition, or simply good engineering, related pieces of functionality belong together, while unrelated pieces of functionality belong apart. Further, the more decoupled that unrelated functionality can be, the more flexibility you will have to scale them independently of one another.

At the code level, we all do this all the time. JAR files, packages, bundles, etc., are all mechanisms we use to isolate and abstract one set of functionality from another.

At the application tier, eBay segments different functions into separate application pools. Selling functionality is served by one set of application servers, bidding by another, search by yet another. In total, we organize our roughly 16,000 application servers into 220 different pools. This allows us to scale each pool independently of one another, according to the demands and resource consumption of its function. It further allows us to isolate and rationalize resource dependencies – the selling pool only needs to talk to a relatively small subset of backend resources, for example.

At the database tier, we follow much the same approach. There is no single monolithic database at eBay. Instead there is a set of database hosts for user data, a set for item data, a set for purchase data, etc. – 1000 logical databases in all, on 400 physical hosts. Again, this approach allows us to scale the database infrastructure for each type of data independently of the others.

Best Practice #2: Split Horizontally

While functional partitioning gets us part of the way, by itself it is not sufficient for a fully scalable architecture. As decoupled as one function may be from another, the demands of a single functional area can and will outgrow any single system over time. Or, as we like to remind ourselves, “if you can’t split it, you can’t scale it.” Within a particular functional area, then, we need to be able to break the workload down into manageable units, where each individual unit retains good price-performance. Here is where the horizontal split comes in.

At the application tier, where eBay’s interactions are by design stateless, splitting horizontally is trivial. Use a standard load-balancer to route incoming traffic. Because all application servers are created equal and none retains any transactional state, any of them will do. If we need more processing power, we simply add more application servers.

The more challenging problem arises at the database tier, since data is stateful by definition. Here we split (or “shard”) the data horizontally along its primary access path. User data, for example, is currently divided over 20 hosts, with each host containing 1/20 of the users. As our numbers of users grow, and as the data we store for each user grows, we add more hosts, and subdivide the users further. Again, we use the same approach for items, for purchases, for accounts, etc. Different use cases use different schemes for partitioning the data: some are based on a simple modulo of a key (item ids ending in 1 go to one host, those ending in 2 go to the next, etc.), some on a range of ids (0-1M, 1-2M, etc.), some on a lookup table, some on a combination of these strategies. Regardless of the details of the partitioning scheme, though, the general idea is that an infrastructure which supports partitioning and repartitioning of data will be far more scalable than one which does not.

Best Practice #3: Avoid Distributed Transactions

At this point, you may well be wondering how the practices of partitioning data functionally and horizontally jibe with transactional guarantees. After all, almost any interesting operation updates more than one type of entity – users and items come to mind immediately. The orthodox answer is well-known and well-understood – create a distributed transaction across the various resources, using two-phase commit to guarantee that all updates across all resources either occur or do not. Unfortunately, this pessimistic approach comes with substantial costs. Scaling, performance, and latency are adversely affected by the costs of coordination, which worsens geometrically as you increase the number of dependent resources and incoming clients. Availability is similarly limited by the requirement that all dependent resources are available. The pragmatic answer is to relax your transactional guarantees across unrelated systems.

It turns out that you can’t have everything. In particular, guaranteeing immediate consistency across multiple systems or partitions is typically neither required nor possible. The CAP theorem, postulated almost 10 years ago by Inktomi’s Eric Brewer, states that of three highly desirable properties of distributed systems – consistency (C), availability (A), and partition-tolerance (P) – you can only choose two at any one time. For a high-traffic web site, we have to choose partition-tolerance, since it is fundamental to scaling. For a 24×7 web site, we typically choose availability. So immediate consistency has to give way.

At eBay, we allow absolutely no client-side or distributed transactions of any kind – no two-phase commit. In certain well-defined situations, we will combine multiple statements on a single database into a single transactional operation. For the most part, however, individual statements are auto-committed. While this intentional relaxation of orthodox ACID properties does not guarantee immediate consistency everywhere, the reality is that most systems are available the vast majority of the time. Of course, we do employ various techniques to help the system reach eventual consistency: careful ordering of database operations, asynchronous recovery events, and reconciliation or settlement batches. We choose the technique according to the consistency demands of the particular use case.

The key takeaway here for architects and system designers is that consistency should not be viewed as an all or nothing proposition. Most real-world use cases simply do not require immediate consistency. Just as availability is not all or nothing, and we regularly trade it off against cost and other forces, similarly our job becomes tailoring the appropriate level of consistency guarantees to the requirements of a particular operation.

Best Practice #4: Decouple Functions Asynchronously

The next key element to scaling is the aggressive use of asynchrony. If component A calls component B synchronously, A and B are tightly coupled, and that coupled system has a single scalability characteristic — to scale A, you must also scale B. Equally problematic is its effect on availability. Going back to Logic 101, if A implies B, then not-B implies not-A. In other words, if B is down then A is down. By contrast, if A and B integrate asynchronously, whether through a queue, multicast messaging, a batch process, or some other means, each can be scaled independently of the other. Moreover, A and B now have independent availability characteristics – A can continue to move forward even if B is down or distressed.

This principle can and should be applied up and down an infrastructure. Techniques like SEDA (Staged Event-Driven Architecture) can be used for asynchrony inside an individual component while retaining an easy-to-understand programming model. Between components, the principle is the same — avoid synchronous coupling as much as possible. More often than not, the two components have no business talking directly to one another in any event. At every level, decomposing the processing into stages or phases, and connecting them up asynchronously, is critical to scaling.

Best Practice #5: Move Processing To Asynchronous Flows

Now that you have decoupled asynchronously, move as much processing as possible to the asynchronous side. In a system where replying rapidly to a request is critical, this can substantially reduce the latency experienced by the requestor. In a web site or trading system, it is worth it to trade off data or execution latency (how quickly we get everything done) for user latency (how quickly the user gets a response). Activity tracking, billing, settlement, and reporting are obvious examples of processing that belongs in the background. But often significant steps in processing of the primary use case can themselves be broken out to run asynchronously. Anything that can wait should wait.

Equally as important, but less often appreciated, is the fact that asynchrony can substantially reduce infrastructure cost. Performing operations synchronously forces you to scale your infrastructure for the peak load – it needs to handle the worst second of the worst day at that exact second. Moving expensive processing to asynchronous flows, though, allows you to scale your infrastructure for the average load instead of the peak. Instead of needing to process all requests immediately, the queue spreads the processing over time, and thereby dampens the peaks. The more spiky or variable the load on your system, the greater this advantage becomes.

Best Practice #6: Virtualize At All Levels

Virtualization and abstraction are everywhere, following the old computer science aphorism that the solution to every problem is another level of indirection. The operating system abstracts the hardware. The virtual machine in many modern languages abstracts the operating system. Object-relational mapping layers abstract the database. Load-balancers and virtual IPs abstract network endpoints. As we scale our infrastructure through partitioning by function and data, an additional level of virtualization of those partitions becomes critical.

At eBay, for example, we virtualize the database. Applications interact with a logical representation of a database, which is then mapped onto a particular physical machine and instance through configuration. Applications are similarly abstracted from the split routing logic, which assigns a particular record (say, that of user XYZ) to a particular partition. Both of these abstractions are implemented in our home-grown O/R layer. This allows the operations team to rebalance logical hosts between physical hosts, by separating them, consolidating them, or moving them — all without touching application code.

We similarly virtualize the search engine. To retrieve search results, an aggregator component parallelizes queries over multiple partitions, and makes a highly partitioned search grid appear to clients as one logical index.

The motivation here is not only programmer convenience, but also operational flexibility. Hardware and software systems fail, and requests need to be re-routed. Components, machines, and partitions are added, moved, and removed. With judicious use of virtualization, higher levels of your infrastructure are blissfully unaware of these changes, and you are therefore free to make them. Virtualization makes scaling the infrastructure possible because it makes scaling manageable.

Best Practice #7: Cache Appropriately

The last component of scaling is the judicious use of caching. The specific recommendations here are less universal, because they tend to be highly dependent on the details of the use case. At the end of the day, the goal of an efficient caching system to maximize your cache hit ratio within your storage constraints, your requirements for availability, and your tolerance for staleness. It turns out that this balance can be surprisingly difficult to strike. Once struck, our experience has shown that it is also quite likely to change over time.

The most obvious opportunities for caching come with slow-changing, read-mostly data – metadata, configuration, and static data, for example. At eBay, we cache this type of data aggressively, and use a combination of pull and push approaches to keep the system reasonably in sync in the face of updates. Reducing repeated requests for the same data can and does make a substantial impact. More challenging is rapidly-changing, read-write data. For the most part, we intentionally sidestep these challenges at eBay. We have traditionally not done any caching of transient session data between requests. We similarly do not cache shared business objects, like item or user data, in the application layer. We are explicitly trading off the potential benefits of caching this data against availability and correctness. It should be noted that other sites do take different approaches, make different tradeoffs, and are also successful.

Not surprisingly, it is quite possible to have too much of a good thing. The more memory you allocate for caching, the less you have available to service individual requests. In an application layer which is often memory-constrained, this is a very real tradeoff. More importantly, though, once you have come to rely on your cache, and have taken the extremely tempting steps of downsizing the primary systems to handle just the cache misses, your infrastructure literally may not be able to survive without it. Once your primary systems can no longer directly handle your load, your site’s availability now depends on 100% uptime of the cache – a potentially dangerous situation. Even something as routine as rebalancing, moving, or cold-starting the cache becomes problematic.

Done properly, a good caching system can bend your scaling curve below linear – subsequent requests retrieve data cheaply from cache rather than the relatively more expensive primary store. On the other hand, caching done poorly introduces substantial additional overhead and availability challenges. I have yet to see a system where there are not significant opportunities for caching. The key point, though, is to make sure your caching strategy is appropriate for your situation.


Scalability is sometimes called a “non-functional requirement,” implying that it is unrelated to functionality, and strongly implying that it is less important. Nothing could be further from the truth. Rather, I would say, scalability is a prerequisite to functionality – a “priority-0” requirement, if ever there was one.

I hope that you find the descriptions of these best practices useful, and that they help you to think in a new way about your own systems, whatever their scale.


eBay Architecture

eBay Architecture

eBay Serves 5 Billion API Calls Each Month. Aren’t we seeing more and more traffic driven by mashups composed on top of open APIs? APIs are no longer a bolt on, they are your application. Architecturally that argues for implementing your own application around the same APIs developers and users employ.

Who hasn’t wondered how eBay does their business? As one of the largest most loaded websites in the world, it can’t be easy. And the subtitle of the presentation hints at how creating such a monster system requires true engineering: Striking a balance between site stability, feature velocity, performance, and cost.

You may not be able to emulate how eBay scales their system, but the issues and possible solutions are worth learning from.


Information Sources

  • The eBay Architecture – Striking a balance between site stability, feature velocity, performance, and cost.
  • Podcast: eBay’s Transactions on a Massive Scale
  • Dan Pritchett on Architecture at eBay interview by InfoQ


  • Java
  • Oracle
  • WebSphere, servlets
  • Horizontal Scaling
  • Sharding
  • Mix of Windows and Unix

    What’s Inside?

    This information was adapted from Johannes Ernst’s Blog

    The Stats

  • On an average day, it runs through 26 billion SQL queries and keeps tabs on 100 million items available for purchase.
  • 212 million registered users, 1 billion photos
  • 1 billion page views a day, 105 million listings, 2 petabytes of data, 3 billion API calls a month
  • Something like a factor of 35 in page views, e-mails sent, bandwidth from June 1999 to Q3/2006.
  • 99.94% availability, measured as “all parts of site functional to everybody” vs. at least one part of a site not functional to some users somewhere
  • The database is virtualized and spans 600 production instances residing in more than 100 server clusters.
  • 15,000 application servers, all J2EE. About 100 groups of functionality aka “apps”. Notion of a “pool”: “all the machines that deal with selling”…

    The Architecture

  • Everything is planned with the question “what if load increases by 10x”. Scaling only horizontal, not vertical: many parallel boxes.
  • Architectures is strictly divided into layers: data tier, application tier, search, operations,
  • Leverages MSXML framework for presentation layer (even in Java)
  • Oracle databases, WebSphere Java (still 1.3.1)
  • Split databases by primary access path, modulo on a key.
  • Every database has at least 3 on-line databases. Distributed over 8 data centers
  • Some database copies run 15 min behind, 4 hours behind
  • Databases are segmented by function: user, item account, feedback, transaction, over 70 in all.
  • No stored procedures are used. There are some very simple triggers.
  • Move cpu-intensive work moved out of the database layer to applications applications layer: referential integrity, joins, sorting done in the application layer! Reasoning: app servers are cheap, databases are the bottleneck.
  • No client-side transactions. no distributed transactions
  • J2EE: use servlets, JDBC, connection pools (with rewrite). Not much else.
  • No state information in application tier. Transient state maintained in cookie or scratch database.
  • App servers do not talk to each other — strict layering of architecture
  • Search, in 2002: 9 hours to update the index running on largest Sun box available — not keeping up.
  • Average item on site changes its search data 5 times before it is sold (e.g. price), so real-time search results are extremely important.
  • “Voyager”: real-time feeder infrastructure built by eBay.. Uses reliable multicast from primary database to search nodes, in-memory search index, horizontal segmentation, N slices, load-balances over M instances, cache queries.

    Lessons Learned

  • Scale Out, Not Up
    – Horizontal scaling at every tier.
    – Functional decomposition.
  • Prefer Asynchronous Integration
    – Minimize availability coupling.
    – Improve scaling options.
  • Virtualize Components
    – Reduce physical dependencies.
    – Improve deployment flexibility.
  • Design for Failure
    – Automated failure detection and notification.
    – “Limp mode” operation of business features.
  • Move work out of the database into the applications because the database is the bottleneck. Ebay does this in the extreme. We see it in other architecture using caching and the file system, but eBay even does a lot of traditional database operations in applications (like joins).
  • Use what you like and toss what you don’t need. Ebay didn’t feel compelled to use full blown J2EE stack. They liked Java and Servlets so that’s all they used. You don’t have to buy into any framework completely. Just use what works for you.
  • Don’t be afraid to build solutions that meet and evolve with your needs. Every off the shelf solution will fail you at some point. You have to go the rest of the way on your own.
  • Operational controls become a larger and larger part of scalability as you grow. How do you upgrade, configure, and monitor thousands of machines will running a live system?
  • Architectures evolve. You need to be able to change, refine, and develop your new system while keeping your existing site running. That’s the primary challenge of any growing website.
  • It’s a mistake to worry too much about scalability from the start. Don’t suffer from paralysis by analysis and worrying about traffic that may never come.
  • It’s also a mistake not to worry about scalability at all. You need to develop an organization capable of dealing with architecture evolution. Understand you are never done. Your system will always evolve and change. Build those expectations and capabilities into your business from the start. Don’t let people and organizations be why your site fails. Many people will think the system should be perfect from the start. It doesn’t work that way. A good system is developed overtime in response to real issues and concerns. Expect change and adapt to change.

Scalable NIO Servers – Performance

Scalable NIO Servers – Performance

As a continuation of evaluating an NIO server for my iPhone game, I started with looking at pure performance. First, the following links provide already existant benchmarks:

I took my own samples and quickly ran some basic tests as well to just get a really rough idea.  Note that these metrics should be taken with a large grain of salt as they do not attempt to optimize any of the test libraries and the clients are all run in traditional threads on non-server class machines alongside the actual servers.  Thus, these tests are quickly CPU bound on the client.  The idea is to just get a very rough estimate to compare alongside the above tests.

That point aside, my test basically creates a echo server and echo client.  It then starts up X simultaneous threads that push a simple ‘testing’ string back and forth to the server as fast as it can.  The end result is the time it takes to send a message and receive the echo in milliseconds.

50 Threads

Netty:  ~2.75 ms / transaction

Grizzly:  ~2.52 ms / transaction

Mina:  ~3.45 ms / transaction

xSocket:  ~4.15 ms / transaction

100 Threads

Netty:  ~5.23 ms / transaction

Grizzly:  ~5.51 ms / transaction

Mina:  ~7.19 ms / transaction

xSocket:  ~8.23 ms / transaction

Basically, Netty and Grizzly are very similar in their results.  Mina and xSocket are considerably worse.  As such, we will look into memory usage overall and over time in my next post as we continue to drill down to selecting an ideal library.


Blazing fast node.js: 10 performance tips from LinkedIn Mobile

Here are our top 10 performance takeaways for working with Node.js:

1. Avoid synchronous code

By design, Node.js is single threaded. To allow a single thread to handle many concurrent requests, you can never allow the thread to wait on a blocking, synchronous, or long running operation. A distinguishing feature of Node.js is that it was designed and implemented from top to bottom to be asynchronous. This makes it an excellent fit for evented applications.

Unfortunately, it is still possible to make synchronous/blocking calls. For example, many file system operations have both asynchronous and synchronous versions, such as writeFile and writeFileSync. Even if you avoid synchronous methods in your own code, it’s still possible to inadvertently use an external library that has a blocking call. When you do, the impact on performance is dramatic.

// Good: write files asynchronously
fs.writeFile(‘message.txt’, ‘Hello Node’, function (err) {
console.log(“It’s saved and the server remains responsive!”);
// BAD: write files synchronously
fs.writeFileSync(‘message.txt’, ‘Hello Node’);
console.log(“It’s saved, but you just blocked ALL requests!”);
view rawSynchronous.js hosted with ❤ by GitHub

Our initial logging implementation accidentally included a synchronous call to write to disc. This went unnoticed until we did performance testing. When benchmarking a single instance of Node.js on a developer box, this one synchronous call caused a performance drop from thousands of requests per second to just a few dozen!

2. Turn off socket pooling

The Node.js http client automatically uses socket pooling: by default, this limits you to 5 sockets per host. While the socket reuse may keep resource growth under control, it will be a serious bottleneck if you need to handle many concurrent requests that all need data from the same host. In these scenarios, it’s a good idea to increase maxSockets or entirely disable socket pooling:

// Disable socket pooling
var http = require(‘http’);
var options = {…..};
options.agent = false;
var req = http.request(options)

3. Don’t use Node.js for static assets

For static assets, such as CSS and images, use a standard webserver instead of Node.js. For example, LinkedIn mobile uses nginx. We also take advantage of Content Delivery Networks (CDNs), which copy the static assets to servers around the world. This has two benefits: (1) we reduce load on our Node.js servers and (2) CDNs allow static content to be delivered from a server close to the user, which reduces latency.

4. Render on the client-side

Let’s quickly compare rendering a page server-side vs. client-side. If we have Node.js render server-side, we’ll send back an HTML page like this for every request:

<!– An example of a simple webpage rendered entirely server side –>
<!DOCTYPE html>
<title>LinkedIn Mobile</title>
<div class=“header”>
<div class=“body”>
Hello John!

Note that everything on this page, except for the user’s name, is static: that is, it’s identical for every user and page reload. So a much more efficient approach is to have Node.js return just the dynamic data needed for the page as JSON:

{“name”: “John”}

The rest of the page – all the static HTML markup – can be put into a JavaScript template (such as anunderscore.js template):

<!– An example of a JavaScript template that can be rendered client side –>
<!DOCTYPE html>
<title>LinkedIn Mobile</title>
<div class=“header”>
<div class=“body”>
Hello <%= name %>!

Here’s where the performance benefit comes in: as per tip #3, the static JavaScript template can be served from your webserver (e.g. nginx) and, even better, from a CDN. Moreover, JavaScript templates can be cached in the browser or saved in LocalStorage, so after the initial page load, theonly data sent to the client is the dynamic JSON, which is maximally efficient. This approach dramatically reduces the CPU, IO, and load on Node.js.

5. Use gzip

Most servers and clients support gzip to compress requests and responses. Make sure you take advantage of it, both when responding to clients and when making requests to remote servers:

Use gzip

6. Go parallel

Try to do all your blocking operations – that is, requests to remote services, DB calls, and file system access – in parallel. This will reduce latency to the slowest of the blocking operations rather than the sum of each one in sequence. To keep the callbacks and error handling clean, we use Step for flow control.

Make blocking calls in parallel

7. Go session-free

LinkedIn mobile uses the Express framework to manage the request/response cycle. Most express examples include the following configuration:

app.use(express.session({ secret: “keyboard cat” }));

By default, session data is stored in memory, which can add significant overhead to the server, especially as the number of users grows. You could switch to an external session store, such as MongoDB or Redis, but then each request incurs the overhead of a remote call to fetch session data. Where possible, the best option is to store no state on the server-side at all. Go session free by NOT including the express config above and you’ll see better performance.

8. Use binary modules

When available, use binary modules instead of JavaScript modules. For example, when we switched from a SHA module written in JavaScript to the compiled version that comes with Node.js, we saw a big performance bump:

// Use built in or binary modules
var crypto = require(‘crypto’);
var hash = crypto.createHmac(“sha1”,key).update(signatureBase).digest(“base64”);
view rawBinaryModules.js hosted with ❤ by GitHub

9. Use standard V8 JavaScript instead of client-side libraries

Most JavaScript libraries are built for use in a web browser, where the JavaScript environment is inconsistent: for example, one browser may support functions like forEach, map and reduce, but other browsers don’t. As a result, client-side libraries usually have a lot of inefficient code to overcome browser differences. On the other hand, in Node.js, you know exactly what JavaScript functions are available: the V8 JavaScript engine that powers Node.js implements ECMAScript as specified in ECMA-262, 5th edition. By directly using the standard V8 functions instead of client libraries, you may see significant performance gains.

10. Keep your code small and light

Working with mobile, where devices are slower and latencies are higher, teaches you to keep your code small and light. Apply this same idea to your server code as well. Revisit your decisions from time to time and ask yourself questions like: “Do we really need this module?”, “Why are we using this framework? Is it worth the overhead?”, “Can we do this in a simpler way?”. Smaller, lighter code is usually more efficient and faster.