diff --git a/src/main/java/altk/comm/engine/Broadcast.java b/src/main/java/altk/comm/engine/Broadcast.java index dd08221..15ff87b 100644 --- a/src/main/java/altk/comm/engine/Broadcast.java +++ b/src/main/java/altk/comm/engine/Broadcast.java @@ -76,14 +76,10 @@ public abstract class Broadcast private int jobsTotal; private int scheduledJobs; - protected int transactions; - protected int completedTransactions; - public static enum BroadcastState { ACCEPTED, RUNNING, -// ALLDONE, PAUSING, PAUSED, CANCELING, @@ -187,7 +183,6 @@ public abstract class Broadcast BroadcastState.PURGED, // User action BroadcastState.ABORTED, // Service provider irrecoverable error BroadcastState.EXPIRED -// BroadcastState.ALLDONE // Natural transition, if all ongoing calls complete and no more jobs in Dispatcher queues. )); // Transitions from CANCELING @@ -211,10 +206,6 @@ public abstract class Broadcast BroadcastState.CANCELING, // User action BroadcastState.PURGED // User action )); - // Transitions from ALLDONE - //toStates.put(BroadcastState.ALLDONE, Arrays.asList( - // BroadcastState.COMPLETED // Natural transition, if all ongoing jobs complete and no more jobs in post queue - // )); } public static class StateChangeResult @@ -352,7 +343,6 @@ public abstract class Broadcast // Service the jobs try { - incrementTransactions(batch.size()); processJobs(batch, serviceProviderPeer, prerequisites); } catch (EngineException e) @@ -410,15 +400,6 @@ public abstract class Broadcast { return readyQueue.size() + scheduledJobs; } - - public void incrementTransactions(int size) { - transactions += size; - } - - public void incrementCompletedTransactions(int size) - { - completedTransactions += size; - } /** * Experimental formulation where it takes over directing @@ -1135,7 +1116,6 @@ public abstract class Broadcast // No more rescheduling on cancel, expire, or pause completedJobCount++; logJobCount("Completed a job"); - //if (allDone() && state==BroadcastState.RUNNING) setState(BroadcastState.ALLDONE); } else if (rescheduleTimeMS == 0) { @@ -1196,8 +1176,7 @@ public abstract class Broadcast case PAUSED: return pendingJobs(); default: - return 0; - + return 0; } } @@ -1210,18 +1189,4 @@ public abstract class Broadcast return broadcastType; } - public boolean allDone() { - return (completedJobCount == jobsTotal); - } -/* - public boolean stopOnEmptyPostQueue() - { - if (state == state.ALLDONE) return true; - // Add the ALLDONE message to postQueue - - setState(BroadcastState.ALLDONE); - // Do not stop as one report has just been added by the previous setState(). - return false; - } - */ } diff --git a/src/main/java/altk/comm/engine/Postback.java b/src/main/java/altk/comm/engine/Postback.java index 8ec4b29..3fca9e2 100644 --- a/src/main/java/altk/comm/engine/Postback.java +++ b/src/main/java/altk/comm/engine/Postback.java @@ -43,9 +43,6 @@ import org.apache.log4j.Logger; import org.w3c.dom.Document; import org.w3c.dom.Node; -import altk.comm.engine.Broadcast.BroadcastState; -import altk.comm.engine.CommonLogger; - /** * Queues JobReports to be posted back to attribute postBackURL. @@ -124,85 +121,12 @@ public class Postback List reportList = new ArrayList(); synchronized(postQueue) { - // Each iteration examines the queue for a batch to send - //for (;;) - //{ - //reportList = new ArrayList(); - for (int i = 0; i < maxBatchSize ; i++) - { - report = postQueue.poll(); - if (report == null) break; - reportList.add(report); - } - /* - if (reportList.size() > 0) - { - myLogger.debug(String.format("Extracted %d reports, reducing postQueue size: %d", reportList.size(), postQueue.size())); - postQueue.notifyAll(); - break; // break out to do the work. - }TODO Auto-generated catch block - - // No reports - BroadcastState toState = null; - String reason = null; - switch (broadcast.getState()) - { - case ALLDONE: - toState = BroadcastState.COMPLETED; - reason = "All posted"; - break; - case CANCELING: - if (broadcast.getActiveJobCount() > 0) break; - toState = BroadcastState.CANCELED; - reason = "User canceled"; - break; - case PAUSING: - if (broadcast.getActiveJobCount() > 0) break; - toState = BroadcastState.PAUSED; - reason = "User paused"; - break; - default: - } - if (toState != null) - { - if (toState.isFinal) - { - broadcast.terminate(toState, reason); - myLogger.info("All posted, thread terminating"); - return; - } - else - { - broadcast.setState(toState); - return; - } - } - if (broadcast.getState().isFinal) - { - // No more. Notify all waiting postback threads and exit thread - myLogger.info("All posted, thread terminating"); - postQueue.notifyAll(); - return; - } - - // Nothing to do, so wait a while, and look at the - // queue again. - - try - { - myLogger.debug("Going to wait " + QUEUE_WAIT * 1000); - postQueue.wait(); //QUEUE_WAIT * 1000); - } - catch (InterruptedException e) - { - CommonLogger.alarm.info("Postback queue interrupted while waiting: " + e); - break; - } - CommonLogger.health.info("Surfacing from wait"); - System.out.println(getName() + " surfacing from wait"); - continue; - */ - // } + for (int i = 0; i < maxBatchSize ; i++) + { + report = postQueue.poll(); + if (report == null) break; + reportList.add(report); + } } // synchronized() if (reportList.size() > 0) { @@ -213,14 +137,6 @@ public class Postback break; case SERVER_IO_ERROR: - /* Should not requeue report for this may lead to dead lock on this queu. - // TODO: Limit retries, using rate limiting. Posting can be recovered using the activity log. - // Re-queue these reports - for (String rpt : reportList) - { - queueReport(rpt); - } - */ // Sleep for a while before retrying this PostBack server. CommonLogger.alarm.warn("Caught server IO error. sleep for " + POSTBACK_SERVER_WAIT_TIME + " seconds"); try