Monday, November 8, 2010

Implementing a fast concurrent Object Manager for Terracotta Server Array

If you have been following the latest news about Terracotta, you probably already know that we recently released a new feature called BigMemory. It is a great solution for efficiently using very large amounts of memory (tested with up to 350 GB) in Java. More info on BigMemory can be found here and here.

Implementing a game-changing feature like BigMemory exposes bottlenecks and behaviors that you never knew existed in the system. Putting BigMemory into our server compelled us to make various optimizations so we could really take advantage of all the memory it made available to us. This blog is about one bottleneck we discovered along the way and how we solved it by writing a more concurrent resource manager to replace the old one.

Identifying the bottleneck :

The first step in solving a bottleneck is identifying that you have one, and to do that you need a good test that shows the problem. My colleague Steve wrote a multi-part blog some time back about writing a good test, so I am not going to go into that here.

In one of our test cases, we saw that adding more resources (more client VMs, machines, RAM, etc.) didn't help after a certain point. The TPS flat-lined after the initial gain. A quick thread dump of the Terracotta Server revealed this:

"WorkerThread(server_map_request_stage, 5, 0)" daemon prio=10 tid=0x00002aac3856f000 nid=0x250b waiting for monitor entry [0x000000004bb27000]
   java.lang.Thread.State: BLOCKED (on object monitor)
    at com.tc.objectserver.impl.ObjectManagerImpl.lookupObjectsForOptionallyCreate(ObjectManagerImpl.java:202)
    - waiting to lock <0x00002aaab41f6c90> (a com.tc.objectserver.impl.ObjectManagerImpl)
    at com.tc.objectserver.impl.ObjectManagerImpl.lookupObjectsFor(ObjectManagerImpl.java:196)
    at com.tc.objectserver.handler.ServerTCMapRequestHandler.handleEvent(ServerTCMapRequestHandler.java:26)
    at com.tc.async.impl.StageImpl$WorkerThread.run(StageImpl.java:127)

"WorkerThread(server_map_request_stage, 4, 0)" daemon prio=10 tid=0x00002aac3856d000 nid=0x250a runnable [0x000000004ba26000]
   java.lang.Thread.State: RUNNABLE
    at com.tc.objectserver.managedobject.ManagedObjectImpl.getFlag(ManagedObjectImpl.java:110)
    - locked <0x00002aaab534c2b0> (a com.tc.objectserver.managedobject.ManagedObjectImpl)
    at com.tc.objectserver.managedobject.ManagedObjectImpl.isReferenced(ManagedObjectImpl.java:304)
    at com.tc.objectserver.impl.ObjectManagerImpl.markReferenced(ObjectManagerImpl.java:255)
    at com.tc.objectserver.impl.ObjectManagerImpl.processObjectsRequest(ObjectManagerImpl.java:833)
    at com.tc.objectserver.impl.ObjectManagerImpl.basicLookupObjectsFor(ObjectManagerImpl.java:480)
    - locked <0x00002aaab41f6c90> (a com.tc.objectserver.impl.ObjectManagerImpl)
    at com.tc.objectserver.impl.ObjectManagerImpl.lookupObjectsForOptionallyCreate(ObjectManagerImpl.java:208)
    - locked <0x00002aaab41f6c90> (a com.tc.objectserver.impl.ObjectManagerImpl)
    at com.tc.objectserver.impl.ObjectManagerImpl.lookupObjectsFor(ObjectManagerImpl.java:196)

Only a couple of threads are shown above, but if you look at the full thread dump you can see many threads waiting for the same Object Manager lock while one thread does the processing. Our Object Manager subsystem was not written to be concurrent: only one thread can act on it at a time.

The old non-concurrent Object Manager :

The Object Manager is a central subsystem in the Terracotta Server Array that is responsible for managing and accounting for distributed objects. It keeps track of what objects exist in the system, which are currently in use, and who is waiting for them, among other things. Other subsystems like the Transaction Processor, the Distributed Garbage Collector (DGC), and the Cache Manager come to the Object Manager to check out the objects they need to work on. Once they are done, they check the objects back in for others to use.

Terracotta uses a staged event-driven architecture (SEDA) internally, and requests to the Object Manager are batched conveniently at a higher level, so only a few threads interact directly with the Object Manager. Also, while checking objects out and in requires the Object Manager to be locked, the processing of the request (READ/WRITE/UPDATE, etc.) happens outside the lock, so the lock is held for a very short time. The non-concurrent Object Manager was therefore never a problem until now.

But with DCV2 and BigMemory, things changed. There is more load on the Object Manager and more objects managed by it, and hence greater lock contention in the Object Manager. So we set out on a path to turn the non-concurrent Object Manager subsystem into a more concurrent one.

Requirements for the new concurrent Object Manager :

1. If a thread T1 requests objects A, B and C, it must be given either all of the objects or none of them.

2. If an object (say C) is not available, because it is currently in use by someone else or is paged out to disk, it should be given to the requester (T1) as soon as it becomes available at a later point in time. In the meantime, A and B can be furnished to other requesters.

3. When the Distributed Garbage Collector (DGC) is running, it should have the ability to pause the Object Manager. This means any currently processing requests should complete, but new requests should be queued up, and the DGC should be notified so that it can complete its rescue phase.

4. The Cache Manager, which is responsible for managing the object cache internally, may be running simultaneously, flushing objects from heap to off-heap and from off-heap to disk.

5. It should be fast and mostly lock-free (i.e. no global locks in the check-in/check-out path) so all these operations can take place concurrently. It is OK to lock when the DGC requests a pause.
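To make these requirements concrete, here is a rough sketch of the contract they imply. The names are hypothetical stand-ins, not the actual Terracotta interfaces (those live in the source linked at the end):

import java.util.Collection;
import java.util.SortedSet;

// Hypothetical stand-ins for the real Terracotta types.
interface ObjectID extends Comparable<ObjectID> {}
interface ManagedObject { ObjectID getID(); }

interface ObjectManagerSketch {
    // Rules 1 and 2: either every requested object is checked out and the
    // callback fires, or the request pends until the missing ones free up,
    // without holding up the objects that are available.
    void lookupObjectsFor(SortedSet<ObjectID> ids, LookupCallback callback);

    // Check objects back in so blocked requests waiting on them can proceed.
    void releaseAll(Collection<ManagedObject> checkedOutObjects);

    // Rule 3: quiesce for the DGC's final rescue phase, then resume.
    void pause();
    void unpause();
}

interface LookupCallback {
    void objectsAvailable(Collection<ManagedObject> objects);
}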

With these requirements in mind, we set out to write a new Object Manager. Some of the core techniques we employed are listed below.

1 : Try to mark objects for use and roll back on failure

When requests are processed concurrently, the state changes quickly and concurrently. If two requests, one for objects (A, B and C) and another for objects (C and D), are processed concurrently, only one can be furnished at a time because of the common object C and rule 1. Such collisions on top-level objects are not uncommon, but when you have thousands of requests per second operating on millions of objects, there are also many requests running successfully in parallel.

So we have an "in-use" bit in every object. For every object in the request, we try to atomically mark that bit if it's not already set. If we are able to mark all the objects required for the request, we are allowed to carry on with it. If not, we have to roll back, i.e. unmark the objects we marked so far, and add the request to a blocked request queue.

When the object becomes available again, the request becomes pending and is processed again.
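This mark-and-rollback step might look something like the following sketch, assuming an AtomicBoolean per object as the in-use bit (the class names are hypothetical, not the real ManagedObjectImpl):

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for a managed object with an "in-use" bit.
final class MarkableObject {
    private final AtomicBoolean inUse = new AtomicBoolean(false);

    boolean mark()   { return inUse.compareAndSet(false, true); }
    void    unmark() { inUse.set(false); }
}

final class Marker {
    // Try to mark every object in the request; on the first failure, roll
    // back the marks taken so far and report failure so the caller can put
    // the request on the blocked queue.
    static boolean markAll(Collection<MarkableObject> requested) {
        List<MarkableObject> marked = new ArrayList<MarkableObject>();
        for (MarkableObject o : requested) {
            if (o.mark()) {
                marked.add(o);
            } else {
                for (MarkableObject m : marked) {
                    m.unmark();        // rollback
                }
                return false;          // request goes to the blocked queue
            }
        }
        return true;                   // all marked: process the request
    }
}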

2 : Maintain lock order

If two requests for objects (A, B and C) and (C, D and A) are processed concurrently as described above, we could end up in a situation where Request 1 waits for Request 2 to complete while Request 2 waits for Request 1. To avoid these circular dependencies and the classic deadlock that comes with them, we always process the objects in a request in ascending order of their IDs, maintaining a consistent order that avoids deadlocks.
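Since object IDs are comparable, maintaining this order is as simple as keeping each request's IDs in a sorted collection before marking. A minimal sketch, with numeric IDs standing in for the real ObjectID type:

import java.util.SortedSet;
import java.util.TreeSet;

final class OrderedRequest {
    // Both (A, B, C) and (C, D, A) come out as ascending ID sequences, so
    // two colliding requests always attempt their common objects in the
    // same order and can never wait on each other in a cycle.
    static SortedSet<Long> inAscendingIdOrder(long... objectIds) {
        SortedSet<Long> ordered = new TreeSet<Long>();
        for (long id : objectIds) {
            ordered.add(id);
        }
        return ordered;
    }
}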

3 : Check, change and recheck state

Let's say we couldn't process a request because an object (C) is not available, and we decide to add the request to the blocked queue. But before we can add it to the queue and signal the requester that the request went pending and will be furnished in the future, object C could be checked back in and become available, since things are running in parallel. This would leave the request stuck in the blocked queue while all the requested objects are available, which breaks rule 2 (and may lead to a hung request).

To avoid this, we recheck the state again after adding the request to the blocked queue. If things changed in the meantime, we rerun the request, repeating until it is either successfully furnished or successfully added to the blocked queue without the state changing underneath us for that object.
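A sketch of that retry loop, with tryMarkAll and becameAvailableAgain as hypothetical helpers standing in for the real state checks:

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

final class RecheckLoop {
    interface Request {
        boolean tryMarkAll();            // technique 1: mark or roll back
        boolean becameAvailableAgain();  // hypothetical recheck of the state
    }

    private final Queue<Request> blockedQueue = new ConcurrentLinkedQueue<Request>();

    // Loop until the request is either furnished or parked in the blocked
    // queue with the state unchanged underneath us.
    void lookup(Request request) {
        while (true) {
            if (request.tryMarkAll()) {
                process(request);        // all objects marked: furnish it
                return;
            }
            blockedQueue.add(request);
            // Recheck: the missing object may have been checked back in
            // between the failed mark and the enqueue. If not, the request
            // is legitimately blocked and will be made pending on check-in.
            if (!request.becameAvailableAgain()) {
                return;
            }
            blockedQueue.remove(request); // state changed: pull it back, retry
        }
    }

    private void process(Request request) {
        // Hand off to the request handler (elided in this sketch).
    }
}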

4 : Initiate faulting from disk once and only once

When a requested object is not available in memory, we initiate a fault from disk or off-heap, but while the object is being faulted we don't want other requests to initiate more faults for that same object. To do this we add a marker object that is "always" in use (mark bit set) and use ConcurrentHashMap's putIfAbsent to decide who should initiate the fault. For the time it takes to fault from disk, the object appears "in use" to all other requests, so they go to the blocked request queue automatically.
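A sketch of this fault-once pattern; FAULTING_MARKER and the map layout are illustrative, not the actual Terracotta code:

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class FaultOnce {
    // Marker whose mark bit is permanently set: any request that finds it
    // fails to mark the object and lands in the blocked queue automatically.
    private static final Object FAULTING_MARKER = new Object();

    private final ConcurrentMap<Long, Object> liveObjects =
            new ConcurrentHashMap<Long, Object>();

    void faultIfFirst(long oid) {
        if (liveObjects.putIfAbsent(oid, FAULTING_MARKER) == null) {
            // We won the race, so we alone initiate the fault; everyone else
            // sees the always-in-use marker and blocks until the fault
            // completes and swaps the real object in for the marker.
            initiateFault(oid);
        }
    }

    private void initiateFault(long oid) {
        // Asynchronously load the object from off-heap or disk, replace the
        // marker, and wake up blocked requests (elided in this sketch).
    }
}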

5 : Pause and unpause the Object Manager when needed for DGC

Every once in a while, when the Distributed Garbage Collector (DGC) is running, it may need the Object Manager to stop processing requests for a short time during its final rescue phase, so it can come up with the final set of objects that are garbage and can be deleted from the system (rule 3).

To achieve this, and also rule 5, we introduced a ReentrantReadWriteLock (RRWL). Every request must acquire the READ lock before being processed in the Object Manager. When the DGC asks the Object Manager to pause, the Object Manager changes its state to disallow further requests and then acquires the WRITE lock, waiting for all currently operating requests to complete. We maintain a count of the objects that are currently checked out; when it reaches zero, the DGC can continue its final rescue. This way all requests are processed concurrently while holding the READ lock, yet the DGC can still pause processing when needed by grabbing the WRITE lock. This could also have been done with a more complicated state machine, but this solution proved simple and performed well.
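A simplified sketch of this arrangement (the real implementation also flips a state flag and uses proper wait/notify rather than spinning):

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;

final class PausableObjectManager {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final AtomicInteger checkedOutCount = new AtomicInteger();

    // Normal lookups run concurrently under the READ lock.
    void lookup(Runnable checkOutObjects) {
        lock.readLock().lock();
        try {
            checkedOutCount.incrementAndGet();
            checkOutObjects.run();
        } finally {
            lock.readLock().unlock();
        }
    }

    // Processing happens outside the lock; check-in just drops the count.
    void checkIn() {
        checkedOutCount.decrementAndGet();
    }

    // The DGC pauses processing by taking the WRITE lock: new lookups queue
    // up behind it, and it waits for the checked-out count to drain to zero
    // before running its final rescue.
    void pauseForRescue(Runnable rescuePhase) {
        lock.writeLock().lock();
        try {
            while (checkedOutCount.get() > 0) {
                Thread.yield();   // sketch only; the real code waits/notifies
            }
            rescuePhase.run();
        } finally {
            lock.writeLock().unlock();
        }
    }
}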

Conclusion :

So with all these changes we were able to remove all global locks from the lookup code path in the Object Manager, eliminating the bottleneck we saw earlier in the thread dump. We got a very good performance improvement from this change. Also, with BigMemory, we were able to scale linearly up to 350 GB of data even as we added more load to the system. In fact, for some read-heavy use cases we saw a 2X improvement in TPS even for the regular distributed cache without BigMemory.

For the adventurous ones out there, the source for the new concurrent Object Manager can be found here. If you want to check out our open-source Terracotta platform and play with it, you can get the source or download it from here.