diff --git a/pom.xml b/pom.xml index ce0cacb..6fc2083 100644 --- a/pom.xml +++ b/pom.xml @@ -8,7 +8,7 @@ scm:git:https://git.link2tek.net/ymlam/CommEngine.git - CommEngine-1.0.6 + HEAD diff --git a/src/main/java/altk/comm/engine/Broadcast.java b/src/main/java/altk/comm/engine/Broadcast.java index 4e861e3..2df4e9d 100644 --- a/src/main/java/altk/comm/engine/Broadcast.java +++ b/src/main/java/altk/comm/engine/Broadcast.java @@ -69,19 +69,23 @@ public abstract class Broadcast protected long sleepBetweenJobs; protected static Logger myLogger = Logger.getLogger(Broadcast.class); - - private Queue readyQueue; protected List serviceThreadPool; private Object resumeFlag; // Semaphore for dispatcher threads to resume. protected List recipientList; private ScheduledExecutorService scheduler; private int jobsTotal; + + /** Queue of jobs ready to be servivced. Semaphore for next 3 fields */ + private Queue readyQueue; + /** Count of items in scheduler */ private int scheduledJobs; - private Integer transactions; - /** Instantaneous number of jobs being serviced by service provider */ private int serviceActivityCount; + + + private Integer transactions; + /** Running count of successful jobs */ private int successCount; @@ -281,16 +285,8 @@ public abstract class Broadcast public void run() { myLogger.info("Thread starting..."); - for (;;) + while (! serviceThreadsShouldStop()) { - if (serviceThreadsShouldStop()) - { - // Exit thread - myLogger.info("Thread terminating"); - System.out.println(getName() + " terminating"); - closeServiceProvider(serviceProviderPeer); - return; - } if (serviceThreadsShouldPause()) { synchronized (resumeFlag) @@ -306,16 +302,7 @@ public abstract class Broadcast myLogger.warn("Dispatcher thread interrupted while waiting to resume"); } } - if (serviceThreadsShouldStop()) - { - // Exit thread - myLogger.info("Thread terminating"); - System.out.println(getName() + " terminating"); - closeServiceProvider(serviceProviderPeer); - return; - } } - // Get a batch of jobs, if available myLogger.debug("Looking for jobs"); List batch = new ArrayList(); @@ -394,6 +381,10 @@ public abstract class Broadcast } } } + // Exit thread + myLogger.info("Thread terminating"); + System.out.println(getName() + " terminating"); + closeServiceProvider(serviceProviderPeer); } } @@ -535,15 +526,16 @@ public abstract class Broadcast // initialization. try { - initAsync(); - + jobsTotal = recipientList.size(); postback = new Postback(this, getPostbackMaxQueueSize(), getPostbackSenderPoolSize(), getPostbackMaxBatchSize()); - // Create service thread pool to dispatch jobs, + initAsync(); + + // Create service thread pool to dispatch jobs, // at the same time, setting up a list of service thread names // for use by derived class to set up contexts in which // these threads run. @@ -568,9 +560,20 @@ public abstract class Broadcast } catch (Exception e) { + setState(BroadcastState.ABORTED, BroadcastError.UNEXPECTED_EXCEPTION.toString(), e.getMessage()); CommonLogger.alarm.error("Broadcast aborted: " + e.getMessage()); myLogger.error("Broadcast aborted", e); } + finally + { + destroyResources(); + if (postback != null) + { + postback.wrapup(); + postback = null; + } + CommonLogger.activity.info("Broadcast " + getId() + " terminated"); + } } protected int getPostbackMaxQueueSize() @@ -791,16 +794,6 @@ public abstract class Broadcast return responseXML.toString(); } - /** - * If finalState is final, then this state is set, and dispatcher threads are stopped. - * Overriding implementation may release all other resources, like timers. - * @param finalState - */ - public void terminate(BroadcastState finalState) - { - terminate(finalState, null); - } - /** * If finalState is final, then this state is set, and dispatcher threads are stopped. * Overriding implementation may release all other resources, like timers. @@ -952,7 +945,7 @@ public abstract class Broadcast * Overriding implementation performs time consuming initialization, after returning * POST http status indicating accepting broadcast for processing. * - * @throws BroadcastException + * @throws BroadcastException - to abort broadcast */ protected void initAsync() throws BroadcastException { @@ -1060,10 +1053,6 @@ public abstract class Broadcast } waitForEndOfService(); } - destroyResources(); - postback.wrapup(); - postback = null; - CommonLogger.activity.info("Broadcast " + getId() + " terminated"); } /** @@ -1128,7 +1117,7 @@ public abstract class Broadcast } if (getRemainingJobCount() == 0) { - wakeUpServiceThreads(); + wakeUpServiceThreads(); return true; } return false; @@ -1183,6 +1172,9 @@ public abstract class Broadcast { if (state == BroadcastState.RUNNING) { + // TODO: investigate possibility that 0 remainingJobCount may + // not be final. It may still change because a finishing job + // may cause a job to be scheduled. if (getRemainingJobCount() == 0) { setState(BroadcastState.COMPLETED); } @@ -1191,7 +1183,7 @@ public abstract class Broadcast } } - /** + /** * Sets jobStatus in job, and post job report. * If jobStatus is final, and no rescheduling, * then decrement number of remainingJobs,and increment completedJobCount. @@ -1314,8 +1306,8 @@ public abstract class Broadcast Runnable r = new Runnable() { public void run() { synchronized(readyQueue) { scheduledJobs--; + addJob(job); } - addJob(job); } }; return scheduler.schedule(r, rescheduleTimeMS, TimeUnit.MILLISECONDS); diff --git a/src/main/java/altk/comm/engine/CommEngine.java b/src/main/java/altk/comm/engine/CommEngine.java index 05a87d9..89a2147 100644 --- a/src/main/java/altk/comm/engine/CommEngine.java +++ b/src/main/java/altk/comm/engine/CommEngine.java @@ -157,9 +157,6 @@ public abstract class CommEngine extends HttpServlet myLogger = Logger.getLogger(CommEngine.class); myLogger.info("init() invoked"); -// File propertiesFile = new File( -// getClass().getClassLoader().getResource("properties").getFile() -// ); CommonLogger.startup.info("Using lo4j properites file " + log4j_properties); CommonLogger.startup.info("Using configuration file " + propertiesFile.getAbsolutePath()); diff --git a/src/main/java/altk/comm/engine/exception/BroadcastError.java b/src/main/java/altk/comm/engine/exception/BroadcastError.java index 5838d15..fe96e0f 100644 --- a/src/main/java/altk/comm/engine/exception/BroadcastError.java +++ b/src/main/java/altk/comm/engine/exception/BroadcastError.java @@ -8,6 +8,7 @@ public enum BroadcastError INTERNAL_ERROR, CONFIGURAION_ERROR, SERVICE_PROVIDER_ERROR, - PLATFORM_ERROR - + PLATFORM_ERROR, + TTS_ERROR, + UNEXPECTED_EXCEPTION }