org.apache.uima.adapter.jms.activemq
Class ConcurrentMessageListener

java.lang.Object
  extended by org.apache.uima.adapter.jms.activemq.ConcurrentMessageListener
All Implemented Interfaces:
org.springframework.jms.listener.SessionAwareMessageListener

public class ConcurrentMessageListener
extends java.lang.Object
implements org.springframework.jms.listener.SessionAwareMessageListener

Message listener injected at runtime into Aggregate to handle a race condition when multiple threads simultaneously process messages from a Cas Multiplier. It is only used to process messages from a Cas Multiplier and only if the reply queue has more than one consumer thread configured in a deployment descriptor. The listener creates a pool of threads equal to the number of concurrent consumers defined in the DD for the listener on the reply queue. Once the message is handled in onMessage(), it is than delegated for processing to one of the available threads from the pool. This listener guarantees processing order. It receives messages from Spring in a single thread and if it finds a child CAS in the message, it increments the parent (input) CAS child count and delegates processing to the InputChannel instance. The race condition: The Cas Multiplier sends the last child and the parent almost at the same time. Both are received by the aggregate and are processed in different threads, if a scaleout is used on the reply queue. One thread may start processing the input CAS while the other thread (with the last child) is not yet allowed to run. The first thread takes the input CAS all the way to the final step and since at this time, the input CAS has no children ( the thread processing the last child has not updated the child count yet), it will be prematurely released. When the thread with the last child is allowed to run, it finds that the parent no longer exists in the cache.


Constructor Summary
ConcurrentMessageListener(int concurrentThreads, java.lang.Object delegateListener, java.lang.String destination, java.lang.ThreadGroup threadGroup, java.lang.String threadPrefix)
          Creates a listener with a given number of process threads.
 
Method Summary
 java.util.concurrent.ThreadPoolExecutor getTaskExecutor()
           
 void onMessage(javax.jms.Message message, javax.jms.Session session)
          Intercept a message to increment a child count of the input CAS.
 void setAnalysisEngineController(AnalysisEngineController controller)
           
 void stop()
           
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Constructor Detail

ConcurrentMessageListener

public ConcurrentMessageListener(int concurrentThreads,
                                 java.lang.Object delegateListener,
                                 java.lang.String destination,
                                 java.lang.ThreadGroup threadGroup,
                                 java.lang.String threadPrefix)
                          throws java.io.InvalidClassException
Creates a listener with a given number of process threads. This listener is injected between Spring and JmsInputChannel to enable orderly processing of CASes. This listener is only used on reply queues that have scale out attribute in DD greater than 1. Its main job is to increment number of child CASes for a given input CAS. It does so in a single thread, and once it completes the update this listener submits the CAS for further processing up to the JmsInputChannel. The CAS is submitted to a queue where the executor assigns a free thread to process the CAS.

Parameters:
concurrentThreads - - number of threads to use to process CASes
delegateListener - - JmsInputChannel instance to delegate CAS to
Throws:
java.io.InvalidClassException
Method Detail

getTaskExecutor

public java.util.concurrent.ThreadPoolExecutor getTaskExecutor()

stop

public void stop()

setAnalysisEngineController

public void setAnalysisEngineController(AnalysisEngineController controller)

onMessage

public void onMessage(javax.jms.Message message,
                      javax.jms.Session session)
               throws javax.jms.JMSException
Intercept a message to increment a child count of the input CAS. This method is always called in a single thread, guaranteeing order of processing. The child CAS will always come here first. Once the count is updated, or this CAS is not an child, the message will be delegated to one of the threads in the pool that will eventually call InputChannel object where the actual processing of the message begins.

Specified by:
onMessage in interface org.springframework.jms.listener.SessionAwareMessageListener
Throws:
javax.jms.JMSException


Copyright © 2011. All Rights Reserved.