Class ProcessingUnit

java.lang.Object
java.lang.Thread
org.apache.uima.collection.impl.cpm.engine.ProcessingUnit
All Implemented Interfaces:
Runnable

public class ProcessingUnit extends Thread
This component executes the processing pipeline. Running in a separate thread, it continuously reads bundles of Cas from the Work Queue filled by the ArtifactProducer and sends them through the configured CasProcessors. The CasProcessors are invoked in the order in which they are listed in the CPE descriptor. The results of analysis produced by the Cas Processors are enqueued onto an output queue that is shared with the Cas Consumers.
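The loop described above follows the producer/consumer pattern. The following is a minimal sketch of that pattern using java.util.concurrent, not the UIMA implementation: the names PipelineSketch, Worker, and EOF are hypothetical, a String stands in for a bundle of CASes, and each UnaryOperator stands in for a Cas Processor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.UnaryOperator;

// Hypothetical sketch of a processing-unit loop; not the UIMA API.
final class PipelineSketch {
  // Sentinel marking the end of the collection (plays the role of the EOFToken).
  static final Object EOF = new Object();

  static final class Worker extends Thread {
    private final BlockingQueue<Object> in;   // work queue filled by the producer
    private final BlockingQueue<Object> out;  // output queue shared with consumers
    private final List<UnaryOperator<String>> processors; // invoked in list order

    Worker(BlockingQueue<Object> in, BlockingQueue<Object> out,
        List<UnaryOperator<String>> processors) {
      this.in = in;
      this.out = out;
      this.processors = processors;
    }

    @Override
    public void run() {
      try {
        while (true) {
          Object item = in.take(); // blocks until a bundle arrives
          if (item == EOF) {
            out.put(EOF); // forward the token so consumers can shut down too
            return;
          }
          String cas = (String) item;
          for (UnaryOperator<String> p : processors) {
            cas = p.apply(cas); // each processor sees the previous one's result
          }
          out.put(cas);
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve interrupt status and exit
      }
    }
  }

  // Usage: two documents followed by EOF, one worker, then collect the output queue.
  static List<Object> demo() {
    BlockingQueue<Object> in = new ArrayBlockingQueue<>(8);
    BlockingQueue<Object> out = new ArrayBlockingQueue<>(8);
    List<UnaryOperator<String>> processors = List.of(String::toUpperCase, s -> s + "!");
    Worker worker = new Worker(in, out, processors);
    worker.start();
    try {
      in.put("a");
      in.put("b");
      in.put(EOF);
      worker.join();
    } catch (InterruptedException e) {
      throw new IllegalStateException(e);
    }
    List<Object> results = new ArrayList<>();
    out.drainTo(results);
    return results;
  }
}
```

Because a single worker consumes the queue in FIFO order, the results keep the input order; with several workers sharing one queue, ordering is no longer guaranteed.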
  • Field Details

    • threadState

      public int threadState
    • casPool

      protected CPECasPool casPool
    • releaseCAS

      protected boolean releaseCAS
    • cpm

      protected CPMEngine cpm
    • workQueue

      protected BoundedWorkQueue workQueue
    • outputQueue

      protected BoundedWorkQueue outputQueue
    • mConverter

      protected CasConverter mConverter
    • processingUnitProcessTrace

      protected ProcessTrace processingUnitProcessTrace
    • processContainers

      protected LinkedList processContainers
    • numToProcess

      protected long numToProcess
    • casList

      protected CAS[] casList
    • statusCbL

      protected ArrayList statusCbL
    • notifyListeners

      protected boolean notifyListeners
    • conversionCas

      protected CAS conversionCas
    • artifact

      protected Object[] artifact
    • conversionCasArray

      protected CAS[] conversionCasArray
    • timer

      protected UimaTimer timer
    • threadId

      protected String threadId
    • cpeConfiguration

      protected CpeConfiguration cpeConfiguration
    • casCache

      private CAS[] casCache
    • isCasConsumerPipeline

      private boolean isCasConsumerPipeline
    • isRunning

      private boolean isRunning
    • timer01

      public long timer01
    • timer02

      public long timer02
    • timer03

      public long timer03
    • timer04

      public long timer04
    • timer05

      public long timer05
    • timer06

      public long timer06
    • zeroLengthObjectArray

      private static final Object[] zeroLengthObjectArray
      Logging helpers: special forms are provided for frequently used argument sets, and the "maybe" versions test isLoggable before logging. Additional arguments are passed to the logger as an object array.
    • thisClassName

      private static final String thisClassName
  • Constructor Details

    • ProcessingUnit

      public ProcessingUnit()
    • ProcessingUnit

      public ProcessingUnit(CPMEngine acpm, BoundedWorkQueue aInputQueue, BoundedWorkQueue aOutputQueue)
      Initialize the PU
      Parameters:
      acpm - - component managing life cycle of the CPE
      aInputQueue - - queue to read from
      aOutputQueue - - queue to write to
    • ProcessingUnit

      public ProcessingUnit(CPMEngine acpm)
  • Method Details

    • isRunning

      public boolean isRunning()
      Returns true if this component is in running state.
      Returns:
      - true if running, false otherwise
    • setCasConsumerPipelineIdentity

      public void setCasConsumerPipelineIdentity()
      Define a CasConsumer Pipeline identity for this instance
    • isCasConsumerPipeline

      public boolean isCasConsumerPipeline()
    • setInputQueue

      public void setInputQueue(BoundedWorkQueue aInputQueue)
      Alternative method of providing a queue from which this PU will read bundles of Cas
      Parameters:
      aInputQueue - - read queue
    • setOutputQueue

      public void setOutputQueue(BoundedWorkQueue aOutputQueue)
      Alternative method of providing a queue where this PU will deposit results of analysis
      Parameters:
      aOutputQueue - - queue to write to
    • setCPMEngine

      public void setCPMEngine(CPMEngine acpm)
      Alternative method of providing the reference to the component managing the lifecycle of the CPE
      Parameters:
      acpm - - reference to the controlling engine
    • cleanup

      public void cleanup()
      Null out fields of this object. Call this only when this object is no longer needed.
    • setNotifyListeners

      public void setNotifyListeners(boolean aDoNotify)
      Set a flag indicating if notifications should be made via configured Listeners
      Parameters:
      aDoNotify - - true if notification is required, false otherwise
    • addStatusCallbackListener

      public void addStatusCallbackListener(BaseStatusCallbackListener aListener)
      Plugs in Listener object used for notifications.
      Parameters:
      aListener - - BaseStatusCallbackListener instance
    • getCallbackListeners

      public ArrayList getCallbackListeners()
      Returns list of listeners used by this PU for callbacks.
      Returns:
      - list of BaseStatusCallbackListener instances
    • removeStatusCallbackListener

      public void removeStatusCallbackListener(BaseStatusCallbackListener aListener)
      Removes given listener from the list of listeners
      Parameters:
      aListener - - object to remove from the list
    • setProcessingUnitProcessTrace

      public void setProcessingUnitProcessTrace(ProcessTrace aProcessingUnitProcessTrace)
      Plugs in ProcessTrace object used to collect statistics
      Parameters:
      aProcessingUnitProcessTrace - - object to compile stats
    • setUimaTimer

      public void setUimaTimer(UimaTimer aTimer)
      Plugs in custom timer used by the PU for getting time
      Parameters:
      aTimer - - custom timer to use
    • setContainers

      public void setContainers(LinkedList processorList)
      Plugs in a list of Cas Processor containers. During processing, the Cas Processors in this list are called sequentially. Each Cas Processor is wrapped in a container that manages errors, counts, totals, and restarts.
      Parameters:
      processorList - list of Cas Processor containers that make up the processing pipeline
    • disableCasProcessor

      public void disableCasProcessor(int aCasProcessorIndex)
      Disables a CASProcessor in the processing pipeline, locating it by the provided index. The disabled Cas Processor remains in the processing pipeline; however, it is not used during processing.
      Parameters:
      aCasProcessorIndex - - location in the pipeline of the Cas Processor to disable
    • disableCasProcessor

      public void disableCasProcessor(String aCasProcessorName)
      Alternative method to disable Cas Processor. Uses a name to locate it.
      Parameters:
      aCasProcessorName - - a name of the Cas Processor to disable
    • enableCasProcessor

      public void enableCasProcessor(String aCasProcessorName)
      Enables Cas Processor with a given name. Enabled Cas Processor will immediately begin to receive bundles of Cas.
      Parameters:
      aCasProcessorName - - name of the Cas Processor to enable
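      Disabling by index or by name leaves a Cas Processor in the pipeline and only skips it during processing. The sketch below illustrates that behavior under stated assumptions: DisableSketch and its Container class are hypothetical, and a UnaryOperator over String stands in for a Cas Processor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical sketch: a disabled container stays in the pipeline but is skipped.
final class DisableSketch {
  static final class Container {
    final String name;
    final UnaryOperator<String> processor;
    // volatile so a toggle made by a controlling thread is seen by the pipeline thread
    volatile boolean enabled = true;

    Container(String name, UnaryOperator<String> processor) {
      this.name = name;
      this.processor = processor;
    }
  }

  private final List<Container> containers = new ArrayList<>();

  void add(String name, UnaryOperator<String> processor) {
    containers.add(new Container(name, processor));
  }

  // Locate by position in the pipeline.
  void disable(int index) {
    containers.get(index).enabled = false;
  }

  // Locate by name; the change applies to the next bundle that is processed.
  void setEnabled(String name, boolean enabled) {
    for (Container c : containers) {
      if (c.name.equals(name)) {
        c.enabled = enabled;
      }
    }
  }

  String process(String cas) {
    for (Container c : containers) {
      if (!c.enabled) {
        continue; // still listed, just not invoked
      }
      cas = c.processor.apply(cas);
    }
    return cas;
  }

  int size() {
    return containers.size();
  }
}
```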
    • getProcessTrace

      private ProcessTrace getProcessTrace()
      Returns a ProcessTrace instance used by this component
      Returns:
      - ProcessTrace instance
    • handleEOFToken

      private void handleEOFToken() throws Exception
      Handles EOFToken. This object is received when the CPM terminates. This token is passed to each running processing thread and cas consumer thread to allow orderly shutdown. The EOFToken may be generated by ArtifactProducer if end of collection is reached, or the CPM itself can place it in the Work Queue to force all processing threads to stop.
      Throws:
      Exception - -
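      An end-of-stream token that stops every consuming thread is commonly implemented as a "poison pill". The sketch below shows that pattern with several pipeline threads sharing one input queue. EofSketch and EOF_TOKEN are hypothetical names, and having each thread put the token back for its siblings is an assumption about how the token reaches every thread, not a description of UIMA's mechanism.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical poison-pill sketch of orderly shutdown across several pipeline threads.
final class EofSketch {
  static final Object EOF_TOKEN = new Object();

  static Thread worker(BlockingQueue<Object> in, BlockingQueue<Object> out) {
    return new Thread(() -> {
      try {
        while (true) {
          Object item = in.take();
          if (item == EOF_TOKEN) {
            in.put(EOF_TOKEN); // leave the token for sibling pipeline threads
            return;
          }
          out.put(item); // stand-in for running the item through the pipeline
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
  }

  // Runs three workers over n items (n >= 1) and returns how many reached the output queue.
  static int run(int n) {
    BlockingQueue<Object> in = new ArrayBlockingQueue<>(n + 1);
    BlockingQueue<Object> out = new ArrayBlockingQueue<>(n);
    List<Thread> threads = new ArrayList<>();
    for (int i = 0; i < 3; i++) {
      threads.add(worker(in, out));
    }
    threads.forEach(Thread::start);
    try {
      for (int i = 0; i < n; i++) {
        in.put("doc-" + i);
      }
      in.put(EOF_TOKEN); // end of collection reached
      for (Thread t : threads) {
        t.join(); // every thread exits once it has seen the token
      }
    } catch (InterruptedException e) {
      throw new IllegalStateException(e);
    }
    return out.size();
  }
}
```

      Since the queue is FIFO, the token is only taken after every document ahead of it, so no work is lost when the threads stop.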
    • releaseTimedOutCases

      private void releaseTimedOutCases(Object[] artifact)
      Releases CASes back to the CAS Pool. This method is only used when a chunk-aware queue is used. When a document is chunked, each chunk represents a portion of the document. The chunks are ingested in sequential order by the Cas Consumer, and delivery of chunks in the correct sequence (chunk sequence 1 before chunk sequence 2) is guaranteed. Since chunks are processed asynchronously (if a multi-pipeline configuration is used), they may arrive in the queue out of sequence. If this happens, the Cas Consumer waits for the expected chunk sequence. If that chunk does not arrive within the configured interval, the entire sequence (all related chunks, i.e. CASes) is invalidated, in the sense that each CAS is marked as timed out. Each CAS is then released back to the CAS Pool.
      Parameters:
      artifact - - an array of CAS instances
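      The reordering and timeout behavior described above can be sketched as a small reorder buffer for one chunked document. ChunkSequencer is hypothetical, strings stand in for CASes, timestamps are passed in explicitly so the logic is deterministic, and measuring the timeout from the last in-sequence delivery is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical sketch of chunk reordering with a timeout for one chunked document.
final class ChunkSequencer {
  private final long timeoutMillis;
  private final TreeMap<Integer, String> pending = new TreeMap<>(); // seq -> chunk
  private int expected = 1;
  private long lastProgress;
  final List<String> delivered = new ArrayList<>();
  final List<String> releasedToPool = new ArrayList<>();

  ChunkSequencer(long timeoutMillis, long now) {
    this.timeoutMillis = timeoutMillis;
    this.lastProgress = now;
  }

  // A chunk arrives, possibly out of order; deliver every chunk that is now in sequence.
  void arrive(int seq, String chunk, long now) {
    pending.put(seq, chunk);
    while (pending.containsKey(expected)) {
      delivered.add(pending.remove(expected));
      expected++;
      lastProgress = now;
    }
  }

  // Called periodically; if the expected chunk is overdue, time out all buffered chunks.
  boolean checkTimeout(long now) {
    if (pending.isEmpty() || now - lastProgress < timeoutMillis) {
      return false;
    }
    releasedToPool.addAll(pending.values()); // each related CAS goes back to the pool
    pending.clear();
    return true;
  }
}
```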
    • isCpmPaused

      private void isCpmPaused()
    • run

      public void run()
      Starts the Processing Pipeline thread. This thread waits for an artifact to arrive on the configured Work Queue. Once the CAS arrives, it is removed from the queue and sent through the analysis pipeline.
      Specified by:
      run in interface Runnable
      Overrides:
      run in class Thread
    • clearCasCache

      private void clearCasCache()
      Releases all CAS instances from the cache back to the Cas Pool. The Cas Cache is used as an optimization to store a CAS in case it is needed for conversion, specifically in configurations that use both XCAS-based and CAS-based AEs.
    • consumeQueue

      public boolean consumeQueue()
      Consumes the input queue to make sure all bundles still in it get processed before the CPE terminates.
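      Draining a queue before shutdown is typically done with a non-blocking poll loop, so the thread stops as soon as the queue is empty instead of waiting for new work. A minimal sketch, assuming a java.util.concurrent.BlockingQueue; DrainSketch is hypothetical, and unlike the method above it returns the number of bundles handled rather than a boolean.

```java
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;

// Hypothetical sketch; the method name mirrors consumeQueue but this is not UIMA code.
final class DrainSketch {
  // Processes every bundle still queued, without blocking, and returns how many were handled.
  static <T> int consumeQueue(BlockingQueue<T> queue, Consumer<? super T> process) {
    int count = 0;
    T bundle;
    // poll() returns null as soon as the queue is empty instead of waiting like take()
    while ((bundle = queue.poll()) != null) {
      process.accept(bundle);
      count++;
    }
    return count;
  }
}
```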
    • processNext

      protected boolean processNext(Object[] aCasObjectList, ProcessTrace pTrTemp) throws ResourceProcessException, IOException, CollectionException, AbortCPMException, KillPipelineException
      Executes the processing pipeline. The given bundle of Cas instances is processed by each Cas Processor in the pipeline. Conversions between the different types of Cas Processors are done on the fly. Two types of Cas Processors are currently supported:
        • CasDataProcessor
        • CasObjectProcessor
      The first operates on instances of CasData; the latter operates on instances of CAS. The results produced by the Cas Processors are added to the output queue.
      Parameters:
      aCasObjectList - - bundle of Cas to analyze
      pTrTemp - - object used to aggregate stats
      Throws:
      ResourceProcessException
      IOException
      CollectionException
      AbortCPMException
      KillPipelineException
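      The on-the-fly conversion described for processNext amounts to tracking which representation the CAS is currently in and converting only when the next Cas Processor expects the other one. The sketch below only counts those conversions; ConversionSketch, Form, and Processor are hypothetical stand-ins, not UIMA types.

```java
import java.util.List;

// Hypothetical sketch; the types below are stand-ins, not UIMA classes.
final class ConversionSketch {
  // The two CAS representations a processor may consume.
  enum Form { CAS_DATA, CAS_OBJECT }

  static final class Processor {
    final String name;
    final Form accepts;

    Processor(String name, Form accepts) {
      this.name = name;
      this.accepts = accepts;
    }
  }

  // Walks the pipeline in order and counts how many conversions are needed
  // when the CAS starts out in the given form.
  static int conversionsNeeded(Form initial, List<Processor> pipeline) {
    Form current = initial;
    int conversions = 0;
    for (Processor p : pipeline) {
      if (p.accepts != current) {
        current = p.accepts; // e.g. CasData -> CAS before a CasObjectProcessor
        conversions++;
      }
      // ... p would be invoked here on the CAS in the form it expects ...
    }
    return conversions;
  }
}
```

      Grouping processors of the same type next to each other in the descriptor keeps the number of conversions low, since consecutive processors of the same type need none.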
    • postAnalysis

      private void postAnalysis(Object[] aCasObjectList, boolean isCasObject, Object[] casObjects, ProcessTrace aProcessTr, boolean doneAlready) throws Exception
      Notifies application listeners of completed analysis and stores results of analysis (CAS) in the Output Queue that this thread shares with a Cas Consumer thread.
      Parameters:
      aCasObjectList - - List of Artifacts just analyzed
      isCasObject - - determines the type of CAS just analyzed (CasData vs. CasObject)
      casObjects -
      aProcessTr - - ProcessTrace object holding events and stats
      doneAlready - - flag to indicate if the last Cas Processor was released back to its container
      Throws:
      Exception - -
    • doEndOfBatchProcessing

      private void doEndOfBatchProcessing(ProcessingContainer aContainer, CasProcessor aProcessor, ProcessTrace aProcessTr, Object[] aCasObjectList)
      Performs end of batch processing. It delegates the processing to the Cas Processor container. Based on its configuration, the container determines whether it is time to call the Cas Processor's batchProcessComplete() method.
      Parameters:
      aContainer - - container performing end of batch processing
      aProcessor - - Cas Processor to call on end of batch
      aProcessTr - - Process Trace to use for aggregating events
      aCasObjectList - - CASes just analyzed
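      The container's decision can be pictured as a counter compared against a configured batch size. In this sketch BatchSketch is hypothetical, the batchSize field is an assumed stand-in for whatever the container's configuration provides, and returning true marks the point where batchProcessComplete() would be called.

```java
// Hypothetical sketch: count processed CASes and signal end of batch every batchSize CASes.
final class BatchSketch {
  private final int batchSize; // assumed configuration value; 0 disables batching
  private int processedInBatch;
  int batchCompleteCalls;

  BatchSketch(int batchSize) {
    this.batchSize = batchSize;
  }

  // Called after each bundle; returns true when batchProcessComplete() would be invoked.
  boolean afterProcess(int casCount) {
    processedInBatch += casCount;
    if (batchSize > 0 && processedInBatch >= batchSize) {
      processedInBatch = 0; // start counting the next batch
      batchCompleteCalls++;
      return true;
    }
    return false;
  }
}
```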
    • handleSkipCasProcessor

      private void handleSkipCasProcessor(ProcessingContainer aContainer, Object[] aCasObjectList, boolean isLastCP) throws Exception
      If a CAS is skipped (due to the excessive exceptions it causes), increments stats and totals.
      Parameters:
      aContainer -
      aCasObjectList -
      isLastCP -
      Throws:
      Exception - -
    • handleServiceException

      private void handleServiceException(ProcessingContainer aContainer, CasProcessor aProcessor, ProcessTrace aProcessTr, Exception ex) throws Exception
      Handle exceptions related to remote invocations.
      Parameters:
      aContainer - - container managing CasProcessor that failed
      aProcessor - - failed CasProcessor
      aProcessTr - - ProcessTrace object holding events
      ex - - Source exception
      Throws:
      Exception - -
    • handleAbortCasProcessor

      private void handleAbortCasProcessor(ProcessingContainer aContainer, CasProcessor aProcessor) throws Exception
      Disables the current CasProcessor.
      Parameters:
      aContainer - - a container that manages the current Cas Processor.
      aProcessor - - a Cas Processor to be disabled
      Throws:
      Exception - - exception
    • handleAbortCPM

      private void handleAbortCPM(ProcessingContainer aContainer, CasProcessor aProcessor) throws Exception
      Terminates the CPM
      Parameters:
      aContainer - - a container that manages the current Cas Processor.
      aProcessor - - the current Cas Processor
      Throws:
      Exception - - exception
    • handleKillPipeline

      private void handleKillPipeline(ProcessingContainer aContainer) throws Exception
      Terminates this processing pipeline.
      Parameters:
      aContainer - - a container that manages the current Cas Processor.
      Throws:
      Exception - - exception
    • pauseContainer

      private boolean pauseContainer(ProcessingContainer aContainer, Exception aException, String aThreadId)
      Determines if the thread should be paused. Pausing a container effectively pauses ALL Cas Processors that are managed by the container. The pause is needed when multiple pipelines share a common service. If this service dies (Socket Down), only one thread should initiate the service restart. While the service is being restarted, no invocations on the service should be made. Containers are resumed on a successful service restart.
      Parameters:
      aContainer - - a container that manages the current Cas Processor.
      aThreadId - - id of the current thread
      aException - - the exception raised by the failed service invocation
      Throws:
      Exception - - exception
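      The "only one thread restarts the shared service" rule can be sketched with a monitor: the first failing thread wins the right to restart, and the others block until it resumes the service. SharedServiceGate and its methods are hypothetical and only illustrate the coordination, not UIMA's container API.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: several pipelines share one remote service.
final class SharedServiceGate {
  private boolean paused;
  final AtomicInteger restarts = new AtomicInteger();

  // Returns true if the caller won the right to restart the service.
  synchronized boolean pauseForRestart() {
    if (paused) {
      return false; // another thread is already restarting it
    }
    paused = true;
    return true;
  }

  synchronized void resume() {
    paused = false;
    notifyAll(); // wake pipelines blocked in awaitAvailable()
  }

  // Pipelines call this before invoking the service.
  synchronized void awaitAvailable() throws InterruptedException {
    while (paused) {
      wait();
    }
  }

  // On a socket failure: one thread restarts the service, the others wait for it.
  void onServiceFailure(Runnable restart) throws InterruptedException {
    if (pauseForRestart()) {
      try {
        restart.run();
        restarts.incrementAndGet();
      } finally {
        resume(); // always resume, even if the restart attempt throws
      }
    } else {
      awaitAvailable();
    }
  }
}
```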
    • releaseCases

      private void releaseCases(Object aCasList, boolean lastProcessor, String aName)
      Conditionally, releases CASes back to the CAS pool. The release only occurs if the Cas Processor is the last in the processing chain.
      Parameters:
      aCasList - - list of CASes to release
      lastProcessor - - determines if the release takes place
      aName - - name of the current container
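      The conditional release can be pictured with a fixed-size pool in which only the last processor in the chain returns CASes. PoolSketch is hypothetical and uses StringBuilder as a resettable stand-in for a CAS; it is not the CPECasPool API.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch of a fixed-size CAS pool with conditional release.
final class PoolSketch {
  private final Deque<StringBuilder> free = new ArrayDeque<>();

  PoolSketch(int size) {
    for (int i = 0; i < size; i++) {
      free.push(new StringBuilder());
    }
  }

  // Throws NoSuchElementException if the pool is exhausted.
  StringBuilder acquire() {
    return free.pop();
  }

  int available() {
    return free.size();
  }

  // Only the last processor in the chain gives the CASes back to the pool.
  void releaseCases(StringBuilder[] cases, boolean lastProcessor) {
    if (!lastProcessor) {
      return;
    }
    for (StringBuilder cas : cases) {
      cas.setLength(0); // reset before reuse, as a CAS would be reset
      free.push(cas);
    }
  }
}
```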
    • notifyListeners

      protected void notifyListeners(Object aCas, boolean isCasObject, EntityProcessStatus aEntityProcStatus)
      Notifies Listeners that the pipeline has finished processing the current set of CASes.
      Parameters:
      aCas - - either an array of Cas instances or a single Cas instance
      isCasObject - - true if the Cas is of type CAS (CasObject), false if it is CasData
      aEntityProcStatus - - status object that may contain exceptions and trace
    • doNotifyListeners

      protected void doNotifyListeners(Object aCas, boolean isCasObject, EntityProcessStatus aEntityProcStatus)
      Notifies all configured listeners, making sure that the appropriate type of Cas is sent to each listener. Conversions take place to ensure compatibility.
      Parameters:
      aCas - - Cas to pass to listener
      isCasObject - - true if Cas is of type CAS
      aEntityProcStatus - - status object containing exceptions and trace info
    • setReleaseCASFlag

      public void setReleaseCASFlag(boolean aFlag)
      Called by the CPMEngine during setup to indicate that this thread is supposed to release a CAS at the end of processing. This is typically done for the Cas Consumer thread, but in configurations that do not use Cas Consumers, the processing pipeline may also release the CAS.
      Parameters:
      aFlag - - true if this thread should release a CAS when analysis is complete
    • stopCasProcessors

      public void stopCasProcessors(boolean kill)
      Stops all Cas Processors that are part of this PU.
      Parameters:
      kill - - true if the CPE was stopped externally before it finished processing
    • endOfProcessingReached

      protected boolean endOfProcessingReached(long aCount)
      Returns true if the CPM has finished analyzing the collection.
      Parameters:
      aCount - - running total of documents processed so far
      Returns:
      - true if CPM has processed all docs, false otherwise
    • process

      protected void process(Object anArtifact)
      Parameters:
      anArtifact -
    • showMetadata

      protected void showMetadata(Object[] aCasList)
      Parameters:
      aCasList -
    • isProcessorReady

      protected boolean isProcessorReady(int aStatus)
      Checks whether the given CASProcessor status indicates that it is available for processing.
    • getBytes

      protected long getBytes(Object aCas)
      Returns the size of the CAS object. Currently only CASData is supported.
      Parameters:
      aCas - - Cas to get the size for
      Returns:
      the size of the CAS object. Currently only CASData is supported.
    • setCasPool

      public void setCasPool(CPECasPool aPool)
      Parameters:
      aPool -
    • filterOutTheCAS

      private boolean filterOutTheCAS(ProcessingContainer aContainer, boolean isCasObject, Object[] aCasObjectList)
    • containerDisabled

      private boolean containerDisabled(ProcessingContainer aContainer)
    • analyze

      protected boolean analyze(Object[] aCasObjectList, ProcessTrace pTrTemp) throws Exception
      An alternate processing loop designed for the single-threaded CPM.
      Parameters:
      aCasObjectList - - a list of CASes to analyze
      pTrTemp - - process trace where statistics are added during analysis
      Throws:
      Exception
    • doReleaseCasProcessor

      private void doReleaseCasProcessor(ProcessingContainer aContainer, CasProcessor aCasProcessor)
    • doEndOfBatch

      private void doEndOfBatch(ProcessingContainer aContainer, CasProcessor aProcessor, ProcessTrace aProcessTr, int howManyCases)
    • handleErrors

      private boolean handleErrors(Throwable e, ProcessingContainer aContainer, CasProcessor aProcessor, ProcessTrace aProcessTrace, Object[] aCasObjectList, boolean isCasObject) throws Exception
      Main routine that handles errors occurring in the processing loop.
      Parameters:
      e - - exception in the main processing loop
      aContainer - - current container of the Cas Processor
      aProcessor - - current Cas Processor
      aProcessTrace - - an object containing stats for this processing loop
      aCasObjectList - - list of CASes being analyzed
      isCasObject - - determines the type of CAS in the aCasObjectList (CasData or CasObject)
      Returns:
      boolean
      Throws:
      Exception - -
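      The handlers documented above (handleAbortCPM, handleKillPipeline, handleAbortCasProcessor, handleSkipCasProcessor) suggest a dispatch from the kind of failure to an action. The sketch below is a hypothetical illustration of such a dispatch: the exception classes, the Action enum, and the rule "skip the CAS until an error threshold is exceeded, then disable the processor" are all assumptions, not the CPM's configured error policy.

```java
// Hypothetical sketch of mapping failures in the processing loop to actions.
final class ErrorDispatchSketch {
  enum Action { SKIP_CAS, DISABLE_PROCESSOR, KILL_PIPELINE, ABORT_CPM }

  // Hypothetical exception types standing in for the CPM's control exceptions.
  static final class AbortCpm extends Exception {}
  static final class KillPipeline extends Exception {}

  private final int maxErrors; // assumed error threshold for one Cas Processor
  private int errors;

  ErrorDispatchSketch(int maxErrors) {
    this.maxErrors = maxErrors;
  }

  Action handle(Throwable e) {
    if (e instanceof AbortCpm) {
      return Action.ABORT_CPM; // stop the whole CPM
    }
    if (e instanceof KillPipeline) {
      return Action.KILL_PIPELINE; // stop only this pipeline thread
    }
    errors++;
    // Up to the threshold, skip the offending CAS; beyond it, disable the processor.
    return errors > maxErrors ? Action.DISABLE_PROCESSOR : Action.SKIP_CAS;
  }
}
```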
    • invokeCasObjectCasProcessor

      private void invokeCasObjectCasProcessor(ProcessingContainer container, CasProcessor processor, Object[] aCasObjectList, ProcessTrace pTrTemp, boolean isCasObject) throws Exception
      Parameters:
      container -
      processor -
      aCasObjectList -
      pTrTemp -
      isCasObject -
      Throws:
      Exception - -
    • convertCasDataToCasObject

      private void convertCasDataToCasObject(int casIndex, String aContainerName, Object[] aCasObjectList) throws Exception
      Parameters:
      casIndex -
      aContainerName -
      aCasObjectList -
      Throws:
      Exception - -
    • invokeCasDataCasProcessor

      private void invokeCasDataCasProcessor(ProcessingContainer container, CasProcessor processor, Object[] aCasObjectList, ProcessTrace pTrTemp, boolean isCasObject, boolean retry) throws Exception
      Parameters:
      container -
      processor -
      aCasObjectList -
      pTrTemp -
      isCasObject -
      retry -
      Throws:
      Exception - -
    • logCPM

      private void logCPM(Level level, String msgBundleId, Object[] args)
    • maybeLogFinest

      private void maybeLogFinest(String msgBundleId)
    • logFinest

      private void logFinest(String msgBundleId)
    • maybeLogFinest

      private void maybeLogFinest(String msgBundleId, String arg1)
    • logFinest

      private void logFinest(String msgBundleId, String arg1)
    • maybeLogFinest

      private void maybeLogFinest(String msgBundleId, String arg1, String arg2)
    • logFinest

      private void logFinest(String msgBundleId, String arg1, String arg2)
    • logFinest

      private void logFinest(String msgBundleId, String arg1, String arg2, String arg3)
    • maybeLogFinest

      private void maybeLogFinest(String msgBundleId, ProcessingContainer container, CasProcessor processor)
    • logFinest

      private void logFinest(String msgBundleId, ProcessingContainer container, CasProcessor processor)
    • maybeLogFinest

      private void maybeLogFinest(String msgBundleId, ProcessingContainer container)
    • maybeLogFinest

      private void maybeLogFinest(String msgBundleId, CasProcessor processor)
    • maybeLogFinest

      private void maybeLogFinest(String msgBundleId, ProcessingContainer container, CasProcessor processor, CAS[] casCache)
    • maybeLogFinest

      private void maybeLogFinest(String msgBundleId, CAS[] casCache)
    • maybeLogMemoryFinest

      private void maybeLogMemoryFinest()
    • logMemoryFinest

      private void logMemoryFinest()
    • logWarning

      private void logWarning(String msgBundleId)
    • maybeLogWarning

      private void maybeLogWarning(String msgBundleId, String arg1, String arg2)
    • logWarning

      private void logWarning(String msgBundleId, String arg1, String arg2)
    • maybeLogSevere

      private void maybeLogSevere(String msgBundleId)
    • maybeLogSevere

      private void maybeLogSevere(String msgBundleId, String arg1)
    • logSevere

      private void logSevere(String msgBundleId, String arg1)
    • maybeLogSevere

      private void maybeLogSevere(String msgBundleId, String arg1, String arg2)
    • logSevere

      private void logSevere(String msgBundleId, String arg1, String arg2)
    • maybeLogSevere

      private void maybeLogSevere(String msgBundleId, String arg1, String arg2, String arg3)
    • logSevere

      private void logSevere(String msgBundleId, String arg1, String arg2, String arg3)
    • maybeLogSevereException

      private void maybeLogSevereException(Throwable e)
    • maybeLogFinestWorkQueue

      private void maybeLogFinestWorkQueue(String msgBundleId, BoundedWorkQueue workQueue)