Hazelcast Overview

Hazelcast is an open source In-Memory Data Grid (IMDG). It provides elastically scalable, distributed in-memory computing, an approach widely regarded as one of the fastest and most scalable ways to improve application performance. More importantly, it makes distributed computing simple by offering distributed implementations of developer-friendly Java interfaces such as Map, Queue, ExecutorService, Lock, JCache and many more. For example, the Map interface provides an in-memory key-value store that offers many of the advantages of NoSQL in terms of developer friendliness and productivity.

In addition to distributing data In-Memory, Hazelcast provides a convenient set of APIs to access the CPUs in your cluster for maximum processing speed. Hazelcast is designed to be lightweight and easy to use. Since Hazelcast is delivered as a compact library (JAR) and has no external dependencies other than Java, it is easily pluggable into your software solution to provide distributed data structures and distributed computing utilities.

Hazelcast is highly scalable and available. Distributed applications can use it for distributed caching, synchronization, clustering, processing, pub/sub messaging, etc. Hazelcast is implemented in Java and has clients for Java, C/C++ and .NET, as well as a REST interface. It can also speak the memcache protocol, plugs into Hibernate, and can easily be used with any existing database system.

If you are looking for In-Memory speed, elastic scalability and the developer friendliness of NoSQL, Hazelcast is a great choice for you.

Hazelcast is simple

Hazelcast is written in Java with no other dependencies. It exposes the same APIs as the familiar java.util package. Just add hazelcast.jar to your classpath, and you can have your JVMs clustered in less than a minute and start building scalable applications.

Hazelcast is Peer-to-Peer

Unlike many NoSQL solutions, Hazelcast is peer-to-peer. There is no master and slave; there is no single point of failure. All nodes store an equal amount of data and do an equal amount of processing. Hazelcast can be embedded in your existing application or used in client-server mode, where your application is a client of the Hazelcast nodes.

Hazelcast is scalable

Hazelcast is designed to scale up to hundreds or thousands of nodes. Simply add new nodes; they automatically discover the cluster and linearly increase both memory and processing capacity. The nodes maintain TCP connections with each other, and all communication is performed through this layer.

Hazelcast is fast

Hazelcast stores everything in-memory. It is designed to perform very fast reads and updates.

Hazelcast is redundant

Hazelcast keeps backups of each data entry on multiple nodes. When a node fails, its data is restored from the backups and the cluster continues to operate without downtime.

Sharding in Hazelcast

Hazelcast shards are called partitions. By default, Hazelcast has 271 partitions. Given a key, we serialize it, hash it and take the result modulo the number of partitions to find the partition it belongs to. The partitions themselves are distributed equally among the members of the cluster. Hazelcast also creates backups of the partitions and distributes them among the nodes for redundancy.
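
The lookup described above can be sketched in a few lines of Java. This is an illustration only, not Hazelcast's internal code: the class and method names are hypothetical, UTF-8 encoding stands in for Hazelcast serialization, and Arrays.hashCode stands in for whatever hash function the library actually uses.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical sketch of "serialize, hash, mod by the partition count".
class PartitionSketch {
    static final int DEFAULT_PARTITION_COUNT = 271; // default quoted in the text

    // Returns a partition ID in the range [0, partitionCount).
    static int partitionFor(String key, int partitionCount) {
        byte[] serialized = key.getBytes(StandardCharsets.UTF_8); // stand-in for serialization
        int hash = Arrays.hashCode(serialized);                   // stand-in hash function
        return Math.floorMod(hash, partitionCount);               // floorMod never returns a negative value
    }

    public static void main(String[] args) {
        for (String key : new String[] {"a", "b", "user:42"}) {
            System.out.println(key + " -> partition "
                    + partitionFor(key, DEFAULT_PARTITION_COUNT));
        }
    }
}
```

Because the result depends only on the key and the partition count, every member computes the same partition for the same key, no matter how many members the cluster has.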

Partitions in a 1 node Hazelcast cluster.

Partitions in a 2 node cluster.

Primary partitions are shown in black and backups in red. In the above illustration, the first node has 135 primary partitions (black), and each of these partitions is backed up on the second node (red). At the same time, the first node holds the backups of the second node’s primary partitions.

As you add more nodes, Hazelcast moves some of the primary and backup partitions, one by one, to the new nodes so that all nodes are equally loaded and redundant. Only the minimum number of partitions is moved to scale out Hazelcast.

Hazelcast Topology

If you have an application whose main focal point is asynchronous or high-performance computing with lots of task executions, then embedded deployment is the most useful. In this type, each node includes both the application and the data; see the below illustration.

In the client/server type, you have a cluster of server nodes that can be created and scaled independently. Your clients communicate with these server nodes to reach the data on them. Hazelcast provides native clients (Java, .NET and C++), Memcache clients and REST clients. See the below illustration.


Hazelcast MultiMap is a specialized map where you can store multiple values under a single key. Just like any other distributed data structure implementation in Hazelcast, MultiMap is distributed and thread-safe.

Hazelcast MultiMap is not an implementation of java.util.Map due to the difference in method signatures. It supports most features of Hazelcast Map except for indexing, predicates and MapLoader/MapStore. Yet, like Hazelcast Map, entries are almost evenly distributed onto all cluster members. When a new member joins the cluster, the same ownership logic used in the distributed map applies.

Sample MultiMap Code

Let’s write code that puts data into a MultiMap.

// Package name as in Hazelcast 3.x.
import com.hazelcast.core.*;

public class PutMember {
  public static void main( String[] args ) {
    // Start (or join) a cluster member and obtain the distributed MultiMap named "map".
    HazelcastInstance hazelcastInstance = Hazelcast.newHazelcastInstance();
    MultiMap<String, String> map = hazelcastInstance.getMultiMap( "map" );

    map.put( "a", "1" );
    map.put( "a", "2" );
    map.put( "b", "3" );
    System.out.println( "PutMember:Done" );
  }
}

Now let’s print the entries in this MultiMap.

import java.util.Collection;
import com.hazelcast.core.*;

public class PrintMember {
  public static void main( String[] args ) {
    HazelcastInstance hazelcastInstance = Hazelcast.newHazelcastInstance();
    MultiMap<String, String> map = hazelcastInstance.getMultiMap( "map" );
    for ( String key : map.keySet() ) {
      Collection<String> values = map.get( key );
      // printf (not println) is the method that accepts a format string.
      System.out.printf( "%s -> %s\n", key, values );
    }
  }
}

After you run the first code sample, run the PrintMember sample. You will see the key a has two values, as shown below.

b -> [3]

a -> [2, 1]

Configuring MultiMap

When using MultiMap, the collection type of the values can be either Set or List. You configure the collection type with the valueCollectionType parameter. If you choose Set, duplicate and null values are not allowed in your collection and ordering is irrelevant. If you choose List, ordering is relevant and your collection can include duplicate and null values.
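
The difference between the two collection types can be illustrated with a small local stand-in. The sketch below is not the Hazelcast MultiMap class and does not read the valueCollectionType setting; the LocalMultiMap name and its methods are hypothetical, and it only demonstrates the duplicate-handling part of the behaviour (java.util.HashSet, unlike the Set mode described above, accepts null).

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical local stand-in for a multimap whose value collection type is pluggable.
class LocalMultiMap {
    private final Map<String, Collection<String>> entries = new HashMap<>();
    private final Supplier<Collection<String>> valueCollectionFactory;

    LocalMultiMap(Supplier<Collection<String>> valueCollectionFactory) {
        this.valueCollectionFactory = valueCollectionFactory;
    }

    // Returns true if the value collection changed, mirroring Collection.add.
    boolean put(String key, String value) {
        return entries.computeIfAbsent(key, k -> valueCollectionFactory.get()).add(value);
    }

    int valueCount(String key) {
        Collection<String> values = entries.get(key);
        return values == null ? 0 : values.size();
    }

    public static void main(String[] args) {
        LocalMultiMap asSet = new LocalMultiMap(HashSet::new);
        LocalMultiMap asList = new LocalMultiMap(ArrayList::new);
        for (LocalMultiMap m : new LocalMultiMap[] {asSet, asList}) {
            m.put("a", "1");
            m.put("a", "1"); // duplicate value under the same key
        }
        System.out.println("Set-backed keeps " + asSet.valueCount("a") + " value(s)");
        System.out.println("List-backed keeps " + asList.valueCount("a") + " value(s)");
    }
}
```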

You can also enable statistics for your MultiMap with the statisticsEnabled parameter. When it is enabled, you can retrieve the statistics with the getLocalMultiMapStats() method.


Please refer to the MultiMap Configuration section for a full description of Hazelcast Distributed MultiMap configuration.


Scaling Real-time Apps on Cloud Foundry Using Node.js and Redis

Common applications being built on Node.js like social networking and chat require real-time scaling capabilities across multiple instances. Developers need to deal with sticky sessions, scale-up, scale-down, instance crash/restart, and more. Cloud Foundry PaaS provides a ready platform to achieve this quickly.

The following blog post will walk you through deploying and managing real-time applications on Cloud Foundry using common Node.js examples and Redis key-value store capabilities.

Chat App

The main objective here is to build a simple chat app while tackling the scale requirements. Specifically, we will be building a simple Express, Socket.io and Redis-based Chat app that meets the following objectives:

  1. Chat server should run with multiple instances.
  2. The user login should be saved in a session.
    • User should be logged back in upon browser refresh
    • Socket.io should get user information from the session before sending chat messages.
    • Socket.io should only connect if user is already logged in.
  3. If the server instance that the user is connected to gets restarted, goes down or is scaled down while the user is chatting, the user should be reconnected to an available instance and recover the session.

Chat app’s Login page:

Chat app’s Chat page:

We will also cover:

  1. How to use Socket.io and Sticky Sessions
  2. How to use Redis as a session store
  3. How to use Redis as a pubsub service
  4. How to use session.socket.io to get session info (like user info) from Express sessions
  5. How to configure Socket.io client and server to properly reconnect after one or more server instances go down (i.e. have been restarted, scaled down or crashed)

Socket.io & Sticky Sessions

Socket.io is one of the earliest and most popular Node.js modules to help build real-time apps like chat, social networking etc. (Note: SockJS is another popular library similar to Socket.io).

When you run such a server in a cloud that has a load-balancer, reverse proxy, routers etc., it has to be configured to work properly, especially when you scale the server to use multiple instances.

One of the constraints Socket.io, SockJS and similar libraries have is that they need to continuously talk to the same instance of the server. They work perfectly well when there is only 1 instance of the server.

When you scale your app in a cloud environment, the load balancer (Nginx in the case of Cloud Foundry) will take over, and the requests will be sent to different instances causing Socket.io to break.

For such situations, load balancers have a feature called ‘sticky sessions’, also known as ‘session affinity’. The idea is that if this property is set, all the requests following the first load-balanced request will go to the same server instance.

In Cloud Foundry, cookie-based sticky sessions are enabled for apps that set the cookie jsessionid. Note that jsessionid is the cookie name commonly used to track sessions in Java/Spring applications. Cloud Foundry is simply adopting it as the sticky session cookie for all frameworks.

To make socket.io work, the apps just need to set a cookie with the name jsessionid.

/*
 * Use cookieParser and session middleware together.
 * By default, an Express/Connect app creates a cookie named 'connect.sid'. To scale a Socket.io app,
 * use the cookie name 'jsessionid' (instead of connect.sid) so that Cloud Foundry's 'Sticky Session' feature applies.
 * Without this, Socket.io won't work when you have more than 1 instance.
 * If you are NOT running on Cloud Foundry, having cookie name 'jsessionid' doesn't hurt - it's just a cookie name.
 */
app.use(express.session({store:sessionStore, key:'jsessionid', secret:'your secret here'}));

In the above diagram, when you open the app,

  1. Express sets a session cookie with name jsessionid.
  2. When socket.io connects, it uses that same cookie and hits the load balancer
  3. The load balancer always routes it to the same server that the cookie was set in.
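
The routing rule in these steps can be sketched as follows. This is a toy model written in Java, not Cloud Foundry's router: the StickyRouter class, its round-robin fallback and the cookie values are all assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of cookie-based session affinity ("sticky sessions").
class StickyRouter {
    private final int instanceCount;
    private final Map<String, Integer> affinity = new HashMap<>(); // cookie value -> instance index
    private int nextInstance = 0;

    StickyRouter(int instanceCount) {
        this.instanceCount = instanceCount;
    }

    // Requests without a session cookie are spread round-robin; requests that carry
    // a known cookie go back to the instance that served the first request.
    int route(String sessionCookie) {
        if (sessionCookie == null) {
            return pickRoundRobin();
        }
        return affinity.computeIfAbsent(sessionCookie, c -> pickRoundRobin());
    }

    private int pickRoundRobin() {
        int chosen = nextInstance;
        nextInstance = (nextInstance + 1) % instanceCount;
        return chosen;
    }

    public static void main(String[] args) {
        StickyRouter router = new StickyRouter(4);
        System.out.println("alice -> instance " + router.route("session-alice"));
        System.out.println("bob   -> instance " + router.route("session-bob"));
        System.out.println("alice -> instance " + router.route("session-alice"));
    }
}
```

The affinity table in this sketch lives in the router's memory, so it is lost if the router forgets it or the instance behind it is replaced. That is the situation the reconnection handling later in this post has to deal with.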

Sending session info to Socket.io

Let’s imagine that the user is logging in via Twitter or Facebook, or a regular login screen. We are storing this information in a session after the user has logged in.

app.post('/login', function (req, res) {
    //store user info in session after login.
    req.session.user = req.body.user;
    //... then send the response (not shown here)
});

Once the user has logged in, we connect via socket.io to allow chatting. However, socket.io doesn’t know who the user is, or whether he or she is actually logged in, before sending chat messages to others.

That’s where the session.socket.io library comes in. It’s a very simple library that wraps socket.io. All it does is grab the session information during the handshake and pass it to socket.io’s connection function.

//With just Socket.io..
io.sockets.on('connection', function (socket) {
    //do pubsub here
});

//But with session.socket.io, you'll get session info

/*
 Use SessionSockets so that we can exchange (set/get) user data b/w sockets and http sessions
 Pass 'jsessionid' (custom) cookie name that we are using to make use of Sticky sessions.
 */
var SessionSockets = require('session.socket.io');
var sessionSockets = new SessionSockets(io, sessionStore, cookieParser, 'jsessionid');

sessionSockets.on('connection', function (err, socket, session) {

    //get info from session (the session may be missing if the handshake failed)
    var user = session && session.user;

    //Close socket if user is not logged in
    if (!user) {
        socket.disconnect();
        return;
    }

    //do pubsub
    socket.emit('chat', {user: user, msg: 'logged in'});
});

Redis as a session store

So far so good, but Express stores these sessions in MemoryStore (by default). MemoryStore is simply a JavaScript object: it stays in memory only as long as the server is up. If the server goes down, all the session information of all users will be lost!

We need a place to store this outside of our server, but it should also be very fast to retrieve. That’s where Redis as a session store comes in.

Let’s configure our app to use Redis as a session store as below.

/*
 Use Redis for Session Store. Redis will keep all Express sessions in it.
 */
var redis = require('redis');
var RedisStore = require('connect-redis')(express);
var rClient = redis.createClient();
var sessionStore = new RedisStore({client:rClient});

//And pass sessionStore to Express's 'session' middleware's 'store' value.
app.use(express.session({store: sessionStore, key: 'jsessionid', secret: 'your secret here'}));

With the above configuration, sessions will now be stored in Redis. Also, if one of the server instances goes down, the session will still be available for other instances to pick up.

Socket.io as pub-sub server

So far our sessions are taken care of with the above setup, but if we are using socket.io’s default pub-sub mechanism, it will work only for 1 server instance. That is, if user1 and user2 are both on server instance #1, they can chat with each other. If they are on different server instances, they cannot.

sessionSockets.on('connection', function (err, socket, session) {
    socket.on('chat', function (data) {
        socket.emit('chat', data); //send back to browser
        socket.broadcast.emit('chat', data); // send to others
    });

    socket.on('join', function (data) {
        socket.emit('chat', {msg: 'user joined'});
        socket.broadcast.emit('chat', {msg: 'user joined'});
    });
});

Redis as a PubSub service

In order to send chat messages to users across servers we will update our server to use Redis as a PubSub service (along with session store). Redis natively supports pub-sub operations. All we need to do is to create a publisher, a subscriber, and a channel.

//We will use Redis to do pub-sub

/*
 Create two Redis connections. A 'pub' for publishing and a 'sub' for subscribing.
 Subscribe 'sub' connection to 'chat' channel.
 */
var sub = redis.createClient();
var pub = redis.createClient();
sub.subscribe('chat');

sessionSockets.on('connection', function (err, socket, session) {
    socket.on('chat', function (data) {
        pub.publish('chat', data);
    });

    socket.on('join', function (data) {
        //Redis messages are strings, so serialize the object before publishing
        pub.publish('chat', JSON.stringify({msg: 'user joined'}));
    });

    /*
     Use Redis' 'sub' (subscriber) client to listen to any message from Redis to server.
     When a message arrives, send it back to browser using socket.io
     */
    sub.on('message', function (channel, message) {
        socket.emit(channel, message);
    });
});
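
To see why a shared broker solves the cross-instance problem, it may help to look at the fan-out in isolation. The following is a minimal in-process sketch in Java, not a Redis client: the PubSubSketch class is hypothetical, and the two lists merely stand in for two server instances that have each subscribed to the 'chat' channel.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical in-process broker that mimics publish/subscribe fan-out.
class PubSubSketch {
    private final Map<String, List<Consumer<String>>> subscribers = new HashMap<>();

    void subscribe(String channel, Consumer<String> handler) {
        subscribers.computeIfAbsent(channel, c -> new ArrayList<>()).add(handler);
    }

    // Delivers the message to every subscriber of the channel
    // and returns how many subscribers received it.
    int publish(String channel, String message) {
        List<Consumer<String>> handlers =
                subscribers.getOrDefault(channel, Collections.emptyList());
        handlers.forEach(handler -> handler.accept(message));
        return handlers.size();
    }

    public static void main(String[] args) {
        PubSubSketch broker = new PubSubSketch();
        List<String> instance1Inbox = new ArrayList<>();
        List<String> instance2Inbox = new ArrayList<>();
        broker.subscribe("chat", instance1Inbox::add);
        broker.subscribe("chat", instance2Inbox::add);

        // A user connected to instance 1 sends a message; both instances receive it.
        broker.publish("chat", "hello from a user on instance 1");
        System.out.println("instance 1 received: " + instance1Inbox);
        System.out.println("instance 2 received: " + instance2Inbox);
    }
}
```

Each server instance then forwards what it receives to its own connected browsers, which is the role socket.emit plays in the code above.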

The app architecture will now look like this:

Handling server scale-down / crashes / restarts

The app will work fine as long as all the server instances are running. What happens if the server is restarted or scaled down, or one of the instances crashes? How do we handle that?

Let’s first understand what happens in that situation.

The code below simply connects a browser to server and listens to various socket.io events.

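/*
 Connect to socket.io on the server (***BEFORE FIX***).
 */
var host = window.location.host.split(':')[0];
var socket = io.connect('http://' + host);

//Log every connection-related event so we can see what socket.io is doing
socket.on('connect', function () { console.log('connected'); });
socket.on('connecting', function () { console.log('connecting'); });
socket.on('disconnect', function () { console.log('disconnect'); });
socket.on('connect_failed', function () { console.log('connect_failed'); });
socket.on('error', function (err) { console.log('error: ' + err); });
socket.on('reconnect_failed', function () { console.log('reconnect_failed'); });
socket.on('reconnect', function () { console.log('reconnected '); });
socket.on('reconnecting', function () { console.log('reconnecting'); });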

While the user is chatting, if we restart the app on localhost or on a single host, socket.io attempts to reconnect multiple times (based on configuration) to see if it can connect. If the server comes up within that time, it will reconnect. So we see the below logs:

If we restart the server (say using vmc restart redispubsub) and the user is chatting on the same app that’s running on Cloud Foundry AND with multiple instances, we will see the following log:

You can see that in the above logs, after the server comes back up, socket.io client (running in the browser) isn’t able to connect to socket.io server (running on Node.js in the server).

This is because, once the server is restarted on Cloud Foundry, instances are brought up as if they were brand-new server instances with different IP addresses and different ports, so jsessionid is no longer valid. That in turn causes the load balancer to load-balance socket.io’s reconnection requests (i.e. they are sent to different server instances), so the socket.io server cannot complete the handshake properly and throws client not handshaken errors!

OK, let’s fix that reconnection issue

First, we will disable socket.io’s default “reconnect” feature, and then implement our own reconnection feature.

In our custom reconnection function, when the server goes down, we’ll make a dummy HTTP-get call to index.html every 4-5 seconds. If the call succeeds, we know that the (Express) server has already set jsessionid in the response. So, then we’ll call socket.io’s reconnect function. This time, because jsessionid is set, socket.io’s handshake will succeed and the user can continue to chat without interruption.

/*
 Connect to socket.io on the server (*** FIX ***).
 */
var host = window.location.host.split(':')[0];

//Disable Socket.io's default "reconnect" feature
var socket = io.connect('http://' + host, {reconnect: false, 'try multiple transports': false});
var intervalID;
var reconnectCount = 0;
socket.on('disconnect', function () {
    console.log('disconnect');

    //Retry reconnecting every 4 seconds
    intervalID = setInterval(tryReconnect, 4000);
});

/*
 Implement our own reconnection feature.
 When the server goes down we make a dummy HTTP-get call to index.html every 4-5 seconds.
 If the call succeeds, we know that (Express) server sets ***jsessionid*** , so only then we try socket.io reconnect.
 */
var tryReconnect = function () {
    ++reconnectCount;
    if (reconnectCount == 5) {
        //Stop retrying after 5 attempts
        clearInterval(intervalID);
    }
    console.log('Making a dummy http call to set jsessionid (before we do socket.io reconnect)');
    //Assumes jQuery is loaded on the page and that '/' serves index.html
    $.ajax('/')
        .success(function () {
            console.log("http request succeeded");
            //reconnect the socket AFTER we got jsessionid set (socket.io 0.9 client API)
            socket.socket.reconnect();
            clearInterval(intervalID);
        }).error(function (err) {
            console.log("http request failed (probably server not up yet)");
        });
};

In addition, since the jsessionid is invalidated by the load balancer, we can’t create a session with the same jsessionid or else the sticky session will be ignored by the load balancer. So on the server, when the dummy HTTP request comes in, we will regenerate the session to remove the old session and sessionid and ensure everything is afresh before we serve the response.

//Instead of..
exports.index = function (req, res) {
    res.render('index', { title: 'RedisPubSubApp', user: req.session.user});
};

//Use this..
exports.index = function (req, res) {
    //Save user from previous session (if it exists)
    var user = req.session.user;

    //Regenerate new session & store user from previous session (if it exists)
    req.session.regenerate(function (err) {
        req.session.user = user;
        res.render('index', { title: 'RedisPubSubApp', user: req.session.user});
    });
};

Running / Testing it on Cloud Foundry

  • Clone the app to redispubsub folder
  • cd redispubsub
  • npm install and follow the below instructions to push the app to Cloud Foundry
> vmc push redispubsub
Instances> 4       <----- Run 4 instances of the server

1: node
2: other
Framework> node

1: node
2: node06
3: node08
4: other
Runtime> 3  <---- Choose Node.js 0.8v

1: 64M
2: 128M
3: 256M
4: 512M
Memory Limit> 64M

Creating redispubsub... OK

1: redispubsub.cloudfoundry.com
2: none
URL> redispubsub.cloudfoundry.com  <--- URL of the app (choose something unique)

Updating redispubsub... OK

Create services for application?> y

1: blob 0.51
2: mongodb 2.0
3: mysql 5.1
4: postgresql 9.0
5: rabbitmq 2.4
6: redis 2.6
7: redis 2.4
8: redis 2.2
What kind?> 6 <----- Select & Add Redis v2.6 service

Name?> redis-e9771 <-- This is just a random name for Redis service

Creating service redis-e9771... OK
Binding redis-e9771 to redispubsub... OK
Create another service?> n

Bind other services to application?> n

Save configuration?> n

Uploading redispubsub... OK
Starting redispubsub... OK
Checking redispubsub... OK
  • Once the server is up, open up multiple browsers and go to <appname>.cloudfoundry.com
  • Start chatting

Test 1

  • Refresh the browser
  • You should automatically be logged in

Test 2

  • Open up JS debugger (in Chrome, do cmd + alt +j)
  • Restart the server by running vmc restart <appname>
  • Once the server restarts, Socket.io should automatically reconnect
  • You should be able to chat after the reconnection

That’s it for this time. Stay tuned for my next blog where I will discuss how RabbitMQ can be leveraged in scalable apps built on Cloud Foundry. The content of this blog has also been covered in a video. Feel free to get in touch with us for questions on the material.

General Notes

  • Get the code right away – Github location: https://github.com/rajaraodv/redispubsub.
  • Deploy right away – if you don’t already have a Cloud Foundry account, sign up for it here.
  • Check out Cloud Foundry getting started here and install the vmc Ruby command line tool to push apps.
  • To install the latest alpha or beta vmc tool run: sudo gem install vmc --pre.



Front end UI: https://github.com/steffenwt/nodejs-pub-sub-chat-demo

Database Index

What is it?

A database index is very similar to the index at the end of a book. A DB table index is essentially a copy of certain column(s) of the table, together with pointers to the location of the corresponding records, stored in a specific way for easy retrieval.

So, why do we need an index?

For just the same reason we would use an index in a book! Yes! It is for faster retrieval of data from the table. It pays off mainly for tables that hold millions of records, where doing a full-table scan for every lookup would be overkill!

The index however comes with its own cost – of course, the extra memory space. But it’s not just that… We’ll discover more on the way!

How is the index stored and how does it work?

This, by itself, is a research topic. But as we saw, the primary purpose of an index is to make retrieval faster, which means the index should be stored in such a way that looking up an element takes sub-linear or (if possible) constant time. Otherwise, the whole point of keeping an index would be lost.

The most commonly used structure is the B-tree, which has O(log n) lookup time and stores the elements in sorted order. We can also use a hash-based index, which has constant-time lookup, O(1), but no ordering!

If you are wondering “So what? Why do I need to bother about ordering now?”, then think of a scenario where you want to select all students who were born before 1990, or all students who scored > 60. Here ordering does matter, and indexing the right column with a B-tree helps more, because a hash map has no way of telling you anything about order!
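
The difference shows up clearly in a small Java sketch. It is only an analogy: java.util.TreeMap (a sorted red-black tree) stands in for a B-tree index and java.util.HashMap for a hash index, and the IndexSketch class, the student names and the birth years are made up for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: a sorted index answers range queries directly, a hash index cannot.
class IndexSketch {
    // Range query on the sorted index: headMap visits only the keys below the bound, in order.
    static List<String> bornBefore(TreeMap<Integer, String> sortedIndex, int year) {
        return new ArrayList<>(sortedIndex.headMap(year).values());
    }

    // A hash index answers "key == x" directly, but a range query has to examine every entry.
    static List<String> bornBeforeByScan(Map<Integer, String> hashIndex, int year) {
        List<String> names = new ArrayList<>();
        for (Map.Entry<Integer, String> entry : hashIndex.entrySet()) {
            if (entry.getKey() < year) {
                names.add(entry.getValue());
            }
        }
        return names;
    }

    public static void main(String[] args) {
        TreeMap<Integer, String> sortedIndex = new TreeMap<>();
        sortedIndex.put(1992, "Chitra");
        sortedIndex.put(1985, "Arun");
        sortedIndex.put(1988, "Bela");
        Map<Integer, String> hashIndex = new HashMap<>(sortedIndex);

        System.out.println("sorted index: " + bornBefore(sortedIndex, 1990));
        System.out.println("hash index (full scan): " + bornBeforeByScan(hashIndex, 1990));
    }
}
```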

An appropriate data structure should be chosen based on the requirements of the data. There is also a data structure called the R-tree for requirements related to spatial ordering, such as finding the petrol stations within a 2 km radius of your current location! (Wikipedia lists the other types of indexes available.)

And… what are the costs involved?

As mentioned earlier, space is definitely a big cost, and it grows with the table: billions of records mean a correspondingly large index. The index must also be updated whenever the table is updated. With a B-tree index, every insert / delete / update has to place the key in its sorted position and may trigger rebalancing (node splits or merges), which makes writes slower.

Is it worth it?

An index may not be worth it for tables that hardly get any lookup queries! You need an index only when search operations dominate. Otherwise, the index is a mere waste of space and processing time.

And, furthermore, the indexing column should be chosen wisely! Most DBMSs index the primary key by default, because an indexed column should be as unique as possible. Otherwise a lookup degenerates back towards a linear search and does little to improve performance. A column with a cardinality of 2 merely divides the data into 2 sets, so a lookup still returns about half the table, whereas a cardinality of 1000 divides it into 1000 sets and a lookup returns roughly one thousandth of the rows.
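
A little arithmetic makes the effect of cardinality concrete. The Java sketch below assumes the values are evenly distributed across the rows, which real data rarely is; the SelectivitySketch class and the one-million-row table are hypothetical.

```java
// Hypothetical sketch: expected rows returned per index lookup,
// assuming every distinct value occurs equally often.
class SelectivitySketch {
    static long expectedRowsPerLookup(long totalRows, long distinctValues) {
        return totalRows / distinctValues;
    }

    public static void main(String[] args) {
        long totalRows = 1_000_000L;
        for (long cardinality : new long[] {2L, 1_000L, 1_000_000L}) {
            System.out.println("cardinality " + cardinality + " -> about "
                    + expectedRowsPerLookup(totalRows, cardinality) + " rows per lookup");
        }
    }
}
```

With a cardinality of 2, the index still hands back half a million rows per lookup, which is little better than scanning the table. A unique column narrows the same lookup to a single row.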

An important point to note here is that your index is not always guaranteed to be used! In general, you cannot rely on the DBMS using it in any given query (some systems offer index hints, but the decision normally rests with the optimizer). The query optimizer decides for every query, based on how much performance improvement is possible with the given index. As a rough rule of thumb, the optimizer tends to skip the index when the column's cardinality is low relative to the row count (the figure quoted here is below 30% of the number of records), which effectively makes the index a waste of space.

Reference: http://kindleyourbrain.wordpress.com/2014/08/17/database-index/

Running mongo as service

Make sure that you have added your MongoDB /bin directory to your system PATH variable.

First I executed this command:

D:\mongodb\bin>mongod --remove

Then executed this one:

D:\mongodb\bin>mongod --dbpath=D:\mongodb --logpath=D:\mongodb\log.txt --install

After that, open the Windows Services console from the same command prompt, look for the MongoDB service and click Start.

What is a database shard?

A database shard is a horizontal partition of data in a database or search engine. Each individual partition is referred to as a shard or database shard.

Database architecture

Horizontal partitioning is a database design principle whereby rows of a database table are held separately, rather than being split into columns (which is what normalization and vertical partitioning do, to differing extents). Each partition forms part of a shard, which may in turn be located on a separate database server or physical location.

There are numerous advantages to this partitioning approach. Since the tables are divided and distributed across multiple servers, the total number of rows in each table in each database is reduced. This reduces index size, which generally improves search performance. A database shard can be placed on separate hardware, and multiple shards can be placed on multiple machines. This distributes the database over a large number of machines, so the load is spread across them, which can greatly improve performance. In addition, if the database shard is based on some real-world segmentation of the data (e.g., European customers v. American customers) then it may be possible to infer the appropriate shard membership easily and automatically, and query only the relevant shard.[1]

In practice, sharding is far more complex. Although it has been done for a long time by hand-coding (especially where rows have an obvious grouping, as per the example above), this is often inflexible. There is a desire to support sharding automatically, both in terms of adding code support for it, and for identifying candidates to be sharded separately. Consistent hashing is one form of automatic sharding to spread large loads across multiple smaller services and servers.[2][3]
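
As an illustration of the consistent hashing mentioned above, here is a minimal ring in Java. It is a sketch under simplifying assumptions: CRC32 is used as the hash purely for convenience, each shard occupies a single point on the ring (real systems usually add many virtual nodes per shard to even out the load), and the ConsistentHashRing class and shard names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.TreeMap;
import java.util.zip.CRC32;

// Hypothetical sketch of a consistent-hash ring with one point per shard.
class ConsistentHashRing {
    private final TreeMap<Long, String> ring = new TreeMap<>();

    static long hash(String value) {
        CRC32 crc = new CRC32();
        crc.update(value.getBytes(StandardCharsets.UTF_8));
        return crc.getValue();
    }

    void addShard(String shardName) {
        ring.put(hash(shardName), shardName);
    }

    void removeShard(String shardName) {
        ring.remove(hash(shardName));
    }

    // A key belongs to the first shard at or after its hash,
    // wrapping around to the start of the ring if necessary.
    String shardFor(String key) {
        if (ring.isEmpty()) {
            throw new IllegalStateException("no shards on the ring");
        }
        Map.Entry<Long, String> entry = ring.ceilingEntry(hash(key));
        return (entry != null ? entry : ring.firstEntry()).getValue();
    }

    public static void main(String[] args) {
        ConsistentHashRing ring = new ConsistentHashRing();
        ring.addShard("shard-a");
        ring.addShard("shard-b");
        ring.addShard("shard-c");
        for (String key : new String[] {"customer:1", "customer:2", "customer:3"}) {
            System.out.println(key + " -> " + ring.shardFor(key));
        }
    }
}
```

The useful property is that adding a shard only takes keys away from existing shards and gives them to the new one. No key moves between two shards that were already on the ring.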

Where distributed computing is used to separate load between multiple servers (either for performance or reliability reasons), a shard approach may also be useful.


What is Memcached?

Memcached is a general-purpose distributed memory caching system. It is often used to speed up dynamic database-driven websites by caching data and objects in RAM to reduce the number of times an external data source (such as a database or API) must be read.

Memcached’s APIs provide a giant hash table distributed across multiple machines. When the table is full, subsequent inserts cause older data to be purged in least recently used (LRU) order. Applications using Memcached typically layer requests and additions into RAM before falling back on a slower backing store, such as a database.
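
The least-recently-used purging described above can be illustrated with the Java standard library. This is only a sketch of the eviction policy, not of how Memcached implements it internally; the LruCacheSketch class and its capacity of two entries are assumptions chosen to keep the example small.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of least-recently-used (LRU) eviction.
class LruCacheSketch<K, V> extends LinkedHashMap<K, V> {
    private final int capacity;

    LruCacheSketch(int capacity) {
        // accessOrder = true: iteration order runs from least to most recently used.
        super(16, 0.75f, true);
        this.capacity = capacity;
    }

    // Called by LinkedHashMap after each insert; returning true drops the eldest entry.
    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        return size() > capacity;
    }

    public static void main(String[] args) {
        LruCacheSketch<String, String> cache = new LruCacheSketch<>(2);
        cache.put("a", "1");
        cache.put("b", "2");
        cache.get("a");      // touching "a" makes "b" the least recently used entry
        cache.put("c", "3"); // the cache is full, so "b" is purged
        System.out.println("keys left in cache: " + cache.keySet());
    }
}
```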

The system uses a client–server architecture. The servers maintain a key–value associative array; the clients populate this array and query it. Keys are up to 250 bytes long and values can be at most 1 MB in size.

Clients use client-side libraries to contact the servers which, by default, expose their service at port 11211. Each client knows all servers; the servers do not communicate with each other. If a client wishes to set or read the value corresponding to a certain key, the client’s library first computes a hash of the key to determine the server to use. Then it contacts that server. The server will compute a second hash of the key to determine where to store or read the corresponding value.

The servers keep the values in RAM; if a server runs out of RAM, it discards the oldest values. Therefore, clients must treat Memcached as a transitory cache; they cannot assume that data stored in Memcached is still there when they need it. MemcacheDB, Couchbase Server, Tarantool and other database servers provide persistent storage while maintaining Memcached protocol compatibility.

If all client libraries use the same hashing algorithm to determine servers, then clients can read each other’s cached data.
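
The client-side server selection can be sketched as below. The hashing scheme here (CRC32 modulo the number of servers) is an assumption for illustration; actual client libraries differ in the algorithm they use, and the ServerSelectorSketch class and the host names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.zip.CRC32;

// Hypothetical sketch: every client that uses this function with the same
// server list picks the same server for a given key.
class ServerSelectorSketch {
    static String serverFor(String key, List<String> servers) {
        CRC32 crc = new CRC32();
        crc.update(key.getBytes(StandardCharsets.UTF_8));
        // CRC32 values are non-negative, so the remainder is a valid list index.
        return servers.get((int) (crc.getValue() % servers.size()));
    }

    public static void main(String[] args) {
        List<String> servers = Arrays.asList(
                "cache-1.example:11211", "cache-2.example:11211", "cache-3.example:11211");
        for (String key : new String[] {"userrow:1", "userrow:2", "userrow:3"}) {
            System.out.println(key + " -> " + serverFor(key, servers));
        }
    }
}
```

Two clients that disagree on the hash function or on the order of the server list would look for the same key on different servers, which is why the shared algorithm matters.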

Converting database or object creation queries to use Memcached is simple. Typically, when using straight database queries, example code would be as follows:

 function get_foo(int userid) {
    data = db_select("SELECT * FROM users WHERE userid = ?", userid);
    return data;
 }

After conversion to Memcached, the same call might look like the following:

 function get_foo(int userid) {
    /* first try the cache */
    data = memcached_fetch("userrow:" + userid);
    if (!data) {
       /* not found : request database */
       data = db_select("SELECT * FROM users WHERE userid = ?", userid);
       /* then store in cache until next get */
       memcached_add("userrow:" + userid, data);
    }
    return data;
 }

The client would first check whether a Memcached value with the unique key “userrow:userid” exists, where userid is some number. If the result does not exist, it would select from the database as usual, and set the unique key using the Memcached API add function call.
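
The same read path can be written as runnable Java. In this sketch a HashMap stands in for Memcached and a Function stands in for the database query; the CacheAsideSketch class, the getUser method and the databaseReads counter are all hypothetical and exist only to show when the database is actually hit.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch of the cache-aside read path from the pseudocode above.
class CacheAsideSketch {
    private final Map<String, String> cache = new HashMap<>(); // stand-in for Memcached
    private final Function<Integer, String> database;          // stand-in for db_select
    int databaseReads = 0; // exposed only so the example can show cache hits

    CacheAsideSketch(Function<Integer, String> database) {
        this.database = database;
    }

    String getUser(int userId) {
        String cacheKey = "userrow:" + userId;
        String row = cache.get(cacheKey);     // first try the cache
        if (row == null) {
            row = database.apply(userId);     // not found: ask the database
            databaseReads++;
            cache.put(cacheKey, row);         // store in the cache for the next call
        }
        return row;
    }

    public static void main(String[] args) {
        CacheAsideSketch users = new CacheAsideSketch(id -> "row for user " + id);
        users.getUser(7);
        users.getUser(7); // served from the cache
        System.out.println("database reads: " + users.databaseReads);
    }
}
```

A real deployment also has to decide when a cached row becomes stale, for example by deleting the key whenever the underlying row is updated. The sketch leaves that out.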