diff --git a/src/main/java/altk/comm/engine/Broadcast.java b/src/main/java/altk/comm/engine/Broadcast.java index 53739e1..dd08221 100644 --- a/src/main/java/altk/comm/engine/Broadcast.java +++ b/src/main/java/altk/comm/engine/Broadcast.java @@ -74,12 +74,16 @@ public abstract class Broadcast private ScheduledExecutorService scheduler; private int serviceThreadPoolSize_default; private int jobsTotal; + private int scheduledJobs; + + protected int transactions; + protected int completedTransactions; public static enum BroadcastState { ACCEPTED, RUNNING, - ALLDONE, +// ALLDONE, PAUSING, PAUSED, CANCELING, @@ -175,14 +179,15 @@ public abstract class Broadcast // Transitions from RUNNING toStates.put(BroadcastState.RUNNING, Arrays.asList( + BroadcastState.COMPLETED, // after completed all sends BroadcastState.CANCELING, // User action BroadcastState.CANCELED, // User action BroadcastState.PAUSING, // User action BroadcastState.PAUSED, // User action BroadcastState.PURGED, // User action BroadcastState.ABORTED, // Service provider irrecoverable error - BroadcastState.EXPIRED, - BroadcastState.ALLDONE // Natural transition, if all ongoing calls complete and no more jobs in Dispatcher queues. + BroadcastState.EXPIRED +// BroadcastState.ALLDONE // Natural transition, if all ongoing calls complete and no more jobs in Dispatcher queues. )); // Transitions from CANCELING @@ -207,9 +212,9 @@ public abstract class Broadcast BroadcastState.PURGED // User action )); // Transitions from ALLDONE - toStates.put(BroadcastState.ALLDONE, Arrays.asList( - BroadcastState.COMPLETED // Natural transition, if all ongoing jobs complete and no more jobs in post queue - )); + //toStates.put(BroadcastState.ALLDONE, Arrays.asList( + // BroadcastState.COMPLETED // Natural transition, if all ongoing jobs complete and no more jobs in post queue + // )); } public static class StateChangeResult @@ -242,7 +247,7 @@ public abstract class Broadcast myLogger.info("Thread starting..."); for (;;) { - if (threadsShouldStop()) + if (serviceThreadsShouldStop()) { closeServiceProvider(serviceProviderPeer); myLogger.info("Thread terminating"); @@ -301,7 +306,7 @@ public abstract class Broadcast */ prerequisites = secureServicePrerequisites(); - if (threadsShouldStop() || threadsShouldPause()) + if (serviceThreadsShouldStop() || threadsShouldPause()) { returnPrerequisites(prerequisites); continue; @@ -347,6 +352,7 @@ public abstract class Broadcast // Service the jobs try { + incrementTransactions(batch.size()); processJobs(batch, serviceProviderPeer, prerequisites); } catch (EngineException e) @@ -399,7 +405,22 @@ public abstract class Broadcast receiveTime = System.currentTimeMillis(); } - /** + + public int pendingJobs() + { + return readyQueue.size() + scheduledJobs; + } + + public void incrementTransactions(int size) { + transactions += size; + } + + public void incrementCompletedTransactions(int size) + { + completedTransactions += size; + } + + /** * Experimental formulation where it takes over directing * the activity of a Broadcast, as it should, instead of relegating * it to CommEngine. This is directly invoked by CommEngine.doPost method, @@ -843,7 +864,7 @@ public abstract class Broadcast } } statusBf.append("\r\n"); String statusReport = statusBf.toString(); @@ -1001,16 +1022,25 @@ public abstract class Broadcast myLogger.error("Caught exception while waiting for a Service thread to terminate:" + e); } } - close(); + setState(BroadcastState.COMPLETED); + destroyResources(); + postback.wrapup(); + postback = null; myLogger.info("Broadcast " + getId() + " terminated"); } + /** + * Derived class destroy resources needed for providing service + */ + protected void destroyResources() {} + /** * Derived may release resources here. */ protected void close() { - postback.shutdownWhenDone(); + myLogger.debug("In close()");; + postback.wrapup(); postback = null; } @@ -1033,10 +1063,11 @@ public abstract class Broadcast } - private boolean threadsShouldStop() + private boolean serviceThreadsShouldStop() { return state == BroadcastState.CANCELING || - state == BroadcastState.CANCELED || state.isFinal; + state == BroadcastState.CANCELED || state.isFinal + || pendingJobs() == 0; } private boolean threadsShouldPause() @@ -1104,7 +1135,7 @@ public abstract class Broadcast // No more rescheduling on cancel, expire, or pause completedJobCount++; logJobCount("Completed a job"); - if (allDone() && state==BroadcastState.RUNNING) setState(BroadcastState.ALLDONE); + //if (allDone() && state==BroadcastState.RUNNING) setState(BroadcastState.ALLDONE); } else if (rescheduleTimeMS == 0) { @@ -1118,18 +1149,18 @@ public abstract class Broadcast } /** - * Logs effectiveJobCount, completedJobCount, readyQueue.size(), + * Logs completedJobCount, readyQueue.size(), * active job count, and total. * Job statistics are collected by length of readyQueue, completedJobCount, - * and effectiveJobCount. */ private void logJobCount(String title) { - myLogger.debug(String.format("%s: completed: %d, active: %d, ready: %d, total jobs: %d, remaining %d", + myLogger.debug(String.format("%s: completed: %d, active: %d, ready: %d, scheduled %d, total jobs: %d, remaining %d", title, completedJobCount, getActiveJobCount(), readyQueue.size(), + scheduledJobs, jobsTotal, getRemainingJobCount() )); @@ -1137,17 +1168,17 @@ public abstract class Broadcast /** * Number of jobs to be completed. - * Computed from effectiveJobCount and completedJobCount * @return */ private int getRemainingJobCount() { - return jobsTotal - completedJobCount; + return readyQueue.size() + scheduledJobs; } public ScheduledFuture rescheduleJob(final Job job, long rescheduleTimeMS) { - Runnable r = new Runnable() { public void run() { addJob(job);}}; + scheduledJobs++; + Runnable r = new Runnable() { public void run() { scheduledJobs--; addJob(job);}}; return scheduler.schedule(r, rescheduleTimeMS, TimeUnit.MILLISECONDS); } @@ -1156,14 +1187,14 @@ public abstract class Broadcast return state; } - public int getReadyJobCount() + public int getPendingJobCount() { switch (state) { case RUNNING: case PAUSING: case PAUSED: - return readyQueue.size(); + return pendingJobs(); default: return 0; @@ -1182,4 +1213,15 @@ public abstract class Broadcast public boolean allDone() { return (completedJobCount == jobsTotal); } +/* + public boolean stopOnEmptyPostQueue() + { + if (state == state.ALLDONE) return true; + // Add the ALLDONE message to postQueue + + setState(BroadcastState.ALLDONE); + // Do not stop as one report has just been added by the previous setState(). + return false; + } + */ } diff --git a/src/main/java/altk/comm/engine/CommEngine.java b/src/main/java/altk/comm/engine/CommEngine.java index f1ad4f2..da9b11f 100644 --- a/src/main/java/altk/comm/engine/CommEngine.java +++ b/src/main/java/altk/comm/engine/CommEngine.java @@ -402,19 +402,19 @@ public abstract class CommEngine extends HttpServlet { out.write(broadcast.mkStatusReport()); } - out.write(""); + out.write(""); out.write(""); } } - public int getReadyJobCount() + public int getPendingJobCount() { int readyCount = 0; synchronized(broadcasts) { for (Broadcast broadcast : broadcasts.values()) { - readyCount += broadcast.getReadyJobCount(); + readyCount += broadcast.getPendingJobCount(); } } return readyCount; diff --git a/src/main/java/altk/comm/engine/Postback.java b/src/main/java/altk/comm/engine/Postback.java index ef53986..8ec4b29 100644 --- a/src/main/java/altk/comm/engine/Postback.java +++ b/src/main/java/altk/comm/engine/Postback.java @@ -87,6 +87,10 @@ public class Postback private int maxRetries; private Broadcast broadcast; + + protected int postedTransactions; + + private boolean shutdownWhenDone; private static Logger myLogger = Logger.getLogger(Postback.class); @@ -100,7 +104,6 @@ public class Postback class Sender extends Thread { - private boolean threadShouldStop; private Sender(String name) { @@ -110,7 +113,6 @@ public class Postback public void run() { - threadShouldStop = false; myLogger.info(getName() + " started"); @@ -118,32 +120,27 @@ public class Postback for (;;) // Each iteration sends a batch { - if (threadShouldStop) - { - myLogger.info(getName() + " terminating"); - System.out.println(getName() + " terminating"); - return; - } myLogger.debug("Looking for reports"); - List reportList = null; + List reportList = new ArrayList(); synchronized(postQueue) { // Each iteration examines the queue for a batch to send - for (;;) - { - reportList = new ArrayList(); + //for (;;) + //{ + //reportList = new ArrayList(); for (int i = 0; i < maxBatchSize ; i++) { report = postQueue.poll(); if (report == null) break; reportList.add(report); } + /* if (reportList.size() > 0) { myLogger.debug(String.format("Extracted %d reports, reducing postQueue size: %d", reportList.size(), postQueue.size())); postQueue.notifyAll(); break; // break out to do the work. - } + }TODO Auto-generated catch block // No reports BroadcastState toState = null; @@ -187,13 +184,14 @@ public class Postback postQueue.notifyAll(); return; } + // Nothing to do, so wait a while, and look at the // queue again. try { myLogger.debug("Going to wait " + QUEUE_WAIT * 1000); - postQueue.wait(QUEUE_WAIT * 1000); + postQueue.wait(); //QUEUE_WAIT * 1000); } catch (InterruptedException e) { @@ -203,9 +201,10 @@ public class Postback CommonLogger.health.info("Surfacing from wait"); System.out.println(getName() + " surfacing from wait"); continue; - } + */ + // } } // synchronized() - if (reportList != null && reportList.size() > 0) + if (reportList.size() > 0) { switch (post(reportList)) { @@ -234,6 +233,27 @@ public class Postback } default: } + incrementPostedTransactions(reportList.size()); + } + else + { + // empty post queue + if (threadsShouldStop()) + { + myLogger.info(getName() + " terminating"); + System.out.println(getName() + " terminating"); + return; + } + synchronized (postQueue) + { + try + { + postQueue.wait(); + } catch (InterruptedException e) + { + // Do nothing + } + } } } } @@ -265,6 +285,7 @@ public class Postback httpPost.setEntity(requestEntity); myLogger.debug("Posting to " + postBackURL + ": " + xml); response = httpclient.execute(httpPost, new BasicHttpContext()); + incrementPostedTransactions(reportList.size()); StatusLine statusLine = response.getStatusLine(); int statusCode = statusLine.getStatusCode(); if (statusCode != 200) @@ -347,16 +368,6 @@ public class Postback return PostbackStatus.SUCCESS; } - public void terminate() - { - if (threadShouldStop) return; - - threadShouldStop = true; - - //Wait for at most 100 ms for thread to stop - interrupt(); - } - } /** @@ -434,7 +445,12 @@ public class Postback } } - /** + public void incrementPostedTransactions(int size) + { + postedTransactions += size; + } + + /** * Queues report to postQueue only if the queue size has not reached the * maxQueueSize. * @param report @@ -451,7 +467,7 @@ public class Postback { if (postQueue.size() < maxQueueSize) { - myLogger.debug("Queing report" + report); + myLogger.debug("Queueing report" + report); postQueue.add(report); myLogger.debug("Added 1 report - postQueue size: " + postQueue.size()); postQueue.notifyAll(); @@ -478,12 +494,22 @@ public class Postback return false; } - - public void shutdownWhenDone() + private boolean threadsShouldStop() { - + return (shutdownWhenDone); // && postedTransactions == broadcast.transactions); + } + + public void wrapup() + { + myLogger.debug("Wrapping up"); + shutdownWhenDone = true; + try { + synchronized (postQueue) + { + postQueue.notifyAll(); + } // Wait for all postback threads to terminate for (Sender sender : senderPool) {