@@ -44,6 +44,8 @@ public abstract class Broadcast
String haltReason;
String stateErrorText;
public final long receiveTime;
public long serviceStartTime;
public long serviceEndTime;
public long changeStateTime;
protected String activityRecordIdParamName;
private String jobReportRootNodeName;
@@ -69,11 +71,11 @@ public abstract class Broadcast
protected List<Service> serviceThreadPool;
private Object resumeFlag; // Semaphore for dispatcher threads to resume.
protected List<Recipient> recipientList;
private int completedJobCount;
private ScheduledExecutorService scheduler;
private int jobsTotal;
private int scheduledJobs;
private Integer transactions;
private Integer serviceActivityCount;
@@ -88,7 +90,8 @@ public abstract class Broadcast
PURGED(true), // Final state
ABORTED(true), // final state
EXPIRED(true), // final state
COMPLETED(true); // Final state
COMPLETED,
ALLDONE(true); // Final state
final public boolean isFinal;
@@ -207,6 +210,9 @@ public abstract class Broadcast
BroadcastState.CANCELING, // User action
BroadcastState.PURGED // User action
));
toStates.put(BroadcastState.COMPLETED, Arrays.asList(
BroadcastState.ALLDONE // when all posting back is complete
));
}
public static class StateChangeResult
@@ -224,6 +230,13 @@ public abstract class Broadcast
}
}
protected enum PostbackThreadActionOnEmpty
{
CONTINUE,
STOP,
WAIT
}
protected class Service extends Thread
{
Object serviceProviderPeer;
@@ -319,7 +332,8 @@ public abstract class Broadcast
try
{
updateServiceActivityCount(1);
processJobs(batch, serviceProviderPeer, prerequisites);
int transactions = processJobs(batch, serviceProviderPeer, prerequisites);
incrementTransactions(transactions);
updateServiceActivityCount(-1);
}
catch (EngineException e)
@@ -363,7 +377,6 @@ public abstract class Broadcast
this.jobReportRootNodeName = jobReportRootNodeName;
postback = null;
completedJobCount = 0;
sleepBetweenJobs = SLEEP_BETWEEN_JOBS_DEFAULT;
readyQueue = new LinkedBlockingQueue<Job>();
serviceThreadPool = new ArrayList<Service>();
@@ -373,8 +386,16 @@ public abstract class Broadcast
receiveTime = System.currentTimeMillis();
serviceActivityCount = Integer.valueOf(0);
transactions = Integer.valueOf(0);
}
private void incrementTransactions(int delta)
{
synchronized (transactions)
{
transactions += delta;
}
}
public int pendingJobs()
{
return readyQueue.size() + scheduledJobs;
@@ -580,19 +601,14 @@ public abstract class Broadcast
this.stateErrorText = stateErrorText;
CommonLogger.activity.info(String.format("Broadcast %s: State transitioned from %s to %s", broadcastId, state, newState));
state = newState;
changeStateTime = System.currentTimeMillis();
if (state == BroadcastState.RUNNING) serviceStartTime = changeStateTime;
if (prev == BroadcastState.RUNNING) serviceEndTime = changeStateTime;
if (postback != null)
{
//synchronized(postback.postQueue)
{
postback.queueReportFirst(mkStatusReport(newState));
state = newState;
}
}
else
{
state = newState;
postback.queueReportFirst(mkStatusReport());
}
changeStateTime = System.currentTimeMillis();
return new StateChangeResult(StateChangeStatus.SUCCESS, newState, prev);
}
else
@@ -781,17 +797,6 @@ public abstract class Broadcast
* @return
*/
protected String mkStatusReport()
{
return mkStatusReport(state);
}
/**
* Creates status report. Sometimes, we need to create the report before
* actually changing BroadcastState.
* @param state - BroadcastState for this report, which is not necessarily the same as the class attribute state.
* @return status report in XML.
*/
protected String mkStatusReport(BroadcastState state)
{
StringBuffer statusBf = new StringBuffer();
String topLevelTag = broadcastType;
@@ -804,7 +809,13 @@ public abstract class Broadcast
{
statusBf.append(" launch_record_id='" + launchRecordId + "'");
}
statusBf.append(">\r\n<state>" + state + "</state><state_change_time>" + changeStateTime
statusBf.append(">\r\n");
statusBf.append("<current_time>" + System.currentTimeMillis() + "</current_time>\r\n");
if (serviceStartTime > 0) statusBf.append("<service_start_time>" + serviceStartTime
+ "</service_start_time>\r\n");
if (serviceEndTime > 0) statusBf.append("<service_end_time>" + serviceEndTime
+ "</service_end_time>\r\n");
statusBf.append("<state>" + state + "</state><state_change_time>" + changeStateTime
+ "</state_change_time>\r\n");
if (state == BroadcastState.PAUSED
|| state == BroadcastState.ABORTED)
@@ -821,6 +832,7 @@ public abstract class Broadcast
+ "</error_text>");
}
}
statusBf.append("<transactions>" + transactions + "</transactions>");
statusBf.append("<job_summary completed='" + getCompletedJobCount() +
"' ready='" + getPendingJobCount() + "'");
statusBf.append(" active='" + getActiveJobCount() + "'");
@@ -988,6 +1000,7 @@ public abstract class Broadcast
destroyResources();
postback.wrapup();
postback = null;
//setState(BroadcastState.ALLDONE);
myLogger.info("Broadcast " + getId() + " terminated");
}
@@ -1075,8 +1088,9 @@ public abstract class Broadcast
* if all service threads are idle or terminated.
* @param batch
* @param prerequisites
* @return int - number of transactions employed to service these jobs.
*/
abstract protected void processJobs(List<Job> batch, Object serviceProvider, ServicePrerequisites prerequisites)
abstract protected int processJobs(List<Job> batch, Object serviceProvider, ServicePrerequisites prerequisites)
throws EngineException;
/**
@@ -1141,7 +1155,7 @@ public abstract class Broadcast
)
{
// No more rescheduling on cancel, expire, or pause
completedJobCount++;
// completedJobCount++;
//logJobCount("Completed a job");
}
else if (rescheduleTimeMS == 0)
@@ -1162,14 +1176,44 @@ public abstract class Broadcast
*/
private void logJobCount(String title)
{
myLogger.debug(String.format("%s: completed: %d, active: %d, ready: %d, scheduled %d, total jobs: %d, remaining %d",
if (postback == null) {
myLogger.debug(title + ": postback = null");
myLogger.debug(String.format("%s: state %s, completed: %d, active: %d, ready: %d, scheduled: %d, total jobs: %d, remaining: %d, postQueue: ",
title,
state,
getCompletedJobCount(),
getActiveJobCount(),
readyQueue.size(),
scheduledJobs,
jobsTotal,
getRemainingJobCount()
));
return;
}
if (postback.postQueue == null) {
myLogger.debug(title + ": postback.postQueue = null");
myLogger.debug(String.format("%s: state %s, completed: %d, active: %d, ready: %d, scheduled: %d, total jobs: %d, remaining: %d, postQueue: ",
title,
state,
getCompletedJobCount(),
getActiveJobCount(),
readyQueue.size(),
scheduledJobs,
jobsTotal,
getRemainingJobCount()
));
return;
}
myLogger.debug(String.format("%s: state %s, completed: %d, active: %d, ready: %d, scheduled %d, total jobs: %d, remaining: %d, postQueue: %d",
title,
completedJobCount,
state,
getCompletedJobCount(),
getActiveJobCount(),
readyQueue.size(),
scheduledJobs,
jobsTotal,
getRemainingJobCount()
getRemainingJobCount(),
postback.postQueue.size()
));
}
@@ -1209,11 +1253,26 @@ public abstract class Broadcast
public int getCompletedJobCount()
{
return completedJob Count;
return jobsTotal - pendingJobs() - serviceActivity Count;
}
public String getBroadcastType() {
return broadcastType;
}
public PostbackThreadActionOnEmpty getPostbackThreadActionOnEmpty() {
logJobCount("getPostbackThreadActionOnEmpty");
if (state==BroadcastState.CANCELED || state==BroadcastState.ALLDONE) return PostbackThreadActionOnEmpty.STOP;
if (setState(BroadcastState.ALLDONE).stateChangeStatus == StateChangeStatus.SUCCESS) return PostbackThreadActionOnEmpty.CONTINUE;
return PostbackThreadActionOnEmpty.WAIT;
}
public boolean postbackThreadsShouldStop() {
if (state==BroadcastState.CANCELED || state==BroadcastState.ALLDONE) return true;
boolean shouldContinue = state==BroadcastState.COMPLETED && setState(BroadcastState.ALLDONE).stateChangeStatus == StateChangeStatus.SUCCESS;
return !shouldContinue;
//boolean doStop = setState(BroadcastState.ALLDONE).stateChangeStatus != StateChangeStatus.SUCCESS;
//return doStop;
}
}