diff --git a/src/altk/comm/engine/Broadcast.java b/src/altk/comm/engine/Broadcast.java
index a4c2ce9..68a2864 100644
--- a/src/altk/comm/engine/Broadcast.java
+++ b/src/altk/comm/engine/Broadcast.java
@@ -197,6 +197,7 @@ public abstract class Broadcast
String haltReason;
String stateErrorText;
public long changeStateTime;
+ private int completedJobCount = 0;
protected class Service extends Thread
{
@@ -578,10 +579,10 @@ public abstract class Broadcast
+ "");
}
}
- statusBf.append(" -1) statusBf.append(" active='" + activeCount + "'");
- statusBf.append(">" + topLevelTag + ">\r\n");
+ statusBf.append("" + topLevelTag + ">\r\n");
String statusReport = statusBf.toString();
return statusReport;
}
@@ -604,7 +605,7 @@ public abstract class Broadcast
*
* @return number of active jobs. -1 if there is no concept of being active.
*/
- private int getActiveCount()
+ protected int getActiveJobCount()
{
return remainingJobs - readyQueue.size();
}
@@ -677,7 +678,7 @@ public abstract class Broadcast
*/
protected void cancel()
{
- if (this.getActiveCount() == 0) setState(BroadcastState.CANCELED);
+ if (this.getActiveJobCount() == 0) setState(BroadcastState.CANCELED);
// Sets state to CANCELING, which is monitored by its Broadcast.Service threads.
else setState(BroadcastState.CANCELING);
synchronized(resumeFlag)
@@ -849,7 +850,7 @@ public abstract class Broadcast
{
terminate(BroadcastState.COMPLETED);
}
- else if (getActiveCount() == 0)
+ else if (getActiveJobCount() == 0)
{
if (state == BroadcastState.CANCELING) setState(BroadcastState.CANCELED);
else if (state == BroadcastState.HALTING) setState(BroadcastState.HALTED);
@@ -868,6 +869,10 @@ public abstract class Broadcast
protected void postJobStatus(Job job, long rescheduleTimeMS)
{
postJobStatus(job);
+ if (rescheduleTimeMS < 0)
+ {
+ completedJobCount ++;
+ }
if (rescheduleTimeMS == 0)
{
addJob(job);
@@ -889,5 +894,22 @@ public abstract class Broadcast
return state;
}
+ public int getReadyJobCount()
+ {
+ switch (state)
+ {
+ case RUNNING:
+ case HALTING:
+ case HALTED:
+ return readyQueue.size();
+ default:
+ return 0;
+
+ }
+ }
+ public int getCompletedJobCount()
+ {
+ return completedJobCount;
+ }
}
diff --git a/src/altk/comm/engine/CommEngine.java b/src/altk/comm/engine/CommEngine.java
index 7fd1746..e98d873 100644
--- a/src/altk/comm/engine/CommEngine.java
+++ b/src/altk/comm/engine/CommEngine.java
@@ -71,6 +71,8 @@ public abstract class CommEngine extends HttpServlet
private long deadBroadcastViewingMinutes;
+ private int completedJobCount = 0;
+
abstract protected Broadcast mkBroadcast();
public CommEngine(String engineName)
@@ -130,14 +132,18 @@ public abstract class CommEngine extends HttpServlet
protected void purgeStaleBroadcasts()
{
long now = System.currentTimeMillis();
- for (String id : broadcasts.keySet())
+ synchronized (broadcasts)
{
- if (broadcasts.get(id).changeStateTime - now > deadBroadcastViewingMinutes * 60 * 1000)
+ for (String id : broadcasts.keySet())
{
- broadcasts.remove(id);
+ if (broadcasts.get(id).changeStateTime - now > deadBroadcastViewingMinutes * 60 * 1000)
+ {
+ Broadcast broadcast = broadcasts.get(id);
+ completedJobCount += broadcast.getCompletedJobCount();
+ broadcasts.remove(id);
+ }
}
}
-
}
/**
@@ -387,24 +393,58 @@ public abstract class CommEngine extends HttpServlet
{
out.write(broadcast.mkStatusReport());
}
-
+ out.write("");
out.write("" + tag + ">");
}
}
- public void removeBroadcast(String broadcastId)
+ public int getReadyJobCount()
{
- CommonLogger.activity.info("Removing broadcast " + broadcastId);
+ int readyCount = 0;
synchronized(broadcasts)
{
- broadcasts.remove(broadcastId);
+ for (Broadcast broadcast : broadcasts.values())
+ {
+ readyCount += broadcast.getReadyJobCount();
+ }
}
+ return readyCount;
+
}
- protected int getRemainingJobCount()
+ public int getActiveJobCount()
{
- // TODO
- return 0;
+ int activeCount = 0;
+ synchronized(broadcasts)
+ {
+ for (Broadcast broadcast : broadcasts.values())
+ {
+ activeCount += broadcast.getActiveJobCount();
+ }
+ }
+ return activeCount;
+ }
+
+ public int getCompletedJobCount()
+ {
+ int additionalCompletedJobCount = 0;
+ synchronized(broadcasts)
+ {
+ for (Broadcast broadcast : broadcasts.values())
+ {
+ additionalCompletedJobCount += broadcast.getCompletedJobCount();
+ }
+ }
+ return completedJobCount + additionalCompletedJobCount;
+ }
+
+ public void removeBroadcast(String broadcastId)
+ {
+ CommonLogger.activity.info("Removing broadcast " + broadcastId);
+ synchronized(broadcasts)
+ {
+ broadcasts.remove(broadcastId);
+ }
}
public boolean notInService()