Class NonThreadedProcessingUnit
java.lang.Object
org.apache.uima.collection.impl.cpm.engine.NonThreadedProcessingUnit
-
Field Summary
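Fields
Modifier and Type, Field
protected Object[] artifact
private CAS[] casCache
protected CAS[] casList
protected CPECasPool casPool
protected CAS conversionCas
protected CAS[] conversionCasArray
protected CpeConfiguration cpeConfiguration
protected CPMEngine cpm
protected CasConverter mConverter
protected boolean notifyListeners
protected long numToProcess
protected BoundedWorkQueue outputQueue
protected LinkedList processContainers
protected ProcessTrace processingUnitProcessTrace
protected boolean relaseCAS
protected ArrayList statusCbL
protected String threadId
int threadState
protected UimaTimer timer
protected BoundedWorkQueue workQueue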
-
Constructor Summary
Constructors
Constructor, Description
NonThreadedProcessingUnit(CPMEngine acpm, BoundedWorkQueue aInputQueue, BoundedWorkQueue aOutputQueue)
    Initialize the PU.
-
Method Summary
Modifier and Type, Method, Description
void addStatusCallbackListener
    Plugs in Listener object used for notifications.
protected boolean analyze(Object[] aCasObjectList, ProcessTrace pTrTemp)
void cleanup()
    Null out fields of this object.
private void clearCasCache()
private boolean containerDisabled(ProcessingContainer aContainer)
private void convertCasDataToCasObject(int casIndex, String aContainerName, Object[] aCasObjectList)
void disableCasProcessor(int aCasProcessorIndex)
    Disable a CASProcessor in the processing pipeline.
void disableCasProcessor(String aCasProcessorName)
    Alternative method to disable Cas Processor.
private void doEndOfBatch(ProcessingContainer aContainer, CasProcessor aProcessor, ProcessTrace aProcessTr, int howManyCases)
protected void doNotifyListeners(Object aCas, boolean isCasObject, EntityProcessStatus aEntityProcStatus)
    Notifies all configured listeners.
private void doReleaseCasProcessor(ProcessingContainer aContainer, CasProcessor aCasProcessor)
void enableCasProcessor(String aCasProcessorName)
    Enables Cas Processor with a given name.
private boolean filterOutTheCAS(ProcessingContainer aContainer, boolean isCasObject, Object[] aCasObjectList)
protected long getBytes
    Returns the size of the CAS object.
getCallbackListeners
    Returns list of listeners used by this PU for callbacks.
private boolean handleErrors(Throwable e, ProcessingContainer aContainer, CasProcessor aProcessor, ProcessTrace aProcessTrace, Object[] aCasObjectList, boolean isCasObject)
    Main routine that handles errors occurring in the processing loop.
private void handleServiceException(ProcessingContainer aContainer, CasProcessor aProcessor, ProcessTrace aProcessTr, Exception ex)
private void handleSkipCasProcessor(ProcessingContainer aContainer, Object[] aCasObjectList, boolean isLastCP)
private void invokeCasDataCasProcessor(ProcessingContainer container, CasProcessor processor, Object[] aCasObjectList, ProcessTrace pTrTemp, boolean isCasObject, boolean retry)
private void invokeCasObjectCasProcessor(ProcessingContainer container, CasProcessor processor, Object[] aCasObjectList, ProcessTrace pTrTemp, boolean isCasObject)
protected boolean isProcessorReady(int aStatus)
    Check if the CASProcessor status is available for processing.
protected void notifyListeners(Object aCas, boolean isCasObject, EntityProcessStatus aEntityProcStatus)
    Notifies Listeners of the fact that the pipeline has finished processing the current set of CASes.
private boolean pauseContainer(ProcessingContainer aContainer, Exception aException, String aThreadId)
    Determines if the thread should be paused.
private void releaseCases(Object aCasList, boolean lastProcessor, String aName)
    Conditionally releases CASes back to the CAS pool.
void removeStatusCallbackListener
    Removes given listener from the list of listeners.
void setCasPool(CPECasPool aPool)
void setContainers(LinkedList processorList)
    Plugs in a list of Cas Processor containers.
void setCPMEngine(CPMEngine acpm)
    Alternative method of providing the reference to the component managing the lifecycle of the CPE.
void setInputQueue(BoundedWorkQueue aInputQueue)
    Alternative method of providing a queue from which this PU will read bundles of CAS.
void setNotifyListeners(boolean aDoNotify)
    Set a flag indicating if notifications should be made via configured Listeners.
void setOutputQueue(BoundedWorkQueue aOutputQueue)
    Alternative method of providing a queue where this PU will deposit results of analysis.
void setProcessingUnitProcessTrace(ProcessTrace aProcessingUnitProcessTrace)
    Plugs in ProcessTrace object used to collect statistics.
void setReleaseCASFlag(boolean aFlag)
void setUimaTimer(UimaTimer aTimer)
    Plugs in custom timer used by the PU for getting time.
void stopCasProcessors(boolean kill)
    Stops all Cas Processors that are part of this PU.
-
Field Details
-
threadState
public int threadState
-
casPool
-
relaseCAS
protected boolean relaseCAS
-
cpm
-
workQueue
-
outputQueue
-
mConverter
-
processingUnitProcessTrace
-
processContainers
-
numToProcess
protected long numToProcess
-
casList
-
statusCbL
-
notifyListeners
protected boolean notifyListeners
-
conversionCas
-
artifact
-
conversionCasArray
-
timer
-
threadId
-
cpeConfiguration
-
casCache
-
-
Constructor Details
-
NonThreadedProcessingUnit
public NonThreadedProcessingUnit(CPMEngine acpm, BoundedWorkQueue aInputQueue, BoundedWorkQueue aOutputQueue)
Initialize the PU.
Parameters:
acpm - component managing life cycle of the CPE
aInputQueue - queue to read from
aOutputQueue - queue to write to
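The constructor places the PU between a queue it reads from and a queue it writes to. The following is a minimal sketch of that arrangement, not the UIMA implementation: `java.util.concurrent.ArrayBlockingQueue` stands in for `BoundedWorkQueue` (whose API is not shown on this page), plain strings stand in for CASes, and the names `QueueWiringSketch`, `Unit`, and `processOne` are hypothetical.

```java
import java.util.Locale;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class QueueWiringSketch {
    /** Hypothetical stand-in for a processing unit sitting between two bounded queues. */
    static final class Unit {
        private final BlockingQueue<String[]> input;
        private final BlockingQueue<String[]> output;

        Unit(BlockingQueue<String[]> input, BlockingQueue<String[]> output) {
            this.input = input;
            this.output = output;
        }

        /**
         * Takes one bundle from the input queue, applies a placeholder "analysis"
         * (upper-casing), and deposits the result on the output queue.
         * Returns false if no bundle was available or the output queue was full.
         */
        boolean processOne() {
            String[] bundle = input.poll();
            if (bundle == null) {
                return false;
            }
            String[] result = new String[bundle.length];
            for (int i = 0; i < bundle.length; i++) {
                result[i] = bundle[i].toUpperCase(Locale.ROOT);
            }
            return output.offer(result);
        }
    }

    static String demo() {
        BlockingQueue<String[]> in = new ArrayBlockingQueue<>(2);
        BlockingQueue<String[]> out = new ArrayBlockingQueue<>(2);
        Unit unit = new Unit(in, out);
        in.offer(new String[] {"a", "b"});
        boolean first = unit.processOne();   // one bundle available
        boolean second = unit.processOne();  // input queue is now empty
        String[] result = out.poll();
        return String.join(",", result) + " " + first + " " + second;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because the unit is non-threaded in this sketch, the caller decides when `processOne` runs; nothing blocks on an empty queue.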
-
NonThreadedProcessingUnit
-
-
Method Details
-
setInputQueue
Alternative method of providing a queue from which this PU will read bundles of CAS.
Parameters:
aInputQueue - read queue
-
setOutputQueue
Alternative method of providing a queue where this PU will deposit results of analysis.
Parameters:
aOutputQueue - queue to write to
-
setCPMEngine
Alternative method of providing the reference to the component managing the lifecycle of the CPE.
Parameters:
acpm - reference to the controlling engine
-
cleanup
public void cleanup()
Null out fields of this object. Call this only when this object is no longer needed.
-
setNotifyListeners
public void setNotifyListeners(boolean aDoNotify)
Set a flag indicating if notifications should be made via configured Listeners.
Parameters:
aDoNotify - true if notification is required, false otherwise
-
addStatusCallbackListener
Plugs in Listener object used for notifications.
Parameters:
aListener - BaseStatusCallbackListener instance
-
getCallbackListeners
Returns the list of listeners used by this PU for callbacks.
Returns:
list of BaseStatusCallbackListener instances
-
removeStatusCallbackListener
Removes the given listener from the list of listeners.
Parameters:
aListener - object to remove from the list
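The listener methods above (add, get, remove, plus the notification flag set by `setNotifyListeners`) can be illustrated with a small self-contained sketch. It assumes a hypothetical one-method listener interface, `StatusListener`, with a made-up `onProcessed` callback; the real `BaseStatusCallbackListener` API is not shown on this page and is not reproduced here.

```java
import java.util.ArrayList;
import java.util.List;

public class ListenerSketch {
    /** Hypothetical callback type; a stand-in, not the UIMA listener interface. */
    interface StatusListener {
        void onProcessed(String cas);
    }

    /** Hypothetical unit that keeps a listener list and a notify flag. */
    static final class Unit {
        private final List<StatusListener> listeners = new ArrayList<>();
        private boolean notifyListeners = true;

        void addStatusCallbackListener(StatusListener listener) {
            listeners.add(listener);
        }

        void removeStatusCallbackListener(StatusListener listener) {
            listeners.remove(listener);
        }

        List<StatusListener> getCallbackListeners() {
            return listeners;
        }

        void setNotifyListeners(boolean doNotify) {
            notifyListeners = doNotify;
        }

        /** Called when the pipeline is done with a CAS; notifies only if the flag is set. */
        void finished(String cas) {
            if (!notifyListeners) {
                return;
            }
            for (StatusListener listener : listeners) {
                listener.onProcessed(cas);
            }
        }
    }

    static String demo() {
        StringBuilder log = new StringBuilder();
        Unit unit = new Unit();
        StatusListener a = cas -> log.append("a:").append(cas).append(' ');
        StatusListener b = cas -> log.append("b:").append(cas).append(' ');
        unit.addStatusCallbackListener(a);
        unit.addStatusCallbackListener(b);
        unit.finished("cas1");                  // both listeners are called
        unit.removeStatusCallbackListener(a);
        unit.finished("cas2");                  // only b is called
        unit.setNotifyListeners(false);
        unit.finished("cas3");                  // nobody is called
        return log.toString().trim();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```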
-
setProcessingUnitProcessTrace
Plugs in ProcessTrace object used to collect statistics.
Parameters:
aProcessingUnitProcessTrace - object to compile stats
-
setUimaTimer
Plugs in custom timer used by the PU for getting time.
Parameters:
aTimer - custom timer to use
-
setContainers
Plugs in a list of Cas Processor containers. During processing, Cas Processors in this list are called sequentially. Each Cas Processor is held in a container that manages errors, counts and totals, and restarts.
Parameters:
processorList - CASProcessor to be added to the processing pipeline
-
disableCasProcessor
public void disableCasProcessor(int aCasProcessorIndex)
Disable a CASProcessor in the processing pipeline. Locate it by the provided index. The disabled Cas Processor remains in the processing pipeline; however, it is not used during processing.
Parameters:
aCasProcessorIndex - location in the pipeline of the Cas Processor to disable
-
disableCasProcessor
Alternative method to disable a Cas Processor. Uses a name to locate it.
Parameters:
aCasProcessorName - name of the Cas Processor to disable
-
enableCasProcessor
Enables the Cas Processor with the given name. An enabled Cas Processor will immediately begin to receive bundles of CAS.
Parameters:
aCasProcessorName - name of the Cas Processor to enable
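The behavior described for `setContainers`, `disableCasProcessor`, and `enableCasProcessor` (processors called in list order, a disabled processor staying in the pipeline but being skipped) can be sketched as follows. This is an illustration under stated assumptions, not the UIMA code: `PipelineSketch`, `Container`, `disable`, and `setEnabled` are hypothetical names, and a `UnaryOperator<String>` stands in for a Cas Processor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

public class PipelineSketch {
    /** Hypothetical container: one processor plus its enabled flag and a call counter. */
    static final class Container {
        final String name;
        final UnaryOperator<String> processor;
        boolean enabled = true;
        int processed = 0;

        Container(String name, UnaryOperator<String> processor) {
            this.name = name;
            this.processor = processor;
        }
    }

    private final List<Container> containers = new ArrayList<>();

    void add(String name, UnaryOperator<String> processor) {
        containers.add(new Container(name, processor));
    }

    /** Disables the processor at the given pipeline position; it is not removed. */
    void disable(int index) {
        containers.get(index).enabled = false;
    }

    /** Enables or disables every processor registered under the given name. */
    void setEnabled(String name, boolean enabled) {
        for (Container c : containers) {
            if (c.name.equals(name)) {
                c.enabled = enabled;
            }
        }
    }

    /** Runs the enabled processors in list order and returns the final result. */
    String analyze(String cas) {
        for (Container c : containers) {
            if (!c.enabled) {
                continue; // still in the list, just skipped
            }
            cas = c.processor.apply(cas);
            c.processed++;
        }
        return cas;
    }

    static String demo() {
        PipelineSketch pipeline = new PipelineSketch();
        pipeline.add("tokenizer", s -> s + "|tok");
        pipeline.add("tagger", s -> s + "|tag");
        String first = pipeline.analyze("doc");   // both processors run
        pipeline.disable(1);
        String second = pipeline.analyze("doc");  // tagger is skipped
        pipeline.setEnabled("tagger", true);
        String third = pipeline.analyze("doc");   // tagger runs again
        return first + " " + second + " " + third;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Keeping a disabled processor in the list means its position and counters survive, so re-enabling it by name restores the original order without rebuilding the pipeline.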
-
analyze
Throws:
Exception
-
setReleaseCASFlag
public void setReleaseCASFlag(boolean aFlag)
Parameters:
aFlag
-
setCasPool
Parameters:
aPool
-
postAnalysis
private void postAnalysis(Object[] aCasObjectList, boolean isCasObject, Object[] casObjects, ProcessTrace aProcessTr, boolean doneAlready) throws Exception
Parameters:
aCasObjectList
isCasObject
casObjects
aProcessTr
doneAlready
Throws:
Exception
-
-
doReleaseCasProcessor
-
doEndOfBatch
private void doEndOfBatch(ProcessingContainer aContainer, CasProcessor aProcessor, ProcessTrace aProcessTr, int howManyCases)
-
handleErrors
private boolean handleErrors(Throwable e, ProcessingContainer aContainer, CasProcessor aProcessor, ProcessTrace aProcessTrace, Object[] aCasObjectList, boolean isCasObject) throws Exception
Main routine that handles errors occurring in the processing loop.
Parameters:
e - exception in the main processing loop
aContainer - current container of the Cas Processor
aProcessor - current Cas Processor
aProcessTrace - an object containing stats for this processing loop
aCasObjectList - list of CASes being analyzed
isCasObject - determines type of CAS in the aCasObjectList (CasData or CasObject)
Returns:
boolean
Throws:
Exception
-
-
invokeCasObjectCasProcessor
private void invokeCasObjectCasProcessor(ProcessingContainer container, CasProcessor processor, Object[] aCasObjectList, ProcessTrace pTrTemp, boolean isCasObject) throws Exception
Parameters:
container
processor
aCasObjectList
pTrTemp
isCasObject
Throws:
Exception
-
-
convertCasDataToCasObject
private void convertCasDataToCasObject(int casIndex, String aContainerName, Object[] aCasObjectList) throws Exception
Parameters:
casIndex
aContainerName
aCasObjectList
Throws:
Exception
-
-
invokeCasDataCasProcessor
private void invokeCasDataCasProcessor(ProcessingContainer container, CasProcessor processor, Object[] aCasObjectList, ProcessTrace pTrTemp, boolean isCasObject, boolean retry) throws Exception
Parameters:
container
processor
aCasObjectList
pTrTemp
isCasObject
retry
Throws:
Exception
-
-
containerDisabled
-
isProcessorReady
protected boolean isProcessorReady(int aStatus)
Check if the CASProcessor status is available for processing.
-
filterOutTheCAS
private boolean filterOutTheCAS(ProcessingContainer aContainer, boolean isCasObject, Object[] aCasObjectList)
-
notifyListeners
protected void notifyListeners(Object aCas, boolean isCasObject, EntityProcessStatus aEntityProcStatus)
Notifies Listeners of the fact that the pipeline has finished processing the current set of CASes.
Parameters:
aCas - object containing an array of OR a single instance of Cas
isCasObject - true if instance of Cas is of type Cas, false otherwise
aEntityProcStatus - status object that may contain exceptions and trace
-
doNotifyListeners
protected void doNotifyListeners(Object aCas, boolean isCasObject, EntityProcessStatus aEntityProcStatus)
Notifies all configured listeners. Makes sure that the appropriate type of Cas is sent to the listener. Conversions take place to ensure compatibility.
Parameters:
aCas - Cas to pass to listener
isCasObject - true if Cas is of type CAS
aEntityProcStatus - status object containing exceptions and trace info
-
clearCasCache
private void clearCasCache()
-
pauseContainer
private boolean pauseContainer(ProcessingContainer aContainer, Exception aException, String aThreadId)
Determines if the thread should be paused. Pausing a container effectively pauses ALL Cas Processors that are managed by the container. The pause is needed when there are multiple pipelines sharing a common service. If this service dies (Socket Down), only one thread should initiate the service restart. While the service is being restarted, no invocations on the service should be made. Containers are resumed on successful service restart.
Parameters:
aContainer - a container that manages the current Cas Processor
aThreadId - id of the current thread
aProcessor - a Cas Processor to be disabled
Throws:
Exception - exception
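The rule that only one thread should initiate the restart of a shared service while the others hold off can be sketched with an atomic flag. This is one possible technique chosen for illustration (a compare-and-set on `java.util.concurrent.atomic.AtomicBoolean`); the page does not say how `pauseContainer` is implemented, and `PauseSketch`, `SharedContainer`, and `onServiceDown` are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class PauseSketch {
    /** Hypothetical container shared by several pipelines that call the same service. */
    static final class SharedContainer {
        private final AtomicBoolean paused = new AtomicBoolean(false);

        /**
         * Tries to pause the container. Returns true only for the single caller
         * that flips the flag; that caller is the one responsible for the restart.
         */
        boolean pause() {
            return paused.compareAndSet(false, true);
        }

        boolean isPaused() {
            return paused.get();
        }

        /** Called after a successful service restart. */
        void resume() {
            paused.set(false);
        }
    }

    /** Decides what a pipeline does when it sees the shared service go down. */
    static String onServiceDown(SharedContainer container, String threadId) {
        return container.pause() ? threadId + ":restart" : threadId + ":wait";
    }

    static String demo() {
        SharedContainer container = new SharedContainer();
        String a = onServiceDown(container, "t1"); // first to notice, owns the restart
        String b = onServiceDown(container, "t2"); // container already paused, so it waits
        container.resume();                        // restart succeeded
        return a + " " + b + " paused=" + container.isPaused();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The demo is deliberately single-threaded so its result is deterministic; with real threads the same compare-and-set guarantees that exactly one caller sees `true`.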
-
handleServiceException
private void handleServiceException(ProcessingContainer aContainer, CasProcessor aProcessor, ProcessTrace aProcessTr, Exception ex) throws Exception
Parameters:
aContainer
aProcessor
aProcessTr
ex
Throws:
Exception
-
-
handleSkipCasProcessor
private void handleSkipCasProcessor(ProcessingContainer aContainer, Object[] aCasObjectList, boolean isLastCP) throws Exception
Parameters:
aContainer
aCasObjectList
isLastCP
Throws:
Exception
-
-
getBytes
Returns the size of the CAS object. Currently only CASData is supported.
Parameters:
aCas - Cas to get the size for
Returns:
the size of the CAS object. Currently only CASData is supported.
-
releaseCases
Conditionally releases CASes back to the CAS pool. The release only occurs if the Cas Processor is the last in the processing chain.
Parameters:
aCasList - list of CASes to release
lastProcessor - determines if the release takes place
aContainer - current container
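The conditional release described above can be sketched as a guard on a "last processor" flag in front of a simple pool. This is an assumption-laden illustration, not the UIMA code: `ReleaseSketch` and `Pool` are hypothetical, `StringBuilder` instances stand in for CASes, and the real `CPECasPool` API is not shown on this page.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class ReleaseSketch {
    /** Hypothetical fixed-size pool; StringBuilder stands in for a CAS. */
    static final class Pool {
        private final Deque<StringBuilder> free = new ArrayDeque<>();

        Pool(int size) {
            for (int i = 0; i < size; i++) {
                free.push(new StringBuilder());
            }
        }

        StringBuilder checkOut() {
            return free.pop();
        }

        /** Resets the instance and makes it available again. */
        void release(StringBuilder cas) {
            cas.setLength(0);
            free.push(cas);
        }

        int available() {
            return free.size();
        }
    }

    /** Returns the CASes to the pool only when the caller is the last processor. */
    static void releaseCases(Pool pool, StringBuilder[] cases, boolean lastProcessor) {
        if (!lastProcessor) {
            return; // a later processor still needs these CASes
        }
        for (StringBuilder cas : cases) {
            if (cas != null) {
                pool.release(cas);
            }
        }
    }

    static String demo() {
        Pool pool = new Pool(2);
        StringBuilder[] cases = {pool.checkOut(), pool.checkOut()};
        releaseCases(pool, cases, false);
        int afterMiddle = pool.available();  // nothing released yet
        releaseCases(pool, cases, true);
        int afterLast = pool.available();    // both CASes are back in the pool
        return afterMiddle + "," + afterLast;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```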
-
stopCasProcessors
public void stopCasProcessors(boolean kill)
Stops all Cas Processors that are part of this PU.
Parameters:
kill - true if CPE has been stopped before finishing processing during external stop
-