From 253c78ab711b9d5375dc39b1bec2ee940992171c Mon Sep 17 00:00:00 2001 From: ymlam Date: Fri, 15 Oct 2021 10:17:37 -0400 Subject: [PATCH] Add stateSemaphore to ensure modifications to state value is synchronized. --- src/main/java/altk/comm/engine/Broadcast.java | 230 ++++++++++-------- 1 file changed, 122 insertions(+), 108 deletions(-) diff --git a/src/main/java/altk/comm/engine/Broadcast.java b/src/main/java/altk/comm/engine/Broadcast.java index f47043f..7fccefa 100644 --- a/src/main/java/altk/comm/engine/Broadcast.java +++ b/src/main/java/altk/comm/engine/Broadcast.java @@ -41,6 +41,7 @@ public abstract class Broadcast private String broadcastId; private BroadcastState state = BroadcastState.ACCEPTED; + private Object stateSemaphore = new Object(); String haltReason; String stateErrorText; @@ -335,62 +336,62 @@ public abstract class Broadcast { myLogger.debug("Waiting for jobs"); readyQueue.wait(); + // go back to look for jobs + continue; } catch (Exception e) { - // Do nothing + // go back to look for jobs + continue; } } + updateServiceActivityCount(batch.size()); } - if (batch.size() > 0) + // Process jobs. + // Mark start time + long now = System.currentTimeMillis(); + for (Job job : batch) { - // Process jobs. - // Mark start time - long now = System.currentTimeMillis(); - for (Job job : batch) - { - job.startTime = now; - } - - // Service the jobs - // But first get dependent resource - // which includes allocation from capacity. Only returns when the required allocation - // is obtained. Example, RTP port allocation, limit due to total number of allowable calls. - ServicePrerequisites prerequisites = secureServicePrerequisites(); - - try - { - updateServiceActivityCount(1); - int transactions = processJobs(batch, serviceProviderPeer, prerequisites); - incrementTransactions(transactions); - } - catch (EngineException e) - { - // Aborting - setState(BroadcastState.ABORTING, e.errorCodeText, e.errorText); - } - catch (Throwable t) - { - // This is unexpected. Log stack trace - myLogger.error("Caught unexpected Throwable", t); - terminate(BroadcastState.ABORTED, t + ": " + t.getMessage()); - } - finally - { - updateServiceActivityCount(-1); - } - if (sleepBetweenJobs > 0) - { - try - { - Thread.sleep(sleepBetweenJobs); - } - catch (InterruptedException e1) - { - // Do nothing? - } - } + job.startTime = now; + } + + // Service the jobs + // But first get dependent resource + // which includes allocation from capacity. Only returns when the required allocation + // is obtained. Example, RTP port allocation, limit due to total number of allowable calls. + ServicePrerequisites prerequisites = secureServicePrerequisites(); + + try + { + int transactions = processJobs(batch, serviceProviderPeer, prerequisites); + incrementTransactions(transactions); + } + catch (EngineException e) + { + // Aborting + setState(BroadcastState.ABORTING, e.errorCodeText, e.errorText); + } + catch (Throwable t) + { + // This is unexpected. Log stack trace + myLogger.error("Caught unexpected Throwable", t); + terminate(BroadcastState.ABORTED, t + ": " + t.getMessage()); + } + finally + { + updateServiceActivityCount(-batch.size()); + } + if (sleepBetweenJobs > 0) + { + try + { + Thread.sleep(sleepBetweenJobs); + } + catch (InterruptedException e1) + { + // Do nothing? + } } } } @@ -429,10 +430,6 @@ public abstract class Broadcast transactions += delta; } } - public int pendingJobs() - { - return readyQueue.size() + scheduledJobs; - } /** * Experimental formulation where it takes over directing @@ -493,7 +490,7 @@ public abstract class Broadcast { readyQueue.add(mkJob(recipient)); } - if (getState() == BroadcastState.ALLDONE) return; + if (state == BroadcastState.ALLDONE) return; } catch (BroadcastException e) { @@ -641,34 +638,37 @@ public abstract class Broadcast boolean isLegal; BroadcastState prev = null; if (state == newState) return new StateChangeResult(StateChangeStatus.NO_CHANGE, state, null); - List to = toStates.get(state); - isLegal = (to == null? false : to.contains(newState)); - prev = state; - if (isLegal) + synchronized(stateSemaphore) { - this.haltReason = haltReason; - this.stateErrorText = stateErrorText; - - CommonLogger.activity.info(String.format("Broadcast %s: State transitioned from %s to %s", broadcastId, state, newState)); - state = newState; - changeStateTime = System.currentTimeMillis(); - if (state == BroadcastState.RUNNING) serviceStartTime = changeStateTime; - if (prev == BroadcastState.RUNNING) serviceEndTime = changeStateTime; - if (postback != null) - { - if (state == BroadcastState.ALLDONE) postback.queueReport(mkStatusReport()); - else postback.queueReportFirst(mkStatusReport()); - } - return new StateChangeResult(StateChangeStatus.SUCCESS, newState, prev); - } - else - { - // log illegal state transition with call trace - Exception e = new Exception(String.format("Broadast %s ignored illegal transition from %s to %s", broadcastId, prev, newState)); - myLogger.error(e.getMessage()); - myLogger.debug("This exception is not thrown -- only for debugging information", e); - - return new StateChangeResult(StateChangeStatus.FORBIDDEN, prev, null); + List to = toStates.get(state); + isLegal = (to == null? false : to.contains(newState)); + prev = state; + if (isLegal) + { + this.haltReason = haltReason; + this.stateErrorText = stateErrorText; + + CommonLogger.activity.info(String.format("Broadcast %s: State transitioned from %s to %s", broadcastId, state, newState)); + state = newState; + changeStateTime = System.currentTimeMillis(); + if (state == BroadcastState.RUNNING) serviceStartTime = changeStateTime; + if (prev == BroadcastState.RUNNING) serviceEndTime = changeStateTime; + if (postback != null) + { + if (state == BroadcastState.ALLDONE) postback.queueReport(mkStatusReport()); + else postback.queueReportFirst(mkStatusReport()); + } + return new StateChangeResult(StateChangeStatus.SUCCESS, newState, prev); + } + else + { + // log illegal state transition with call trace + Exception e = new Exception(String.format("Broadast %s ignored illegal transition from %s to %s", broadcastId, prev, newState)); + myLogger.error(e.getMessage()); + myLogger.debug("This exception is not thrown -- only for debugging information", e); + + return new StateChangeResult(StateChangeStatus.FORBIDDEN, prev, null); + } } } @@ -720,7 +720,7 @@ public abstract class Broadcast responseXML.append("\""); } responseXML.append(" accepted='"); - responseXML.append(e != null || getState() == BroadcastState.COMPLETED ? "FALSE" : "TRUE"); + responseXML.append(e != null || state == BroadcastState.COMPLETED ? "FALSE" : "TRUE"); responseXML.append("'"); if (e == null) { @@ -927,10 +927,8 @@ public abstract class Broadcast */ protected int getActiveJobCount() { - return serviceActivityCount; + return serviceActivityCount; } - - /** * Parses broadcastId and return if notInService is true. @@ -1128,7 +1126,7 @@ public abstract class Broadcast { return true; } - if (pendingJobs() == 0) + if (getRemainingJobCount() == 0) { wakeUpServiceThreads(); return true; @@ -1309,8 +1307,16 @@ public abstract class Broadcast public ScheduledFuture rescheduleJob(final Job job, long rescheduleTimeMS) { - scheduledJobs++; - Runnable r = new Runnable() { public void run() { scheduledJobs--; addJob(job);}}; + synchronized(readyQueue) { + scheduledJobs++; + } + Runnable r = new Runnable() { public void run() { + synchronized(readyQueue) { + scheduledJobs--; + } + addJob(job); + } + }; return scheduler.schedule(r, rescheduleTimeMS, TimeUnit.MILLISECONDS); } @@ -1326,7 +1332,7 @@ public abstract class Broadcast case RUNNING: case PAUSING: case PAUSED: - return pendingJobs(); + return getRemainingJobCount(); default: return 0; } @@ -1334,7 +1340,7 @@ public abstract class Broadcast public int getCompletedJobCount() { - return jobsTotal - pendingJobs() - serviceActivityCount; + return jobsTotal - getRemainingJobCount() - serviceActivityCount; } public String getBroadcastType() { @@ -1342,26 +1348,34 @@ public abstract class Broadcast } public PostbackThreadActionOnEmpty getPostbackThreadActionOnEmpty() { + myLogger.debug("getPostbackThreadActionOnEmpty(): broadcast state " + state); if (state.isFinal) return PostbackThreadActionOnEmpty.STOP; - if (getActiveJobCount() == 0) { - if (state == BroadcastState.PAUSING) { - setState(BroadcastState.PAUSED); - return PostbackThreadActionOnEmpty.WAIT; - } - if (state == BroadcastState.CANCELING) { - setState(BroadcastState.CANCELED); - return PostbackThreadActionOnEmpty.STOP; - } - else if (state == BroadcastState.ABORTING) { - setState(BroadcastState.ABORTED); - return PostbackThreadActionOnEmpty.STOP; - } - else if (state == BroadcastState.COMPLETED) { - setState(BroadcastState.ALLDONE); - return PostbackThreadActionOnEmpty.STOP; - } - } - return PostbackThreadActionOnEmpty.WAIT; + int activeJobCount = getActiveJobCount(); + myLogger.debug("getPostbackThreadActionOnEmpty(): activeJobCount = " + activeJobCount); + + if (activeJobCount > 0) { + return PostbackThreadActionOnEmpty.WAIT; + } + + if (state == BroadcastState.PAUSING) { + return setState(BroadcastState.PAUSED).stateChangeStatus == StateChangeStatus.SUCCESS? + PostbackThreadActionOnEmpty.CONTINUE : PostbackThreadActionOnEmpty.WAIT; + } + if (state == BroadcastState.CANCELING) { + return setState(BroadcastState.CANCELED).stateChangeStatus == StateChangeStatus.SUCCESS? + PostbackThreadActionOnEmpty.CONTINUE : PostbackThreadActionOnEmpty.STOP; + } + else if (state == BroadcastState.ABORTING) { + return setState(BroadcastState.ABORTED).stateChangeStatus == StateChangeStatus.SUCCESS? + PostbackThreadActionOnEmpty.CONTINUE : PostbackThreadActionOnEmpty.STOP; + } + else if (state == BroadcastState.COMPLETED) { + return setState(BroadcastState.ALLDONE).stateChangeStatus == StateChangeStatus.SUCCESS? + PostbackThreadActionOnEmpty.CONTINUE : PostbackThreadActionOnEmpty.STOP; + } + else { + return PostbackThreadActionOnEmpty.WAIT; + } } }