Monday, November 8, 2010

Implementing a fast concurrent Object Manager for Terracotta Server Array

If you have been following the latest news about Terracotta, you probably already know that we recently released a new feature called BigMemory. It is a great solution for efficiently using very large amounts of memory (tested with up to 350 GB) in Java. More info on BigMemory can be found here and here.

Implementing a game-changing feature like BigMemory exposes bottlenecks and behaviors that you never knew existed in the system. Putting BigMemory into our server compelled us to make various optimizations so the system could really take advantage of all the memory BigMemory made available to us. This blog is about one bottleneck that we discovered during the process and how we solved it by writing a more concurrent resource manager to replace the old one.

Identifying the bottleneck :

The first step in solving a bottleneck is identifying that you have one, and to do that you need a good test that shows the problem. My colleague Steve wrote a multi-part blog some time back about writing a good test, so I am not going to go into that here.

In one of our test cases, we saw that adding more resources (like more client VMs, machines, RAM, etc.) didn't help beyond a certain point. The TPS flat-lined after the initial gain. A quick thread dump of the Terracotta Server revealed this:

"WorkerThread(server_map_request_stage, 5, 0)" daemon prio=10 tid=0x00002aac3856f000 nid=0x250b waiting for monitor entry [0x000000004bb27000]
   java.lang.Thread.State: BLOCKED (on object monitor)
    at com.tc.objectserver.impl.ObjectManagerImpl.lookupObjectsForOptionallyCreate(ObjectManagerImpl.java:202)
    - waiting to lock <0x00002aaab41f6c90> (a com.tc.objectserver.impl.ObjectManagerImpl)
    at com.tc.objectserver.impl.ObjectManagerImpl.lookupObjectsFor(ObjectManagerImpl.java:196)
    at com.tc.objectserver.handler.ServerTCMapRequestHandler.handleEvent(ServerTCMapRequestHandler.java:26)
    at com.tc.async.impl.StageImpl$WorkerThread.run(StageImpl.java:127)

"WorkerThread(server_map_request_stage, 4, 0)" daemon prio=10 tid=0x00002aac3856d000 nid=0x250a runnable [0x000000004ba26000]
   java.lang.Thread.State: RUNNABLE
    at com.tc.objectserver.managedobject.ManagedObjectImpl.getFlag(ManagedObjectImpl.java:110)
    - locked <0x00002aaab534c2b0> (a com.tc.objectserver.managedobject.ManagedObjectImpl)
    at com.tc.objectserver.managedobject.ManagedObjectImpl.isReferenced(ManagedObjectImpl.java:304)
    at com.tc.objectserver.impl.ObjectManagerImpl.markReferenced(ObjectManagerImpl.java:255)
    at com.tc.objectserver.impl.ObjectManagerImpl.processObjectsRequest(ObjectManagerImpl.java:833)
    at com.tc.objectserver.impl.ObjectManagerImpl.basicLookupObjectsFor(ObjectManagerImpl.java:480)
    - locked <0x00002aaab41f6c90> (a com.tc.objectserver.impl.ObjectManagerImpl)
    at com.tc.objectserver.impl.ObjectManagerImpl.lookupObjectsForOptionallyCreate(ObjectManagerImpl.java:208)
    - locked <0x00002aaab41f6c90> (a com.tc.objectserver.impl.ObjectManagerImpl)
    at com.tc.objectserver.impl.ObjectManagerImpl.lookupObjectsFor(ObjectManagerImpl.java:196)

Only a couple of threads are shown above, but in the full thread dump you can see many threads waiting for the same Object Manager lock while one thread does some processing. Our Object Manager subsystem was not written to be concurrent: only one thread can act on it at a time.

The old non-concurrent Object Manager :

The Object Manager is a central subsystem in the Terracotta Server Array that is responsible for managing and accounting for distributed objects. It keeps track of which objects exist in the system, which are currently in use and who is waiting for them, among other things. Other subsystems like the Transaction Processor, the Distributed Garbage Collector (DGC) and the Cache Manager come to the Object Manager to check out the objects that they need to work on. Once they are done, they check the objects back in for others to use.

Terracotta uses a staged event-driven architecture (SEDA) internally, and requests to the Object Manager are conveniently batched at a higher level, so only a few threads interact directly with the Object Manager. Also, while checking objects out and back in requires the Object Manager to be locked, the actual processing of a request (READ/WRITE/UPDATE, etc.) happens outside the lock, so the lock is held for a very short time. The non-concurrent Object Manager was therefore never a problem until now.

But with DCV2 and BigMemory, things changed: there is more load on the Object Manager, more objects managed by it and hence greater lock contention in the Object Manager. So we set out to turn the non-concurrent Object Manager subsystem into a more concurrent one.
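
To make the check-out/check-in pattern concrete, here is a minimal, hypothetical sketch of how a single-lock manager like the old one behaves (simplified names and types, not the actual ObjectManagerImpl code). Every lookup serializes on the manager's monitor, which is exactly the lock the blocked threads in the thread dump above are waiting on.

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of a single-lock object manager: check-out and check-in
// both synchronize on the manager, so only one thread can be inside at a time.
class SingleLockObjectManager {
    private final Map<Long, Object> objects = new HashMap<>();
    private final Set<Long> checkedOut = new HashSet<>();

    // Returns the requested objects, or null if any of them is already
    // checked out (the caller would queue the request and retry later).
    synchronized List<Object> checkOut(Collection<Long> ids) {
        for (Long id : ids) {
            if (checkedOut.contains(id)) {
                return null;
            }
        }
        List<Object> result = new ArrayList<>();
        for (Long id : ids) {
            checkedOut.add(id);
            result.add(objects.get(id));
        }
        return result;
    }

    // The actual READ/WRITE/UPDATE work happens outside the lock; the caller
    // only comes back here briefly to check the objects back in.
    synchronized void checkIn(Collection<Long> ids) {
        checkedOut.removeAll(ids);
    }
}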

Requirements for the new concurrent Object Manager :

1. If a thread T1 requests objects A, B and C, it must be given either all of the objects or none of them.

2. If an object (say C) is not available, because it is currently being used by someone else or is currently paged to disk, it should be given to the requester (T1) as soon as it becomes available at a later point in time. In the meantime, A and B can be furnished to other requesters.

3. When the Distributed Garbage Collector (DGC) is running, it should be able to pause the Object Manager. This means any requests currently being processed should complete, new requests should be queued up, and the DGC should be notified so that it can complete its rescue phase.

4. The Cache Manager, which is responsible for managing the object cache internally, could be running simultaneously, flushing objects from heap to off-heap and from off-heap to disk.

5. It should be fast and (mostly) lock-free, i.e. no global locks in the check-in/check-out path, so all these operations can take place concurrently. It is OK to lock when the DGC requests a pause.
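
Taken together, these requirements roughly translate into an interface like the hypothetical sketch below (illustrative names and simplified signatures; the real ObjectManagerImpl API differs, but the contract it has to honor is the same).

import java.util.Set;
import java.util.SortedSet;

// Hypothetical interface capturing the rules above; not the actual Terracotta API.
interface ConcurrentObjectManager {

    // Rule 1: either every object in 'ids' is checked out to the caller
    // (returns true) or none is. Rule 2: if the request cannot be furnished
    // now, it is parked and 'callback' fires once all objects are available.
    boolean lookupObjectsFor(SortedSet<Long> ids, LookupCallback callback);

    // Check objects back in so parked requests can be retried.
    void releaseAll(Set<Long> ids);

    // Rule 3: quiesce for the DGC rescue phase, then resume.
    void pause();
    void unpause();
}

interface LookupCallback {
    void objectsAvailable(Set<Long> ids);
}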

With these requirements in mind, we set out to write a new Object Manager. Some of the core techniques we employed are listed below.

1. Try to mark objects for use and roll back on failure

When requests are processed concurrently, the state changes quickly and concurrently. If two requests, one for objects (A, B and C) and one for objects (C and D), are processed at the same time, only one can be furnished at a time because of the common object C and rule 1. Such collisions on top-level objects are not uncommon, but when you have thousands of requests operating on millions of objects per second, there are also many requests that run successfully in parallel.

So we have an "in-use" bit in every object. For every object in the request, we try to mark that bit atomically if it is not set. If we are able to mark all the objects required for processing the request, then we are allowed to carry on with the request. If not, we roll back, i.e. unmark the objects we marked so far, and add the request to a blocked request queue.

When the object becomes available again, the request becomes pending and is processed again.
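
Here is a minimal sketch of the mark-or-rollback idea, assuming an atomic "in-use" flag per object (illustrative names; in the real server the flag lives on ManagedObjectImpl, but the shape is the same).

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative sketch: try to mark every object of a request; on the first
// conflict, undo the marks made so far and report failure so the request can
// be parked on the blocked queue.
class InUseMarker {

    static class ManagedObject {
        final AtomicBoolean inUse = new AtomicBoolean(false);
    }

    static boolean markAllOrRollback(Collection<ManagedObject> request) {
        List<ManagedObject> marked = new ArrayList<>();
        for (ManagedObject mo : request) {
            if (mo.inUse.compareAndSet(false, true)) {
                marked.add(mo);
            } else {
                // Some other request owns this object: roll back our marks.
                for (ManagedObject m : marked) {
                    m.inUse.set(false);
                }
                return false;
            }
        }
        return true;  // all objects are now checked out to this request
    }
}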

2. Maintain lock order

If two requests for objects (A, B and C) and (C, D and A) are processed concurrently as mentioned above, we could end up in a situation where Request 1 is waiting for Request 2 to complete while Request 2 is waiting for Request 1 to complete. In order to avoid these circular dependencies and the classic deadlock that comes with them, we always process the objects in a request in ascending order of their ids, thus maintaining a consistent order and avoiding deadlocks.
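
A sketch of that ordering, assuming object ids are plain longs (the real code orders by ObjectID): both (A, B, C) and (C, D, A) sort so that A comes first, so whichever request marks A first wins and the other blocks right there, instead of each holding objects the other needs.

import java.util.Set;
import java.util.TreeSet;

// Illustrative: sort the requested ids so every request marks objects in the
// same global (ascending) order, which rules out circular waits.
class LockOrdering {
    static TreeSet<Long> inAscendingIdOrder(Set<Long> requestedIds) {
        return new TreeSet<>(requestedIds);  // TreeSet iterates in sorted order
    }
}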

3. Check, change and recheck state

Let's say we couldn't process a request because an object (C) is not available, so we decide to add the request to the blocked queue. But before we can add it to the blocked queue and signal the requester that the request went pending and will be furnished in the future, object C could be checked back in and become available for use, since things are running in parallel. This would leave a request sitting in the blocked queue even though all the requested objects are available, which breaks rule 2 (and may lead to a hung request).

To avoid this, we recheck the state after adding the request to the blocked queue. If things changed in the meantime, we rerun the request until it is either successfully furnished or successfully added to the blocked queue without the state changing underneath us for that object.
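
A sketch of that loop, reusing the InUseMarker sketch from above (illustrative names, not the actual Terracotta code).

import java.util.Collection;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Illustrative check / change / re-check loop: park the request, then verify
// the objects are really still in use; if not, pull the request back out and
// retry instead of leaving it stranded on the blocked queue.
class RecheckingLookup {

    private final Queue<Collection<InUseMarker.ManagedObject>> blockedQueue =
            new ConcurrentLinkedQueue<>();

    // Returns true if the request was furnished now, false if it was parked
    // to be retried when its objects are checked back in.
    boolean lookup(Collection<InUseMarker.ManagedObject> request) {
        while (true) {
            if (InUseMarker.markAllOrRollback(request)) {
                return true;                      // all objects checked out
            }
            blockedQueue.add(request);            // park the request...
            boolean stillBlocked = false;
            for (InUseMarker.ManagedObject mo : request) {
                if (mo.inUse.get()) {
                    stillBlocked = true;
                    break;
                }
            }
            // ...then re-check: the blocking object may have been checked
            // back in between the failed mark and the enqueue.
            if (!stillBlocked && blockedQueue.remove(request)) {
                continue;                         // state changed: retry now
            }
            return false;                         // genuinely blocked
        }
    }
}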

4. Initiate faulting from disk once and only once

When a requested object is not available in memory, we initiate a fault from disk or off-heap, but while the object is being faulted we don't want other requests to initiate more fault requests for that same object. To do this we add a marker object that is "always" in use (mark bit set) and use ConcurrentHashMap's putIfAbsent to decide who should initiate the fault. For the time it takes to fault from disk, this object is "in use" for all other requests and thus they go to the blocked request queue automatically.
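
A sketch of that, again reusing the InUseMarker objects from above and assuming the object table is a ConcurrentHashMap keyed by id (illustrative; not the actual Terracotta code).

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Illustrative fault-once logic: putIfAbsent picks exactly one winner, and the
// permanently "in-use" marker makes every other request for this id fail its
// mark and fall onto the blocked queue while the single fault is in flight.
class FaultingObjectTable {

    private final ConcurrentMap<Long, InUseMarker.ManagedObject> table =
            new ConcurrentHashMap<>();

    InUseMarker.ManagedObject getOrFault(long id) {
        InUseMarker.ManagedObject marker = new InUseMarker.ManagedObject();
        marker.inUse.set(true);                              // "always" in use
        InUseMarker.ManagedObject existing = table.putIfAbsent(id, marker);
        if (existing == null) {
            // This thread won the race, so it alone faults the object in from
            // off-heap or disk and swaps the marker for the real object.
            InUseMarker.ManagedObject faulted = faultFromStore(id);
            table.replace(id, marker, faulted);
            return faulted;
        }
        return existing;   // someone else is (or was) doing the fault
    }

    private InUseMarker.ManagedObject faultFromStore(long id) {
        return new InUseMarker.ManagedObject();  // placeholder for real I/O
    }
}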

5. Pause and unpause the Object Manager when needed for DGC

Every once in a while, when the Distributed Garbage Collector (DGC) is running, it may need the Object Manager to stop processing requests for a short time during its final rescue phase so it can come up with the final set of objects that are garbage and can be deleted from the system (rule 3).

To achieve this, and also rule 5, we introduced a ReentrantReadWriteLock (RRWL). Every request must acquire a READ lock before being processed in the Object Manager. When the DGC asks the Object Manager to pause, it changes the state to disallow further requests, then acquires a WRITE lock and waits for all currently operating requests to complete. We maintain a count of the objects that are currently checked out; when it reaches zero, the DGC can continue its final rescue. This way all requests are processed concurrently while holding the READ lock, but the DGC can still pause processing when needed by grabbing the WRITE lock. This could also have been done with a more complicated state machine, but this solution proved simple and gave good performance.
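
A minimal sketch of that scheme (illustrative names; the real code tracks its state transitions more carefully): lookups run in parallel under the READ lock, and the DGC grabs the WRITE lock to shut out new lookups, then waits for the checked-out count to drain before its final rescue.

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Illustrative pause/unpause: many lookups share the READ lock; the DGC takes
// the WRITE lock to stop new lookups and waits for checked-out objects to be
// returned before running its final rescue phase.
class PausableObjectManager {

    private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
    private final AtomicInteger checkedOutCount = new AtomicInteger();

    // Every lookup runs under the READ lock, so lookups proceed in parallel.
    void lookup(Runnable checkOutObjects, int objectCount) {
        rwl.readLock().lock();
        try {
            checkOutObjects.run();
            checkedOutCount.addAndGet(objectCount);
        } finally {
            rwl.readLock().unlock();
        }
    }

    // Called when a request checks its objects back in.
    void checkIn(int objectCount) {
        checkedOutCount.addAndGet(-objectCount);
    }

    void pauseForDGC() throws InterruptedException {
        rwl.writeLock().lock();   // blocks until in-flight lookups finish
        // New lookups are now shut out; wait for already checked-out objects
        // to come back before the final rescue starts (a simple poll here;
        // the real code would use proper signalling).
        while (checkedOutCount.get() > 0) {
            Thread.sleep(10);
        }
    }

    void unpauseAfterDGC() {
        rwl.writeLock().unlock();
    }
}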

Conclusion :

So with all these changes we were able to remove all global locks from the lookup code path in the Object Manager, thus eliminating the bottleneck we saw earlier in the thread dump. We got a very good improvement in performance with this change. Also, with BigMemory, we were able to scale linearly up to 350 GB of data even as we added more load to the system. In fact, for some read-heavy use cases we saw a 2X improvement in TPS even for the regular distributed cache without BigMemory.

For the adventurous ones out there, the source for the new concurrent Object Manager can be found here. If you want to check out our open source Terracotta platform and play with it, you can get the source or download it from here.



11 comments:

  1. Hi Saravanan,

    very nice blog entry.
    Just one note about point 5: isn't the acquisition of a read lock for every request too costly?
    In my experience, read locks are not all that lightweight (you obviously preserve throughput but increase latency), so I solved a similar use case by employing an atomic flag variable (to simulate the lock state) and an atomic counter (to count threads that acquired the "read" state), thus avoiding explicit locks and using only CAS operations.

    Feel free to get back to discuss at length ;)

  2. Hi Sergio,

    Thanks. About point 5, the read lock acquisition didn't prove to be that costly for our use case. In our use case there is seldom a write lock request, so the read locks are mostly not contending with write locks. Also, we do a good amount of batching at the higher layer so that not every request translates to a lock acquisition, and with batching, if we add to latency we generally batch better, so our overall throughput ends up being the same.

    Out of curiosity, I also wrote a simple test program to see how fast I can acquire read locks with 50 threads contending simultaneously. On my 2.2 GHz Core 2 Duo laptop, I was able to do about 958K lock acquisitions with 0% writes. This number only dips to 949K with 2% write lock requests. So it seems pretty fast with the latest JDKs.

  3. Hi Saravanan,

    thanks for the response!
    Is this new object manager included in Terracotta 3.4.0 open source release?

  4. Yes, all these changes and optimizations are available in Terracotta 3.4 open source release.

  5. Hi Saravanan and the TC guys, would you like to provide your inputs from a TC perspective about Scalable frameworks - http://code.google.com/p/scalable-frameworks/ ?

    Especially with the spreadsheet described here - http://javaforu.blogspot.com/2010/11/scalable-compute-storage-frameworks.html - just a few words in each column.

    Regards,
    Ashwin.

  6. Awesome post. I am facing a problem when I am using Ehcache in transaction mode. I cannot turn transactions on and off at runtime, so I need to wrap every Ehcache action inside a transaction. This causes a huge overhead. Any suggestion for this?

  7. Unfortunately transactions are all or nothing in Ehcache. But there are different txn modes, and local txns are pretty fast. Check out http://ehcache.org/documentation/jta.html

  8. Hi, thank you for your reply. Can I use bulk upload in transaction mode?

  9. I need to use txn mode XA, and when I am putting 10K entries in Ehcache using transactions (this is a requirement), it takes a long time. Can you suggest anything to improve performance?

  10. Did you check out local transactions? Check out the link I posted earlier. Also, the Ehcache forum is a good place to post these questions because many people monitor it: http://forums.terracotta.org/forums/forums/show/16.page
