Monday, November 8, 2010

Implementing a fast concurrent Object Manager for Terracotta Server Array

If you have been following the latest news about Terracotta, you probably already know that we recently released a new feature called BigMemory. It is a great solution for efficiently using very large amounts of memory (tested with up to 350 GB) in Java. More info on BigMemory can be found here and here.

Implementing a game-changing feature like BigMemory exposes bottlenecks and behaviors that you never thought existed in the system. Putting BigMemory into our server compelled us to optimize various parts of the system so that it could really take advantage of all the memory BigMemory made available. This post is about one bottleneck that we discovered during the process and how we solved it by writing a more concurrent Object Manager to replace the old one.

Identifying the bottleneck :

The first step in solving a bottleneck is identifying that you have one, and to do that you need a good test that shows the problem. My colleague, Steve, wrote a multi-part blog some time back about writing a good test, so I am not going to go into that here.

In one of our test cases, we saw that adding more resources (like more client VMs, machines, RAM etc.) didn't help after a certain point. The TPS flat-lined after the initial gain. A quick thread dump of the Terracotta Server revealed this.

"WorkerThread(server_map_request_stage, 5, 0)" daemon prio=10 tid=0x00002aac3856f000 nid=0x250b waiting for monitor entry [0x000000004bb27000]
   java.lang.Thread.State: BLOCKED (on object monitor)
    at com.tc.objectserver.impl.ObjectManagerImpl.lookupObjectsForOptionallyCreate(ObjectManagerImpl.java:202)
    - waiting to lock <0x00002aaab41f6c90> (a com.tc.objectserver.impl.ObjectManagerImpl)
    at com.tc.objectserver.impl.ObjectManagerImpl.lookupObjectsFor(ObjectManagerImpl.java:196)
    at com.tc.objectserver.handler.ServerTCMapRequestHandler.handleEvent(ServerTCMapRequestHandler.java:26)
    at com.tc.async.impl.StageImpl$WorkerThread.run(StageImpl.java:127)

"WorkerThread(server_map_request_stage, 4, 0)" daemon prio=10 tid=0x00002aac3856d000 nid=0x250a runnable [0x000000004ba26000]
   java.lang.Thread.State: RUNNABLE
    at com.tc.objectserver.managedobject.ManagedObjectImpl.getFlag(ManagedObjectImpl.java:110)
    - locked <0x00002aaab534c2b0> (a com.tc.objectserver.managedobject.ManagedObjectImpl)
    at com.tc.objectserver.managedobject.ManagedObjectImpl.isReferenced(ManagedObjectImpl.java:304)
    at com.tc.objectserver.impl.ObjectManagerImpl.markReferenced(ObjectManagerImpl.java:255)
    at com.tc.objectserver.impl.ObjectManagerImpl.processObjectsRequest(ObjectManagerImpl.java:833)
    at com.tc.objectserver.impl.ObjectManagerImpl.basicLookupObjectsFor(ObjectManagerImpl.java:480)
    - locked <0x00002aaab41f6c90> (a com.tc.objectserver.impl.ObjectManagerImpl)
    at com.tc.objectserver.impl.ObjectManagerImpl.lookupObjectsForOptionallyCreate(ObjectManagerImpl.java:208)
    - locked <0x00002aaab41f6c90> (a com.tc.objectserver.impl.ObjectManagerImpl)
    at com.tc.objectserver.impl.ObjectManagerImpl.lookupObjectsFor(ObjectManagerImpl.java:196)

Only a couple of threads are shown above, but if you look at the full thread dump you can see that many threads are waiting for the same Object Manager lock while one thread is doing some processing. Our Object Manager subsystem was not written to be concurrent: only one thread can act on it at a time.

The old non-concurrent Object Manager :

The Object Manager is a central subsystem in Terracotta Server Array that is responsible for managing and accounting for distributed objects. Among other things, it keeps track of which objects exist in the system, which are currently being used and who is waiting for them. Other subsystems like the Transaction processor, Distributed Garbage Collector (DGC), Cache Manager etc. come to the Object Manager to check out the objects that they need to work on. Once they are done, they check the objects back in for others to use.

Terracotta uses a staged event-driven architecture (SEDA) internally, and requests to the Object Manager are batched conveniently at a higher level, so only a few threads interacted directly with the Object Manager. Also, checking objects out and in requires the Object Manager to be locked, but the processing of requests (like READ/WRITE/UPDATE etc.) happens outside the lock, so the lock is held for a very short time. As a result, the non-concurrent Object Manager was never a problem until now.

But with DCV2 and BigMemory things changed. There is more load on the Object Manager and more objects managed by it, and hence greater lock contention. So we set out to turn the non-concurrent Object Manager subsystem into a more concurrent one.

Requirements for the new concurrent Object Manager :

1. If a thread T1 requests objects A, B and C, you can only give all the objects or none of the objects to T1.

2. If an object (say C) is not available, because it is currently being used by someone else or is currently paged to disk, it should be given to the requester (T1) as soon as it becomes available. In the meantime, requests for A and B can be furnished to other requesters.

3. When Distributed garbage collector (DGC) is running, it should have the ability to pause the ObjectManager. This means any currently processing requests should complete, but new requests should be queued up and DGC should be notified so that it can complete its rescue phase.

4. Cache Manager, which is responsible for managing the object cache internally, could be running simultaneously flushing objects from heap to off-heap and from off-heap to disk.

5. It should be fast and lock-free (mostly, i.e. no global locks in check-in/check-out path) so all these operations can take place concurrently. It is OK to lock when DGC requests it to pause.

With these requirements in mind we set out to write a new Object Manager. Some of the core techniques that we employed are listed below.

1. Try to mark objects for use and roll back on failure

When requests are being processed concurrently, the state changes quickly. If two requests, one for objects (A, B and C) and one for objects (C and D), are processed concurrently, only one can be furnished at a time because of the common object C and rule 1 (all or nothing). Such collisions on top-level objects are not uncommon, but when you have thousands of requests processing millions of objects per second, there are also many requests that run successfully in parallel.

So we have an "in-use" bit in every object. For every object in the request, we try to set that bit atomically if it is not already set. If we are able to mark all the objects required for processing the request, then we are allowed to carry on with the request. If not, we have to roll back, i.e. unmark the objects we have marked so far, and add the request to a blocked request queue.

When the object becomes available again, the request becomes pending and is processed again.
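
The mark-and-rollback step described above can be sketched roughly as follows. This is a minimal illustration, not the actual Object Manager code: the class names ManagedObjectSketch and CheckoutSketch are hypothetical, and the "in-use" bit is assumed to be modeled with an AtomicBoolean.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for a managed object; only the "in-use" bit is modeled. */
final class ManagedObjectSketch {
    final long id;
    private final AtomicBoolean inUse = new AtomicBoolean(false);

    ManagedObjectSketch(long id) { this.id = id; }

    /** Atomically sets the in-use bit; returns false if it was already set. */
    boolean tryMark() { return inUse.compareAndSet(false, true); }

    /** Clears the in-use bit (used for check-in and for rollback). */
    void unmark() { inUse.set(false); }

    boolean isInUse() { return inUse.get(); }
}

final class CheckoutSketch {
    /**
     * Tries to mark every object in the request. On the first failure it
     * unmarks whatever it marked so far and returns false (all or nothing).
     */
    static boolean tryCheckoutAll(List<ManagedObjectSketch> request) {
        List<ManagedObjectSketch> marked = new ArrayList<>();
        for (ManagedObjectSketch o : request) {
            if (!o.tryMark()) {
                for (ManagedObjectSketch m : marked) {
                    m.unmark(); // roll back our own marks only
                }
                return false; // the caller would add the request to the blocked queue
            }
            marked.add(o);
        }
        return true;
    }

    /** Checks all objects of a furnished request back in. */
    static void checkInAll(List<ManagedObjectSketch> request) {
        for (ManagedObjectSketch o : request) {
            o.unmark();
        }
    }
}
```

Note that the rollback only clears bits that this request set itself, so it never disturbs an object that another request currently holds.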

2. Maintain lock order

If two requests for objects (A, B and C) and (C, D and A) are processed concurrently as described above, we could end up in a situation where Request 1 is waiting for Request 2 to complete while Request 2 is waiting for Request 1 to complete. To avoid these circular dependencies and the classic deadlock that comes with them, we always process the objects in a request in ascending order of their IDs, so that every request acquires objects in the same consistent order.
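
As a small illustration of the ordering rule, the sketch below normalizes a request into ascending ID order before any marking would start. The class and method names are hypothetical, and object IDs are assumed to be plain long values.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

final class LookupOrderSketch {
    /** Returns the requested object IDs in ascending order, without duplicates. */
    static List<Long> processingOrder(List<Long> requestedIds) {
        // TreeSet sorts by natural order and drops repeated IDs.
        return new ArrayList<>(new TreeSet<>(requestedIds));
    }
}
```

With A=1, B=2, C=3 and D=4, the requests (A, B, C) and (C, D, A) both start with A, so whichever request marks A first proceeds and the other backs off instead of the two waiting on each other.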

3. Check, change and recheck state

Let's say we couldn't process a request because an object (C) is not available, and we decide to add the request to the blocked queue. Because things are running in parallel, object C could be checked back in and become available before we manage to add the request to the blocked queue and signal the requester that the request went pending and will be furnished in the future. That would leave a request sitting in the blocked queue even though all of its objects are available, which breaks rule 2 and may lead to a hung request.

To avoid this, we recheck the state after we add the request to the blocked queue. If things changed in the meantime, we rerun the request until it is either successfully furnished or successfully added to the blocked queue without the state changing underneath us for that object.
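
A rough sketch of the check, enqueue and recheck loop for a single object is shown below. It is an illustration under simplifying assumptions, not the real implementation: RecheckSketch is a hypothetical class, a request is modeled as a Runnable that runs once the object is granted, and only one object is involved.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;

final class RecheckSketch {
    private final AtomicBoolean inUse = new AtomicBoolean(false);
    private final Queue<Runnable> blocked = new ConcurrentLinkedQueue<>();

    /** Returns true if the request was furnished by this call, false if it went pending. */
    boolean lookup(Runnable request) {
        while (true) {
            if (inUse.compareAndSet(false, true)) {
                request.run();      // object granted; it stays marked until checkIn()
                return true;
            }
            blocked.add(request);   // change: park the request
            if (inUse.get()) {
                return false;       // recheck: still in use, a later checkIn() will see the request
            }
            // The object was checked in between the failed mark and the enqueue.
            if (!blocked.remove(request)) {
                return false;       // a checkIn() already picked the request up
            }
            // We took the request back out of the queue ourselves, so retry it.
        }
    }

    /** Releases the object and re-runs one blocked request, if any. */
    void checkIn() {
        inUse.set(false);
        Runnable next = blocked.poll();
        if (next != null) {
            lookup(next);
        }
    }

    int pending() { return blocked.size(); }
}
```

The recheck after `blocked.add` is what closes the window: either the object is still in use, in which case the eventual check-in is guaranteed to find the queued request, or the request is pulled back out and retried.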

4. Initiate faulting from disk once and only once

When a requested object is not available in memory, we initiate a fault from disk or off-heap, but while the object is being faulted we don't want other requests to initiate more faults for that same object. To do this we add a marker object that is "always" in-use (mark bit set) and use ConcurrentHashMap's putIfAbsent to decide who should initiate the fault. For the time it takes to fault from disk, this object is "in-use" for all other requests, and thus they go to the blocked request queue automatically.
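
The fault-once idea can be sketched with ConcurrentMap.putIfAbsent as below. The FaultOnceSketch class, the FAULTING marker and the faultsStarted counter are hypothetical names used only for illustration; the actual load from disk or off-heap is left out.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

final class FaultOnceSketch {
    /** Placeholder that looks permanently "in use" while the real object loads. */
    static final Object FAULTING = new Object();

    private final ConcurrentMap<Long, Object> cache = new ConcurrentHashMap<>();
    final AtomicInteger faultsStarted = new AtomicInteger();

    /** Returns the cached object, or null if it is being faulted in (the caller would block the request). */
    Object lookup(long id) {
        Object existing = cache.putIfAbsent(id, FAULTING);
        if (existing == null) {
            // This caller installed the marker, so it alone starts the fault.
            faultsStarted.incrementAndGet();
            return null;
        }
        return existing == FAULTING ? null : existing;
    }

    /** Called when the load finishes: swaps the marker for the real object. */
    void faultComplete(long id, Object loaded) {
        cache.replace(id, FAULTING, loaded);
    }
}
```

Because putIfAbsent is atomic, exactly one of any number of racing callers sees null and becomes responsible for the fault; everyone else sees the marker and waits.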

5. Pause and unpause the Object Manager when needed for DGC

Every once in a while when Distributed Garbage Collector (DGC) is running, it may need the Object Manager to stop processing any request during its final rescue phase for a short time so it can come up with the final set of objects that are garbage and can be deleted from the system. (Rule 3)

To achieve this, and also rule 5, we introduced a ReentrantReadWriteLock (RRWL). Every request must acquire the READ lock before it is processed in the Object Manager. When DGC requests the Object Manager to pause, it changes the state to not allow further requests, then acquires the WRITE lock and waits for all currently operating requests to complete. We also maintain the number of objects that are currently checked out; when it reaches zero, DGC can continue its final rescue. This way all the requests are processed concurrently while holding the READ lock, but DGC can still pause the processing when needed by grabbing the WRITE lock. This could also have been done using a more complicated state machine, but this solution proved simple and gave good performance.
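
A minimal sketch of the pause mechanism, using java.util.concurrent.locks.ReentrantReadWriteLock, might look like this. It is deliberately simplified: PausableManagerSketch is a hypothetical class, the count of checked-out objects is omitted, and a request that arrives while paused is simply rejected (the caller is assumed to queue it).

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

final class PausableManagerSketch {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private volatile boolean paused = false;

    /** Runs a request under the READ lock; returns false if the manager is paused. */
    boolean process(Runnable request) {
        lock.readLock().lock();
        try {
            if (paused) {
                return false; // the caller would queue the request until unpause()
            }
            request.run();
            return true;
        } finally {
            lock.readLock().unlock();
        }
    }

    /** Stops admitting requests, then waits for in-flight requests to finish. */
    void pause() {
        paused = true;               // new requests are turned away from here on
        lock.writeLock().lock();     // blocks until every current READ holder is done
        lock.writeLock().unlock();   // nothing is in flight any more
    }

    void unpause() {
        paused = false;
    }
}
```

Many requests can hold the READ lock at once, so the normal path stays concurrent; the WRITE lock is only touched on the rare pause.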

Conclusion :

With all these changes we were able to remove all global locks from the lookup code path in the Object Manager, thus eliminating the bottleneck we saw earlier in the thread dump. We got a very good performance improvement with this change. With BigMemory, we were also able to scale linearly up to 350 GB of data even as we added more load to the system. In fact, for some read-heavy use cases we saw a 2X improvement in TPS even for the regular distributed cache without BigMemory.

For the adventurous ones out there, the source for the new concurrent Object Manager can be found here. If you want to check out our open source Terracotta platform and play with it, you can get the source or download it from here.



Wednesday, July 21, 2010

Scale your cache to a terabyte with Terracotta Distributed Ehcache 2.2

Introduction :

In today's world of cloud computing and internet-scale applications, data scale and availability are becoming an increasing burden on developers. With that problem in mind, we are excited to release our latest snap-in version of Terracotta Ehcache. With this release and a few lines of config you are able to scale an application to terabytes of highly available data and hundreds of nodes. We accomplish this for our users with a new Terracotta-based storage strategy called DCV2 (for "Distributed Cache Version 2"). In this post, I would like to demystify DCV2 by explaining a little bit about its design, inner workings and limitations in its current form.

The Problem :

Terracotta Server Array already provides striping capabilities to the cluster, allowing users to scale to meet their demands. Before we built DCV2, distributed Ehcache using the "classic storage strategy" could flush and fault the values of a cache at its discretion, depending on access pattern and memory requirements, but not the keys. All the keys in the cache had to be resident in every client's memory.

Keys are generally very small compared to the values. Having the keys in memory at the client nodes helped us achieve better performance for certain operations and also allowed us to do some nifty optimizations around locking and prefetching of values so we could be fast and still be coherent. It also allowed us to run more efficient eviction algorithms at the client nodes, since we have more details about access patterns there. These optimizations gave us pretty good performance numbers for small to medium-sized caches, even up to 5-10 million entries.

But this strategy posed a few issues for very large caches :
  • Since all the keys are present in the client heap, large caches (50-100+ million entries) impose a bigger heap requirement on the client nodes, even though keys are generally small.
  • Java takes a long time to do full GCs of large heaps, so increasing the heap was not a great solution.
  • There is a penalty we have to pay for each node in the cluster, since updates to the cache need to be broadcast. This is minimal with all the batching we do, but it is still there and becomes visible in large clusters with a lot of nodes.

DCV2 for large caches :

To solve all these issues for very large caches, we built a new storage strategy from the ground up and did a lot of optimizations to achieve good scale-out characteristics. Some of the key design aspects of DCV2 are listed below.
  • As you might have guessed by now, in DCV2 the keys in the cache are no longer all kept at the client nodes, which relieves the memory requirement there. Clients can be thin, with very little heap, and still be connected to a huge cluster and access a large cache.
  • The state of the cache is maintained by the Terracotta Server Array as usual.
  • Updates to the caches are not broadcast to any client nodes in the cluster.
  • To keep the cache coherent, locks are given out to the client nodes for the entries they access, and the nodes can hold them greedily until someone else in the cluster needs them or they don't need them anymore.
  • We built an efficient local memory cache that DCV2 uses at the client side, so that each node can hit the local cache for its hot set and not make multiple trips to the server. This cache grows or shrinks as memory demands change. One can also hand-tune it if needed.
  • The local memory cache only caches entries that are protected by locks owned by the current node. Hence we can afford to not broadcast any changes and still be coherent with low read latency.
  • The eviction algorithm runs at the Terracotta Server Array, where it is closest to the data, without needing to fault any data to the client, which makes it faster.

Results :

With the DCV2 strategy we got the following benefits.
  • Since the client nodes can now fault and flush whole entries (as opposed to only values) as needed from the Terracotta Server Array, which can itself be scaled up as needed, we are able to scale the cache to over 100 million entries with a terabyte of data in the cache!
  • Since we don't broadcast any updates to the cache, it performed better, especially with a higher number of nodes. In some cases we got up to a 5X improvement in write TPS while still maintaining the same read TPS.
  • Each node doesn't add any extra overhead to the cluster, which helps us scale to a large number of nodes.
  • The node startup time is also almost instantaneous, even when connecting to a large cluster with a large cache.
  • You still get all the benefits of a clustered coherent cache with very low latency and high throughput.

Limitations :

There are still more features and improvements planned for DCV2. In its current form, it has the following limitations.
  • Some operations like getKeys(), which returns a list of all keys, are not implemented yet. (You shouldn't call getKeys() on a cache with 100 million entries anyway!) We plan to support these kinds of operations with some kind of cursor in the future.
  • Some features like eviction events in Ehcache are not supported with DCV2.
  • Incoherent mode, as described in my previous post, is not optimized for DCV2 yet.
  • For certain kinds of usage patterns with small to medium-sized caches, you might find the classic mode to be a better fit. In general we found DCV2 to be as fast or faster in most cases, especially for larger caches.

Using DCV2 :

To try out DCV2, you can download our enterprise kit from here. You can enable the new storage strategy either in ehcache.xml or programmatically.

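<!-- ehcache.xml -->
<cache name="MyLargeCache">
  <terracotta clustered="true" storageStrategy="DCV2"/>
</cache>

// programmatically ...
import net.sf.ehcache.Cache;
import net.sf.ehcache.config.CacheConfiguration;
import net.sf.ehcache.config.TerracottaConfiguration;

// Name the cache, mark it as Terracotta-clustered and select the DCV2 storage strategy.
CacheConfiguration cacheConfiguration = new CacheConfiguration("MyLargeCache");
TerracottaConfiguration terracottaConfig = new TerracottaConfiguration().clustered(true);
terracottaConfig.setStorageStrategy("DCV2");
cacheConfiguration.terracotta(terracottaConfig);
Cache cache = new Cache(cacheConfiguration); // ...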

Wednesday, February 17, 2010

Terracotta Distributed Ehcache : New fast incoherent mode


Introduction :

One of the main advantages that Terracotta brings to Ehcache users is cache coherency in a distributed environment. A coherent cache will always give a consistent view of its data, whereas with an incoherent cache you can read stale data, i.e. data that is not valid anymore.

Terracotta as a distributed platform provides scalability, availability and a coherent view of data by default. So it was easy for us to implement a coherent cache plugin for Ehcache using Terracotta as the distributed platform.

So what's the problem ?

So to keep the cache coherent there is a certain cost involved. For example, row level locking is implemented internally to prevent two threads from two nodes updating the same entry at the same time. Various optimizations like greedy locks, optimistic prefetching, transaction folding etc. have been implemented to make this super fast while keeping the cache coherent.

But we felt there are still some scenarios where users would be willing to compromise cache coherency to gain extra performance. Some scenarios where one might not care about a coherent cache are:
  • In a read-only cache (no updates, so no stale data)
  • When bulk loading entries into a cache (a cron job type of batch loading during non-peak hours)
  • When warming up the cache for the first time before use
  • In use cases where reading stale values once in a while is acceptable

Solution :

With the latest release of Terracotta (3.2.1) and Ehcache (2.0), we are introducing a new incoherent mode to our distributed Ehcache. To expose such a mode to the users, we had to enhance the Ehcache interface to include the following methods.


/**
* Returns true if the cache is in coherent mode cluster-wide. Returns false otherwise.
* <p />
* It applies to coherent clustering mechanisms only e.g. Terracotta
*
* @return true if the cache is in coherent mode cluster-wide, false otherwise
*/
public boolean isClusterCoherent();

/**
* Returns true if the cache is in coherent mode for the current node. Returns false otherwise.
* <p />
* It applies to coherent clustering mechanisms only e.g. Terracotta
*
* @return true if the cache is in coherent mode for the current node, false otherwise
*/
public boolean isNodeCoherent();

/**
* Sets the cache in coherent or incoherent mode depending on the parameter on this node.
* Calling {@code setNodeCoherent(true)} when the cache is already in coherent mode or
* calling {@code setNodeCoherent(false)} when already in incoherent mode will be a no-op.
* <p />
* It applies to coherent clustering mechanisms only e.g. Terracotta
*
* @param coherent
* true transitions to coherent mode, false to incoherent mode
* @throws UnsupportedOperationException if this cache does not support coherence, like RMI replication
*/
public void setNodeCoherent(boolean coherent) throws UnsupportedOperationException;

/**
* This method waits until the cache is in coherent mode in all the connected nodes. If the cache is already in coherent mode it returns
* immediately
* <p />
* It applies to coherent clustering mechanisms only e.g. Terracotta
* @throws UnsupportedOperationException if this cache does not support coherence, like RMI replication
*/
public void waitUntilClusterCoherent() throws UnsupportedOperationException;


All these methods are self-explanatory. One important detail to note is that when setting a cache to coherent or incoherent mode by calling setNodeCoherent(), the mode is changed for that cache only on that particular node and not cluster-wide, as denoted by the method name. This is done so that if a node sets a cache to incoherent mode and then crashes, the cache automatically moves back to coherent mode as soon as that node disconnects. This also means every node that wants to use the cache in an incoherent fashion has to call this method.

There is also a new configuration attribute added to the terracotta element in ehcache.xml that can set up a cache in incoherent mode on startup. By default, a cache is always in coherent mode unless set otherwise in config.

<xs:element name="terracotta">
<xs:complexType>
<xs:attribute name="coherent" use="optional" type="xs:boolean" default="true"/>
</xs:complexType>
</xs:element>

The waitUntilClusterCoherent() method in the Ehcache interface is provided as a convenient mechanism for multiple cache nodes to synchronize among themselves and to wait until all of them are finished loading in incoherent mode, without using any kind of external distributed cyclic barrier. Anyone calling this method on the cache will wait until all the nodes have the cache back in coherent mode. If the cache is already in coherent mode in all nodes then this method just returns immediately.
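
A bulk load using these methods could follow the pattern sketched below. To keep the example runnable without a cluster, it uses a hypothetical CoherenceAwareCache interface that mirrors only the coherence methods quoted above plus a simplified put(String, String) (the real Ehcache put takes an Element), and an in-memory stub in place of a clustered cache.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical subset of the Ehcache interface, for illustration only. */
interface CoherenceAwareCache {
    void setNodeCoherent(boolean coherent);
    boolean isNodeCoherent();
    void waitUntilClusterCoherent();
    void put(String key, String value);
}

final class BulkLoadSketch {
    /** Loads entries in incoherent mode and always restores coherent mode afterwards. */
    static void bulkLoad(CoherenceAwareCache cache, Map<String, String> entries) {
        cache.setNodeCoherent(false);          // affects this node only
        try {
            for (Map.Entry<String, String> e : entries.entrySet()) {
                cache.put(e.getKey(), e.getValue());
            }
        } finally {
            cache.setNodeCoherent(true);       // restore even if loading fails
        }
        cache.waitUntilClusterCoherent();      // wait for the other loaders to finish too
    }
}

/** Single-node in-memory stand-in so the sketch runs without a cluster. */
final class InMemoryCacheStub implements CoherenceAwareCache {
    private final Map<String, String> store = new HashMap<>();
    private boolean coherent = true;
    int putsWhileIncoherent = 0;

    public void setNodeCoherent(boolean coherent) { this.coherent = coherent; }
    public boolean isNodeCoherent() { return coherent; }
    public void waitUntilClusterCoherent() { /* single node: nothing to wait for */ }
    public void put(String key, String value) {
        if (!coherent) {
            putsWhileIncoherent++;
        }
        store.put(key, value);
    }
    int size() { return store.size(); }
}
```

Restoring coherent mode in a finally block is a design choice of this sketch: it keeps a failed load from leaving the node in incoherent mode while it is still connected.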

Test :

To test the performance of incoherent mode compared to coherent mode, we wrote a simple test using the Pet Clinic data model, but we short-circuited the database access to remove the database bottleneck.

The test loads 300,000 owners, with each owner having many pets of various types. It ends up loading about 3,000,000 entries across 7 different caches.

The tests were all run in both coherent mode and incoherent mode on similar 8 GB, 16-core Linux boxes connected to the same Gigabit Ethernet backbone.

Test Results :

Cache Nodes | Threads / Cache Node | Cache Entries to Load | Coherent Mode (TPS – Time to Load) | Incoherent Mode (TPS – Time to Load) | Improvement over Coherent Mode
1           | 1                    | 3 Million             | 943 tps – 318.2 sec                | 3371 tps – 89.4 sec                  | 3.6X Faster
1           | 8                    | 3 Million             | 3093 tps – 97.6 sec                | 7895 tps – 38.6 sec                  | 2.6X Faster
6           | 1                    | 3 Million             | 1251 tps – 240 sec                 | 14405 tps – 21.6 sec                 | 11.5X Faster
6           | 8                    | 3 Million             | 1416 tps – 211.9 sec               | 16245 tps – 18.5 sec                 | 11.5X Faster
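
The improvement figures appear to be the ratio of incoherent to coherent TPS, rounded to one decimal place. The small sketch below reproduces that arithmetic; SpeedupSketch is a hypothetical helper written for this check, not part of the test.

```java
final class SpeedupSketch {
    /** Ratio of incoherent to coherent TPS, rounded to one decimal place. */
    static double speedup(double coherentTps, double incoherentTps) {
        return Math.round(incoherentTps / coherentTps * 10.0) / 10.0;
    }
}
```

For example, 3371 / 943 is about 3.57, which rounds to the reported 3.6X, and 14405 / 1251 is about 11.51, which rounds to 11.5X.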


Conclusion :

As you can see from the above table, in incoherent mode the cache is faster by a factor of roughly 2.6X to 11.5X. The benefits grow as you increase the number of nodes in the cluster. In scenarios where one can compromise on cache coherency, users can now take advantage of this new incoherent mode to reap large performance gains with the latest release of Terracotta 3.2.1 and Ehcache 2.0.