Browse Source

Improved job counting and fixed bug of non-matching time units involved in rescheduling.

tags/Production_2016_05_28
ymlam 11 years ago
parent
commit
1853a905e5
1 changed files with 99 additions and 14 deletions
  1. +99
    -14
      src/altk/comm/engine/Broadcast.java

+ 99
- 14
src/altk/comm/engine/Broadcast.java View File

@@ -42,7 +42,19 @@ public abstract class Broadcast
String stateErrorText;
public final long receiveTime;
public long changeStateTime;
/**
* Count of jobs that are completed (excluding those that are
* being rescheduled).
*/
private int completedJobCount = 0;
/**
* Dynamically keeps count of the total number jobs scheduled
* in readyQueue. Initially it is set to be the size of the
* recipientList. Then as jobs are processed, and when one is
* to be repeated by re-adding it to the readyQueue, then this
* number is incremented by 1.
*/
private int effectiveJobCount = 0;
protected String activityRecordIdParamName;
private String jobReportRootNodeName;

@@ -67,7 +79,7 @@ public abstract class Broadcast
private List<Thread> serviceThreadPool;
private Object resumeFlag; // Semaphore for dispatcher threads to resume.
protected List<Recipient> recipientList;
private int remainingJobs;
//private int remainingJobs;
private ScheduledExecutorService scheduler;
private int serviceThreadPoolSize;
@@ -280,7 +292,8 @@ public abstract class Broadcast
{
return;
}
myLogger.debug("Found some jobs");
// Check if expired
if (System.currentTimeMillis() >= expireTime)
{
@@ -683,12 +696,16 @@ public abstract class Broadcast

/**
*
* @return number of active jobs. -1 if there is no concept of being active.
* @return number of active jobs, including those being
* rescheduled by a timer.
* Computed from effectiveJobCount, completedJobCount and readyQueue.size()
*/
protected int getActiveJobCount()
{
return remainingJobs - readyQueue.size();
return effectiveJobCount - completedJobCount - readyQueue.size();
}

/**
* Parses broadcastId and return if notInService is true.
@@ -725,7 +742,7 @@ public abstract class Broadcast
{
readyQueue.add(mkJob(recipient));
}
remainingJobs = readyQueue.size();
//remainingJobs = readyQueue.size();
}
protected abstract void initSync(EngineResources resources) throws BroadcastException;
@@ -816,6 +833,8 @@ public abstract class Broadcast
public void startProcessing() throws BroadcastException
{
effectiveJobCount = recipientList.size();
// Create service thread pool to dispatch jobs
myLogger.debug("At creating service threads, serviceThreadPoolSize = " + serviceThreadPoolSize);
for (int i = 0; i < serviceThreadPoolSize; i++)
@@ -864,7 +883,7 @@ public abstract class Broadcast
return 1;
}

/**
/**
* Sets jobStatus in job, and post job report.
* If jobStatus is final, and no rescheduling,
* then decrement number of remainingJobs,and increment completedJobCount.
@@ -874,6 +893,8 @@ public abstract class Broadcast
*/
public synchronized void postJobStatus(Job job)
{
postJobStatus(job, -1);
/*
if (postBack != null)
{
JobReport report = mkJobReport();
@@ -881,12 +902,12 @@ public abstract class Broadcast
report.init(job);
postBack.queueReport(report.toString());
}
if (job.jobStatus.isTerminal())
{
remainingJobs--;
completedJobCount++;
if (remainingJobs == 0)
{
terminate(BroadcastState.COMPLETED);
@@ -897,6 +918,7 @@ public abstract class Broadcast
else if (state == BroadcastState.PAUSING) setState(BroadcastState.PAUSED);
}
}
*/
}
/**
@@ -904,16 +926,50 @@ public abstract class Broadcast
* Optionally reschedules job.
* If no rescheduling, then decrement number of remainingJobs,
* @param job
* @param jobStatus
* @param errorText
* @param reschedule - reschedule time in milliseconds (-1 means do not reschedule).
* @param rescheduleTimeMS - reschedule time in milliseconds (-1 means do not reschedule).
*/
protected void postJobStatus(Job job, long rescheduleTimeMS)
{
postJobStatus(job);
if (rescheduleTimeMS == 0)
//postJobStatus(job);
logJobCount("Enering postJobStatus");
myLogger.debug(job.toString() + ": rescheduleTimeMS " + rescheduleTimeMS);
if (postBack != null)
{
JobReport report = mkJobReport();
report.initBase(job, broadcastId, launchRecordId, activityRecordIdParamName, jobReportRootNodeName);
report.init(job);
postBack.queueReport(report.toString());
}
//if (job.jobStatus.isTerminal())
if (rescheduleTimeMS < 0
// No more rescheduling on cancel, expire, or pause
|| state == BroadcastState.CANCELING
|| state == BroadcastState.CANCELED
|| state == BroadcastState.EXPIRED
|| state == BroadcastState.PAUSED
|| state == BroadcastState.PAUSING
)
{
//remainingJobs--;
completedJobCount++;
logJobCount("Completed a job");
if (getRemainingJobCount() == 0)
{
terminate(BroadcastState.COMPLETED);
}
else if (getActiveJobCount() == 0)
{
if (state == BroadcastState.CANCELING) setState(BroadcastState.CANCELED);
else if (state == BroadcastState.PAUSING) setState(BroadcastState.PAUSED);
}
}
else if (rescheduleTimeMS == 0)
{
addJob(job);
effectiveJobCount++;
logJobCount("Added a job to queue");
}
else if (rescheduleTimeMS > 0)
{
@@ -921,7 +977,36 @@ public abstract class Broadcast
}
}
public ScheduledFuture<?> rescheduleJob(final Job job, long rescheduleTimeMS)
/**
* Logs effectiveJobCount, completedJobCount, readyQueue.size(),
* active job count, and total which recipientList.size()
* Job statistics are collected by length of readyQueue, completedJobCount,
* and effectiveJobCount.
*/
private void logJobCount(String title)
{
myLogger.debug(String.format("%s: completed: %d, active: %d, ready: %d, total jobs: %d, remaining %d, effectiveJobCount %d",
title,
completedJobCount,
getActiveJobCount(),
readyQueue.size(),
recipientList.size(),
getRemainingJobCount(),
effectiveJobCount
));
}

/**
* Number of jobs to be completed.
* Computed from effectiveJobCount and completedJobCount
* @return
*/
private int getRemainingJobCount()
{
return effectiveJobCount - completedJobCount;
}

public ScheduledFuture<?> rescheduleJob(final Job job, long rescheduleTimeMS)
{
Runnable r = new Runnable() { public void run() { addJob(job);}};
return scheduler.schedule(r, rescheduleTimeMS, TimeUnit.MILLISECONDS);


Loading…
Cancel
Save