@@ -386,11 +386,6 @@ public abstract class Broadcast
myLogger.error("Caught unexpected Throwable", t);
myLogger.error("Caught unexpected Throwable", t);
terminate(BroadcastState.ABORTED, t + ": " + t.getMessage());
terminate(BroadcastState.ABORTED, t + ": " + t.getMessage());
}
}
finally
{
updateServiceActivityCount(-batch.size());
if (serviceActivityCount == 0) postback.wakeOneUp();
}
if (sleepBetweenJobs > 0)
if (sleepBetweenJobs > 0)
{
{
try
try
@@ -1287,8 +1282,8 @@ public abstract class Broadcast
report.initBase(job, broadcastId, launchRecordId, activityRecordIdParamName, jobReportRootNodeName);
report.initBase(job, broadcastId, launchRecordId, activityRecordIdParamName, jobReportRootNodeName);
report.init(job);
report.init(job);
postback.queueReport(report.toString());
postback.queueReport(report.toString());
updateServiceActivityCount(-1);
}
}
}
}
/**
/**
@@ -1352,6 +1347,8 @@ public abstract class Broadcast
public ScheduledFuture<?> rescheduleJob(final Job job, long rescheduleTimeMS)
public ScheduledFuture<?> rescheduleJob(final Job job, long rescheduleTimeMS)
{
{
if (rescheduleTimeMS < 0) return null;
// No more rescheduling on cancel, abort, expired, or alldone
// No more rescheduling on cancel, abort, expired, or alldone
if (state == BroadcastState.CANCELING
if (state == BroadcastState.CANCELING
|| state == BroadcastState.CANCELED
|| state == BroadcastState.CANCELED
@@ -1424,10 +1421,8 @@ public abstract class Broadcast
public PostbackThreadActionOnEmpty getPostbackThreadActionOnEmpty() {
public PostbackThreadActionOnEmpty getPostbackThreadActionOnEmpty() {
myLogger.debug("getPostbackThreadActionOnEmpty(): broadcast state " + state);
myLogger.debug("getPostbackThreadActionOnEmpty(): broadcast state " + state);
if (state.isFinal) return PostbackThreadActionOnEmpty.STOP;
if (state.isFinal) return PostbackThreadActionOnEmpty.STOP;
int activeJobCount = getActiveJobCount();
myLogger.debug("getPostbackThreadActionOnEmpty(): activeJobCount = " + activeJobCount);
if (activeJob Count > 0) {
if (serviceActivityCount > 0) {
return PostbackThreadActionOnEmpty.WAIT;
return PostbackThreadActionOnEmpty.WAIT;
}
}
@@ -1443,7 +1438,7 @@ public abstract class Broadcast
return setState(BroadcastState.ABORTED, reason, stateErrorText).stateChangeStatus == StateChangeStatus.SUCCESS?
return setState(BroadcastState.ABORTED, reason, stateErrorText).stateChangeStatus == StateChangeStatus.SUCCESS?
PostbackThreadActionOnEmpty.CONTINUE : PostbackThreadActionOnEmpty.STOP;
PostbackThreadActionOnEmpty.CONTINUE : PostbackThreadActionOnEmpty.STOP;
}
}
else if (state == BroadcastState.COMPLETED) {
else if (state == BroadcastState.COMPLETED && scheduledJobs == 0 ) {
return setState(BroadcastState.ALLDONE).stateChangeStatus == StateChangeStatus.SUCCESS?
return setState(BroadcastState.ALLDONE).stateChangeStatus == StateChangeStatus.SUCCESS?
PostbackThreadActionOnEmpty.CONTINUE : PostbackThreadActionOnEmpty.STOP;
PostbackThreadActionOnEmpty.CONTINUE : PostbackThreadActionOnEmpty.STOP;
}
}