diff --git a/src/main/java/altk/comm/engine/Broadcast.java b/src/main/java/altk/comm/engine/Broadcast.java index 15ff87b..14746c6 100644 --- a/src/main/java/altk/comm/engine/Broadcast.java +++ b/src/main/java/altk/comm/engine/Broadcast.java @@ -76,6 +76,8 @@ public abstract class Broadcast private int jobsTotal; private int scheduledJobs; + private Integer serviceActivityCount; + public static enum BroadcastState { ACCEPTED, @@ -238,136 +240,106 @@ public abstract class Broadcast myLogger.info("Thread starting..."); for (;;) { - if (serviceThreadsShouldStop()) - { - closeServiceProvider(serviceProviderPeer); - myLogger.info("Thread terminating"); - return; - } - - synchronized (resumeFlag) + if (serviceThreadsShouldStop()) + { + // Exit thread + myLogger.info("Thread terminating"); + System.out.println(getName() + " terminating"); + closeServiceProvider(serviceProviderPeer); + return; + } + if (serviceThreadsShouldPause()) { - if (threadsShouldPause()) + synchronized (resumeFlag) { try { + myLogger.debug("Paused"); resumeFlag.wait(); + myLogger.debug("Pause ended"); } catch (InterruptedException e) { myLogger.warn("Dispatcher thread interrupted while waiting to resume"); - return; } } } - List batch = null; - - /** - * Includes allocation from capacity. Only returns when the required allocation - * is obtained. Example, RTP port allocation, limit due to total number of allowable calls. - */ - ServicePrerequisites prerequisites = null; - synchronized(readyQueue) - { - // get a batch of jobs - Job job = readyQueue.peek(); - - if (job == null) - try - { - readyQueue.wait(); - continue; - } - catch (InterruptedException e) - { - return; - } - - myLogger.debug("Woke up from wait"); - // Check if expired - if (System.currentTimeMillis() >= expireTime) - { - setState(BroadcastState.EXPIRED); - continue; - } - /** - * Includes allocation from capacity. Only returns when the required allocation - * is obtained. Example, RTP port allocation, limit due to total number of allowable calls. - */ - prerequisites = secureServicePrerequisites(); - - if (serviceThreadsShouldStop() || threadsShouldPause()) - { - returnPrerequisites(prerequisites); - continue; - } - - // Check again if expired - if (System.currentTimeMillis() >= expireTime) - { - returnPrerequisites(prerequisites); - setState(BroadcastState.EXPIRED); - continue; - } - - - // Now that we can go ahead with this job, let us remove this from queue - readyQueue.poll(); - - batch = new ArrayList(); - batch.add(job); - + // Get a batch of jobs, if available + myLogger.debug("Looking for jobs"); + List batch = new ArrayList(); + synchronized(readyQueue) + { // We we are to get a batch of more than one, let us fill in the rest. - for (int i = 1; i < getJobBatchSize(); i++) + for (int i = 0; i < getJobBatchSize(); i++) { - job = readyQueue.poll(); + Job job = readyQueue.poll(); if (job == null) break; batch.add(job); } } - if (batch == null || batch.size()== 0) - { - // Exit thread - myLogger.info("Thread terminating"); - return; - } - // Process jobs. - // Mark start time - long now = System.currentTimeMillis(); - for (Job job : batch) - { - job.startTime = now; - } - - // Service the jobs - try - { - processJobs(batch, serviceProviderPeer, prerequisites); - } - catch (EngineException e) + if (batch.size()== 0) { - terminate(BroadcastState.ABORTED, e.getMessage()); + // wait for jobs + synchronized(readyQueue) + { + try + { + myLogger.debug("Waiting for jobs"); + readyQueue.wait(); + } + catch (Exception e) + { + // Do nothing + } + } } - catch (Throwable t) + else { - // This is unexpected. Log stack trace - myLogger.error("Caught unexpected Throwable", t); - terminate(BroadcastState.ABORTED, t + ": " + t.getMessage()); - } - - if (sleepBetweenJobs > 0) - { - try - { - Thread.sleep(sleepBetweenJobs); - } - catch (InterruptedException e1) - { - // Do nothing? - } + // Process jobs. + // Mark start time + long now = System.currentTimeMillis(); + for (Job job : batch) + { + job.startTime = now; + } + + // Service the jobs + // But first get dependent resource + // which includes allocation from capacity. Only returns when the required allocation + // is obtained. Example, RTP port allocation, limit due to total number of allowable calls. + ServicePrerequisites prerequisites = secureServicePrerequisites(); + + try + { + updateServiceActivityCount(1); + processJobs(batch, serviceProviderPeer, prerequisites); + updateServiceActivityCount(-1); + } + catch (EngineException e) + { + terminate(BroadcastState.ABORTED, e.getMessage()); + } + catch (Throwable t) + { + // This is unexpected. Log stack trace + myLogger.error("Caught unexpected Throwable", t); + terminate(BroadcastState.ABORTED, t + ": " + t.getMessage()); + } + + if (sleepBetweenJobs > 0) + { + try + { + Thread.sleep(sleepBetweenJobs); + } + catch (InterruptedException e1) + { + // Do nothing? + } + } } - } + } } } @@ -394,6 +366,7 @@ public abstract class Broadcast resumeFlag = new Object(); receiveTime = System.currentTimeMillis(); + serviceActivityCount = Integer.valueOf(0); } public int pendingJobs() @@ -573,7 +546,7 @@ public abstract class Broadcast /** * Makes a state transition to the given newState if the transition from - * the current state is legal. + * the current state is legal. Also posts back a state change notification. * @param newState * @return StateChangeResult */ @@ -607,7 +580,7 @@ public abstract class Broadcast { //synchronized(postback.postQueue) { - postback.queueReport(mkStatusReport(newState)); + postback.queueReportFirst(mkStatusReport(newState)); state = newState; } } @@ -924,25 +897,23 @@ public abstract class Broadcast switch (result.stateChangeStatus) { case SUCCESS: - responseContent = "OK"; + responseContent = "Broadcast canceled"; break; case NO_CHANGE: - responseContent = "Not canceled: Already cancelled"; + responseContent = "Already canceled"; break; case FORBIDDEN: responseContent = "Not canceled: Not allowed to cancel a broadcast in " + result.currentState + " state"; } out.write(responseContent); - synchronized(resumeFlag) - { - resumeFlag.notifyAll(); - } + wakeUpServiceThreads(); } protected void pause() { // Sets state to PAUSING, which is monitored by Broadcast.Service threads. + // EVentually, when all service activity ends, the state transitions to PAUSED setState(BroadcastState.PAUSING); } @@ -950,7 +921,7 @@ public abstract class Broadcast { synchronized (resumeFlag) { - if (threadsShouldPause()) + if (serviceThreadsShouldPause()) { setState(BroadcastState.RUNNING); resumeFlag.notifyAll(); @@ -984,26 +955,30 @@ public abstract class Broadcast public void doBroadcast() throws BroadcastException { changeStateTime = System.currentTimeMillis(); - setState(BroadcastState.RUNNING); - - // Start the dispatcher threads - for (Service thread : serviceThreadPool) - { - thread.start(); - } - - for (Service thread : serviceThreadPool) - { - try - { - thread.join(); - } - catch (InterruptedException e) - { - myLogger.error("Caught exception while waiting for a Service thread to terminate:" + e); - } - } - setState(BroadcastState.COMPLETED); + if (!serviceThreadsShouldStop()) + { + setState(BroadcastState.RUNNING); + + // Start the dispatcher threads + for (Service thread : serviceThreadPool) + { + thread.start(); + } + + // Wait for them to finish + for (Service thread : serviceThreadPool) + { + try + { + thread.join(); + } + catch (InterruptedException e) + { + myLogger.error("Caught exception while waiting for a Service thread to terminate:" + e); + } + } + setState(BroadcastState.COMPLETED); + } destroyResources(); postback.wrapup(); postback = null; @@ -1043,21 +1018,55 @@ public abstract class Broadcast // Do nothing in base class } - + /** + * Examines if service threads should stop running, or even start + * + * @return + */ private boolean serviceThreadsShouldStop() { - return state == BroadcastState.CANCELING || - state == BroadcastState.CANCELED || state.isFinal - || pendingJobs() == 0; + if (System.currentTimeMillis() >= expireTime) + { + setState(BroadcastState.EXPIRED); + wakeUpServiceThreads(); + return true; + } + if (state == BroadcastState.CANCELING || + state == BroadcastState.EXPIRED || + state == BroadcastState.CANCELED || state.isFinal) + { + return true; + } + if (pendingJobs() == 0) + { + wakeUpServiceThreads(); + return true; + } + return false; + } + + private void wakeUpServiceThreads() + { + synchronized (readyQueue) + { + readyQueue.notifyAll(); + } + synchronized (resumeFlag) + { + resumeFlag.notifyAll(); + } } - private boolean threadsShouldPause() + private boolean serviceThreadsShouldPause() { return state == BroadcastState.PAUSED || state == BroadcastState.PAUSING; } /** * job status is reported back to this broadcast, via the logAndQueueForPostBack method. + * This method should use the updateServiceActivityCount(+-1) method to allow Broadcast + * to keep track of overall service progress. The serviceActvityCount is used to determine + * if all service threads are idle or terminated. * @param batch * @param prerequisites */ @@ -1073,6 +1082,19 @@ public abstract class Broadcast { return 1; } + + protected void updateServiceActivityCount(int increment) + { + synchronized (serviceActivityCount) + { + serviceActivityCount += increment; + if (increment < 0 && serviceActivityCount <= 0) + { + if (state == BroadcastState.PAUSING) setState(BroadcastState.PAUSED); + if (state == BroadcastState.CANCELING) setState(BroadcastState.CANCELED); + } + } + } /** * Sets jobStatus in job, and post job report. @@ -1096,8 +1118,7 @@ public abstract class Broadcast */ protected void postJobStatus(Job job, long rescheduleTimeMS) { - logJobCount("Entering postJobStatus"); - myLogger.debug(job.toString() + ": rescheduleTimeMS " + rescheduleTimeMS); + //logJobCount("Entering postJobStatus"); if (postback != null) { JobReport report = mkJobReport(); @@ -1115,12 +1136,12 @@ public abstract class Broadcast { // No more rescheduling on cancel, expire, or pause completedJobCount++; - logJobCount("Completed a job"); + //logJobCount("Completed a job"); } else if (rescheduleTimeMS == 0) { addJob(job); - logJobCount("Added a job to queue"); + //logJobCount("Added a job to queue"); } else if (rescheduleTimeMS > 0) { diff --git a/src/main/java/altk/comm/engine/CommEngine.java b/src/main/java/altk/comm/engine/CommEngine.java index da9b11f..5526590 100644 --- a/src/main/java/altk/comm/engine/CommEngine.java +++ b/src/main/java/altk/comm/engine/CommEngine.java @@ -227,7 +227,7 @@ public abstract class CommEngine extends HttpServlet { for (String id : broadcasts.keySet()) { - if (broadcasts.get(id).changeStateTime - now > deadBroadcastViewingMinutes * 60 * 1000) + if (now - broadcasts.get(id).changeStateTime > deadBroadcastViewingMinutes * 60 * 1000) { Broadcast broadcast = broadcasts.get(id); completedJobCount += broadcast.getCompletedJobCount(); @@ -333,7 +333,7 @@ public abstract class CommEngine extends HttpServlet return; } broadcast.pause(); - out.print("OK"); + out.write("OK"); } @@ -348,7 +348,7 @@ public abstract class CommEngine extends HttpServlet return; } broadcast.resume(); - out.print("OK"); + out.write("OK"); } /** diff --git a/src/main/java/altk/comm/engine/Postback.java b/src/main/java/altk/comm/engine/Postback.java index 3fca9e2..908cee7 100644 --- a/src/main/java/altk/comm/engine/Postback.java +++ b/src/main/java/altk/comm/engine/Postback.java @@ -67,7 +67,7 @@ public class Postback private final String postBackURL; private final String xmlTopElement; - final Queue postQueue; + final LinkedList postQueue; private final int maxQueueSize; private List senderPool; private final String myName; @@ -366,6 +366,21 @@ public class Postback postedTransactions += size; } + /** + * Puts report at the head of post queue, disregarding size limit. + * This is suitable for posting broadcast status ahead of + * a possibly long line of transaction status. + * @param report + * @return + */ + public void queueReportFirst(String report) + { + synchronized(postQueue) + { + postQueue.offerFirst(report); + postQueue.notify(); + } + } /** * Queues report to postQueue only if the queue size has not reached the * maxQueueSize. @@ -374,9 +389,6 @@ public class Postback */ public boolean queueReport(String report) { - // Log for recovery in case of problem in posting report. - CommonLogger.activity.info("Attempting to queue report"); - synchronized(postQueue) { for (;;) @@ -385,8 +397,7 @@ public class Postback { myLogger.debug("Queueing report" + report); postQueue.add(report); - myLogger.debug("Added 1 report - postQueue size: " + postQueue.size()); - postQueue.notifyAll(); + postQueue.notify(); return true; } else