diff --git a/src/main/java/altk/comm/engine/Broadcast.java b/src/main/java/altk/comm/engine/Broadcast.java index 6453593..8085cdf 100644 --- a/src/main/java/altk/comm/engine/Broadcast.java +++ b/src/main/java/altk/comm/engine/Broadcast.java @@ -386,11 +386,6 @@ public abstract class Broadcast myLogger.error("Caught unexpected Throwable", t); terminate(BroadcastState.ABORTED, t + ": " + t.getMessage()); } - finally - { - updateServiceActivityCount(-batch.size()); - if (serviceActivityCount == 0) postback.wakeOneUp(); - } if (sleepBetweenJobs > 0) { try @@ -1287,8 +1282,8 @@ public abstract class Broadcast report.initBase(job, broadcastId, launchRecordId, activityRecordIdParamName, jobReportRootNodeName); report.init(job); postback.queueReport(report.toString()); + updateServiceActivityCount(-1); } - } /** @@ -1352,6 +1347,8 @@ public abstract class Broadcast public ScheduledFuture rescheduleJob(final Job job, long rescheduleTimeMS) { + if (rescheduleTimeMS < 0) return null; + // No more rescheduling on cancel, abort, expired, or alldone if (state == BroadcastState.CANCELING || state == BroadcastState.CANCELED @@ -1424,10 +1421,8 @@ public abstract class Broadcast public PostbackThreadActionOnEmpty getPostbackThreadActionOnEmpty() { myLogger.debug("getPostbackThreadActionOnEmpty(): broadcast state " + state); if (state.isFinal) return PostbackThreadActionOnEmpty.STOP; - int activeJobCount = getActiveJobCount(); - myLogger.debug("getPostbackThreadActionOnEmpty(): activeJobCount = " + activeJobCount); - if (activeJobCount > 0) { + if (serviceActivityCount > 0) { return PostbackThreadActionOnEmpty.WAIT; } @@ -1443,7 +1438,7 @@ public abstract class Broadcast return setState(BroadcastState.ABORTED, reason, stateErrorText).stateChangeStatus == StateChangeStatus.SUCCESS? PostbackThreadActionOnEmpty.CONTINUE : PostbackThreadActionOnEmpty.STOP; } - else if (state == BroadcastState.COMPLETED) { + else if (state == BroadcastState.COMPLETED && scheduledJobs == 0) { return setState(BroadcastState.ALLDONE).stateChangeStatus == StateChangeStatus.SUCCESS? PostbackThreadActionOnEmpty.CONTINUE : PostbackThreadActionOnEmpty.STOP; }